rabbitmq(8)事务

1、概述

rabbitmq的事务机制可以保证生产者发送的消息成功传递到broker。如果在事务提交前由于本地或broker的问题发生异常,则可以回滚事务。

2、代码

2.1、生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private final static String QUEUE_NAME = "hello";

@Test
public void testSend() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.1.7");
factory.setUsername("root");
factory.setPassword("123456");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, true, false, false, null);

/**
* 事务模式吞吐量低
*/
try {
// 开启事务
channel.txSelect();

String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

/**
* 如果在提交事务前发生异常,则通过回滚事务,消费者将不会接收到该消息
*/

// 提交事务
channel.txCommit();
} catch (Exception e) {
// 回滚事务
channel.txRollback();
e.printStackTrace();
}

channel.close();
connection.close();
}

2.2、消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private final static String QUEUE_NAME = "hello";

public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.1.7");
factory.setUsername("root");
factory.setPassword("123456");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages.");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
>