RocketMQ学习(八):事务消息

源代码版本是3.2.6,还是直接跑源代码。rocketmq事务消息是发生在Producer和Broker之间,是二阶段提交。

二阶段提交过程看图:



第一阶段是:步骤1,2,3。
第二阶段是:步骤4,5。

具体说明:

只有在消息发送成功,并且本地操作执行成功时,才发送提交事务消息,做事务提交。

其他的情况,例如消息发送失败,直接发送回滚消息,进行回滚,或者发送消息成功,但是执行本地操作失败,也是发送回滚消息,进行回滚。

事务消息原理实现过程:

一阶段:
Producer向Broker发送1条类型为TransactionPreparedType的消息,Broker接收消息保存在CommitLog中,然后返回消息的queueOffset和MessageId到Producer,MessageId包含有commitLogOffset(即消息在CommitLog中的偏移量,通过该变量可以直接定位到消息本身),由于该类型的消息在保存的时候,commitLogOffset没有被保存到consumerQueue中,此时客户端通过consumerQueue取不到commitLogOffset,所以该类型的消息无法被取到,导致不会被消费。

一阶段的过程中,Broker保存了1条消息。

二阶段:
Producer端的TransactionExecuterImpl执行本地操作,返回本地事务的状态,然后发送一条类型为TransactionCommitType或者TransactionRollbackType的消息到Broker确认提交或者回滚,Broker通过Request中的commitLogOffset,获取到上面状态为TransactionPreparedType的消息(简称消息A),然后重新构造一条与消息A内容相同的消息B,设置状态为TransactionCommitType或者TransactionRollbackType,然后保存。其中TransactionCommitType类型的,会放commitLogOffset到consumerQueue中,TransactionRollbackType类型的,消息体设置为空,不会放commitLogOffset到consumerQueue中。

二阶段的过程中,Broker也保存了1条消息。

总结:事务消息过程中,broker一共保存2条消息。

贴代码:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29


<properties>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

<logback.version>1.0.13</logback.version>

<rocketmq.version>3.2.6</rocketmq.version>

</properties>

<dependencies>

<dependency>

<groupId>ch.qos.logback</groupId>

<artifactId>logback-classic</artifactId>

<version>1.0.13</version>

</dependency>

<dependency>

<groupId>ch.qos.logback</groupId>

<artifactId>logback-core</artifactId>

<version>1.0.13</version>

</dependency>

<dependency>

<groupId>com.alibaba.rocketmq</groupId>

<artifactId>rocketmq-client</artifactId>

<version>${rocketmq.version}</version>

</dependency>

<dependency>

<groupId>junit</groupId>

<artifactId>junit</artifactId>

<version>4.10</version>

<scope>test</scope>

</dependency>

</dependencies>

TransactionCheckListenerImpl.java


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34


package com.zoo.quickstart.transaction;

import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.rocketmq.client.producer.LocalTransactionState;

import com.alibaba.rocketmq.client.producer.TransactionCheckListener;

import com.alibaba.rocketmq.common.message.MessageExt;

/**

* 未决事务,服务器回查客户端,broker端发起请求代码没有被调用,所以此处代码可能没用。

*/

public class TransactionCheckListenerImpl implements TransactionCheckListener {

private AtomicInteger transactionIndex = new AtomicInteger(0);

@Override

public LocalTransactionState checkLocalTransactionState(MessageExt msg) {

System.out.println("server checking TrMsg " + msg.toString());

int value = transactionIndex.getAndIncrement();

if ((value % 6) == 0) {

throw new RuntimeException("Could not find db");

}

else if ((value % 5) == 0) {

return LocalTransactionState.ROLLBACK_MESSAGE;

}

else if ((value % 4) == 0) {

return LocalTransactionState.COMMIT_MESSAGE;

}

return LocalTransactionState.UNKNOW;

}

}

本地操作类TransactionExecuterImpl.java


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32


package com.zoo.quickstart.transaction;

import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;

import com.alibaba.rocketmq.client.producer.LocalTransactionState;

import com.alibaba.rocketmq.common.message.Message;

/**

* 执行本地事务

*/

public class TransactionExecuterImpl implements LocalTransactionExecuter {

private AtomicInteger transactionIndex = new AtomicInteger(1);

@Override

public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {

int value = transactionIndex.getAndIncrement();

if (value == 0) {

throw new RuntimeException("Could not find db");

}

else if ((value % 5) == 0) {

return LocalTransactionState.ROLLBACK_MESSAGE;

}

else if ((value % 4) == 0) {

return LocalTransactionState.COMMIT_MESSAGE;

}

return LocalTransactionState.UNKNOW;

}

}

Producer类:TransactionProducer.java


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53


package com.zoo.quickstart.transaction;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.client.producer.SendResult;

import com.alibaba.rocketmq.client.producer.TransactionCheckListener;

import com.alibaba.rocketmq.client.producer.TransactionMQProducer;

import com.alibaba.rocketmq.common.message.Message;

/**

* 发送事务消息例子

*

*/

public class TransactionProducer {

public static void main(String[] args) throws MQClientException, InterruptedException {

TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();

TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");

// 事务回查最小并发数

producer.setCheckThreadPoolMinSize(2);

// 事务回查最大并发数

producer.setCheckThreadPoolMaxSize(2);

// 队列数

producer.setCheckRequestHoldMax(2000);

producer.setTransactionCheckListener(transactionCheckListener);

producer.setNamesrvAddr("192.168.0.104:9876");

producer.start();

String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE" };

TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();

for (int i = 0; i < 1; i++) {

try {

Message msg =

new Message("TopicTest", tags[i % tags.length], "KEY" + i,

("Hello RocketMQ " + i).getBytes());

SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);

System.out.println(sendResult);

Thread.sleep(10);

}

catch (MQClientException e) {

e.printStackTrace();

}

}

for (int i = 0; i < 100000; i++) {

Thread.sleep(1000);

}

producer.shutdown();

}

}


时间: 2025-01-02 04:22:37

RocketMQ学习(八):事务消息的相关文章

学习spring事务与消息队列_java

在开发过程中,遇到一个bug,产生bug的原因是spring事务提交晚于消息队列的生产消息,导致消息队列消费消息时获取到的数据不正确.这篇文章介绍问题的产生和一步步的解决过程. 一.问题的产生: 场景还原:接口中的一个方法,首先修改订单状态,然后向消息队列中生产消息,消息队列的消费者获取到消息检测订单状态,发现订单状态未更改. 代码: @Service(orderApi) public class OrderApiImpl implements OrderApi { @Resource MqSe

RocketMQ学习(六):消息的生命周期上之消息的产生

源代码版本是3.2.6.消息的生命周期包括2部分,消息的产生和消息的消费,这篇先说下前者.消息的产生详细一点可以分为: a.消息产生后由Producer发送至Broker. b.Broker接收到消息做持久化. 调试代码得到这样的过程, 1.DefaultMQProducer.send()发出消息. 2.DefaultMQProducerImpl.sendDefaultImpl()发出消息. 3.DefaultMQProducerImpl.tryToFindTopicPublishInfo(),

最佳实践:如何基于MNS实现事务消息

事务消息的背景: 有时候我们需要实现本地操作和消息发送的事务一致性功能.即:消息发送成功,则本地操作成功:反之,如果消息发送失败,本地操作失败(成功也需要rollback).保证不出现操作成功但消息发送失败:或者操作失败但消息发送成功的情况: 另外,消费端,我们也希望消息一定被成功处理一次,不会因为消息端程序崩溃而导致消息没有成功处理,进而需要人工重置消费进度.   解决方案: 利用消息服务MNS的延迟消息来实现. 准备工作: 创建两个队列: 1.事务消息队列 消息的有效期小于消息延迟时间.即如

启发:从MNS事务消息谈分布式事务

启发:从MNS事务消息谈分布式事务 事务消息本质上解决的问题是业务系统与消息系统之间的事务问题(跨系统分布式事务),其基本原理即两阶段提交以及最终一致性保障.最近看了下阿里云mns事务消息的实现原理,介绍的蛮简洁透彻的,对了解分布式事务实现原理挺有帮助,在阅读本文前推荐大家先仔细阅读下阿里云"mns事务消息"一文. 事务消息 背景描述 有时候我们需要实现本地操作和消息发送的事务一致性功能.即:消息发送成功,则本地操作成功:反之,如果消息发送失败,本地操作失败(成功也需要rollback

RocketMQ学习(九):顺序消息

rocketmq的顺序消息需要满足2点: 1.Producer端保证发送消息有序,且发送到同一个队列. 2.consumer端保证消费同一个队列. 先看个例子,代码版本跟前面的一样. Producer类: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52

RocketMQ学习(七):消息的生命周期下之消息的消费

源代码版本是3.2.6.接着上一篇消息的产生,这篇是消息的消费.Consumer选择DefaultMQPushConsumer为例. 1.DefaultMQPushConsumer.start()开始. 2.RebalanceService.run()方法定时调用RebalanceImpl.doRebalance()方法,该方法内部是遍历订阅的topic,执行rebalanceByTopic(topic). 3.调用RebalanceImpl.updateProcessQueueTableInR

Kafka、RabbitMQ、RocketMQ消息中间件的对比—— 消息发送性能

引言 分布式系统中,我们广泛运用消息中间件进行系统间的数据交换,便于异步解耦.现在开源的消息中间件有很多,前段时间我们自家的产品 RocketMQ (MetaQ的内核) 也顺利开源,得到大家的关注. 那么,消息中间件性能究竟哪家强? 带着这个疑问,我们中间件测试组对常见的三类消息产品(Kafka.RabbitMQ.RocketMQ)做了性能比较.   Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache定级项目.Kafka主要特点是基于Pull的模式来处理消息消费,

Kafka、RabbitMQ、RocketMQ 消息中间件的对比 | 消息发送性能篇

阿里云消息队列测试小组 出品 分布式系统中,我们广泛运用消息中间件进行系统间的数据交换,便于异步解耦.现在开源的消息中间件有很多,我们自家的产品 RocketMQ (阿里云消息队列(MQ)的内核) 也顺利开源,得到大家的关注. 那么,消息中间件性能究竟哪家强? 带着这个疑问,我们消息队列测试小组对常见的三类消息产品(Kafka.RabbitMQ.RocketMQ)做了性能比较. Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache定级项目.Kafka主要特点是基于P

Kafka、RabbitMQ、RocketMQ消息中间件的对比 —— 消息发送性能

引言 分布式系统中,我们广泛运用消息中间件进行系统间的数据交换,便于异步解耦.现在开源的消息中间件有很多,前段时间我们自家的产品 RocketMQ (MetaQ的内核) 也顺利开源,得到大家的关注. 那么,消息中间件性能究竟哪家强? 带着这个疑问,我们中间件测试组对常见的三类消息产品(Kafka.RabbitMQ.RocketMQ)做了性能比较. Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache定级项目.Kafka主要特点是基于Pull的模式来处理消息消费,追求