RabbitMQ(7)通配符模式(topic)

1、模型

topic实际是direct模式的升级版,topic模式下消息可以通过routingKey匹配的方式对应到消费者。
匹配规则如下:routingKey是由"."分隔的单词列表,#(井号)可以匹配零个或多个单词,*(星号)恰好匹配一个单词。

(原文此处为topic模式的模型示意图,图片未能保留)

2、代码

2.1、生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/** Name of the topic exchange that the producer declares and publishes to. */
private final static String EXCHANGE_NAME = "test_topic_exchange";

/**
 * Producer: publishes one "Hello world" message to the topic exchange
 * {@code EXCHANGE_NAME} using the routing key "key1.key2".
 *
 * <p>The connection is closed in a {@code finally} block so that it is released
 * even when declaring the exchange or publishing fails. The payload is encoded
 * explicitly as UTF-8, which is the charset the consumers use to decode it.
 *
 * @param args unused
 * @throws Exception if connecting to the broker, declaring the exchange,
 *                   publishing, or closing fails
 */
public static void main(String[] args) throws Exception {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.1.7");
    factory.setUsername("root");
    factory.setPassword("123456");

    Connection connection = factory.newConnection();
    try {
        Channel channel = connection.createChannel();

        /*
         * Declare the exchange with type "topic".
         */
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String message = "Hello world";

        /*
         * The routing key is not arbitrary: it must be a list of words separated by dots,
         * e.g. "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit", and at most 255 bytes long.
         * Topic mode is an extension of direct mode: messages reach consumers by pattern
         * matching on the routing key.
         * Matching rules: # (hash) matches zero or more words, * (star) matches exactly one word.
         * Example: key1.key2 is matched by key1.* and by key1.key2.#, but not by #.key3.
         */
        String routingKey = "key1.key2";
        // Encode explicitly as UTF-8 instead of relying on the platform default charset.
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
    } finally {
        // Always release the connection, including on failure paths.
        connection.close();
    }
}

2.2、消费者1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/** Name of the topic exchange that consumer 1 declares and binds its queue to. */
private final static String EXCHANGE_NAME = "test_topic_exchange";

/**
 * Consumer 1: declares the topic exchange, binds a queue obtained from the
 * no-argument {@code queueDeclare()} with the pattern "key1.*", and prints
 * every delivered message, acknowledging each one manually.
 *
 * @param args unused
 * @throws Exception if connecting to the broker or setting up the subscription fails
 */
public static void main(String[] args) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.1.7");
    connectionFactory.setUsername("root");
    connectionFactory.setPassword("123456");

    // The connection is deliberately left open: deliveries arrive for as long as it lives.
    final Channel ch = connectionFactory.newConnection().createChannel();

    ch.exchangeDeclare(EXCHANGE_NAME, "topic");
    String boundQueue = ch.queueDeclare().getQueue();

    // "*" stands for exactly one word, so this pattern matches keys such as "key1.key2".
    ch.queueBind(boundQueue, EXCHANGE_NAME, "key1.*");

    System.out.println(" [1] Waiting for messages.");
    Consumer printingConsumer = new DefaultConsumer(ch) {
        @Override
        public void handleDelivery(String tag, Envelope env, AMQP.BasicProperties props,
                byte[] payload) throws IOException {
            String text = new String(payload, "UTF-8");
            System.out.println(" [1] Received '" + text + "'");

            // Acknowledge only this delivery (multiple = false).
            ch.basicAck(env.getDeliveryTag(), false);
        }
    };
    // Second argument is autoAck = false: the handler above acknowledges explicitly.
    ch.basicConsume(boundQueue, false, printingConsumer);
}

2.3、消费者2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/** Name of the topic exchange that consumer 2 declares and binds its queue to. */
private final static String EXCHANGE_NAME = "test_topic_exchange";

/**
 * Consumer 2: declares the topic exchange, binds a queue obtained from the
 * no-argument {@code queueDeclare()} with the pattern "#.key3", and prints
 * every delivered message, acknowledging each one manually.
 *
 * @param args unused
 * @throws Exception if connecting to the broker or setting up the subscription fails
 */
public static void main(String[] args) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.1.7");
    connectionFactory.setUsername("root");
    connectionFactory.setPassword("123456");

    // The connection is deliberately left open: deliveries arrive for as long as it lives.
    final Channel ch = connectionFactory.newConnection().createChannel();

    ch.exchangeDeclare(EXCHANGE_NAME, "topic");
    String boundQueue = ch.queueDeclare().getQueue();

    // A consumer may bind several routing-key patterns; only one is bound here.
    // "#" stands for zero or more words, so this pattern matches keys whose last word is "key3".
    ch.queueBind(boundQueue, EXCHANGE_NAME, "#.key3");

    System.out.println(" [2] Waiting for messages.");
    Consumer printingConsumer = new DefaultConsumer(ch) {
        @Override
        public void handleDelivery(String tag, Envelope env, AMQP.BasicProperties props,
                byte[] payload) throws IOException {
            String text = new String(payload, "UTF-8");
            System.out.println(" [2] Received '" + text + "'");

            // Acknowledge only this delivery (multiple = false).
            ch.basicAck(env.getDeliveryTag(), false);
        }
    };
    // Second argument is autoAck = false: the handler above acknowledges explicitly.
    ch.basicConsume(boundQueue, false, printingConsumer);
}
>