rabbitmq(4)消息队列(公平分发)

1、目标

实现rabbitmq将消息优先发给闲置的消费者。

2、代码

2.1、生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private final static String QUEUE_NAME = "hello";

@Test
public void testSend() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.1.7");
factory.setUsername("root");
factory.setPassword("123456");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, true, false, false, null);
for(int i=0;i<20;i++) {
String message = "Hello "+i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [1] Sent '" + message + "'");
}

channel.close();
connection.close();
}

2.2、消费者1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private final static String QUEUE_NAME = "hello";

public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.1.7");
factory.setUsername("root");
factory.setPassword("123456");

Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();

// 消费者最多接受1条消息,直到应答后接受新消息。保证rabbitmq每次将消息发送给闲置的消费者
int prefetchCount = 1;
channel.basicQos(prefetchCount);

channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [1] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [1] Received '" + message + "'");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}

// 手动应答
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
/*
* 使用公平分发,必须关闭自动应答,使用手动应答
* 当消费者应答rabbitmq后,rabbitmq将删除该消息
* 保证即时消费者接受消息后(未应答)中断,rabbitmq也会将该消息发送给其他消费者而不会出现消息丢失的问题
*/
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}

2.2、消费者2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private final static String QUEUE_NAME = "hello";

public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.1.7");
factory.setUsername("root");
factory.setPassword("123456");

Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();

int prefetchCount = 1;
channel.basicQos(prefetchCount);

channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [2] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [2] Received '" + message + "'");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
>