RabbitMQ(9)延时消息

1、概述

RabbitMQ 本身没有直接提供延时消息的功能,但可以借助消息过期时间(TTL)和死信队列来实现:先把带有过期时间的消息发送到一个没有消费者的延时队列,消息过期后会作为死信被转发到真正被消费的队列,从而达到延时投递的效果。

2、代码

2.1、生产者

生产者代码如下:
// Queue that receives the message once it has expired in the holding queue;
// it is bound to "amq.direct" under its own name as the routing key.
private static final String QUEUE_NAME = "MAIN_QUEUE";
// Holding queue the message is first published to. Despite the identifier,
// this value is used as a queue name, not as an exchange name.
private static final String _DIRECT_NAME = "_delay_delay";

/**
 * Publishes one message that reaches {@code QUEUE_NAME} only after a delay.
 *
 * <p>The message is published with a per-message expiration to the holding queue
 * {@code _DIRECT_NAME}. That queue is declared with {@code x-dead-letter-exchange} and
 * {@code x-dead-letter-routing-key} arguments, so an expired message is re-published to
 * "amq.direct" with routing key {@code QUEUE_NAME}, which is bound to the main queue.
 *
 * <p>NOTE(review): per the RabbitMQ TTL documentation, per-message TTL is only enforced when a
 * message reaches the head of the queue, so mixing different delays in one holding queue may
 * delay shorter-lived messages behind longer-lived ones — confirm before reusing this pattern
 * with varying delays.
 *
 * @param args unused
 * @throws Exception if connecting, declaring, binding, publishing or closing fails
 */
public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.1.7");
    factory.setUsername("root");
    factory.setPassword("123456");

    Connection connection = factory.newConnection();
    try {
        Channel channel = connection.createChannel();

        // Main queue: the destination the consumer reads from.
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, "amq.direct", QUEUE_NAME);

        // Holding queue: expired messages are dead-lettered to the main queue.
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "amq.direct");
        arguments.put("x-dead-letter-routing-key", QUEUE_NAME);
        channel.queueDeclare(_DIRECT_NAME, true, false, false, arguments);

        // Delay of 5 seconds, expressed in milliseconds.
        long delay = 5 * 1000;

        String message = "Hello World!";
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .expiration(Long.toString(delay))
                .build();
        // Encode explicitly as UTF-8 so the bytes match the charset the consumer decodes with,
        // instead of depending on the platform default charset.
        channel.basicPublish("", _DIRECT_NAME, properties, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
    } finally {
        // Always release the connection, even when a declare/bind/publish call throws;
        // closing the connection also closes any channel that is still open on it.
        connection.close();
    }
}

2.2、消费者

消费者代码如下:
// Name of the queue this consumer declares and reads messages from.
private static final String QUEUE_NAME = "MAIN_QUEUE";

/**
 * Connects to the broker, declares {@code QUEUE_NAME} and prints every message delivered
 * from it. The subscription is registered with the auto-acknowledge flag set to {@code true},
 * and neither the channel nor the connection is closed here.
 *
 * @param args unused
 * @throws Exception if connecting, declaring the queue or subscribing fails
 */
public static void main(String[] args) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.1.7");
    connectionFactory.setUsername("root");
    connectionFactory.setPassword("123456");

    Channel consumeChannel = connectionFactory.newConnection().createChannel();
    consumeChannel.queueDeclare(QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages.");

    // Subscribe with an inline consumer that decodes each payload as UTF-8 and prints it.
    consumeChannel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(consumeChannel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                AMQP.BasicProperties properties, byte[] payload) throws IOException {
            System.out.println(" [x] Received '" + new String(payload, "UTF-8") + "'");
        }
    });
}
>