RocketMQ学习(九):顺序消息

rocketmq的顺序消息需要满足2点:

1.Producer端保证发送消息有序,且发送到同一个队列。
2.consumer端保证消费同一个队列。

先看个例子,代码版本跟前面的一样。
Producer类:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61


import java.io.IOException;

import java.text.SimpleDateFormat;

import java.util.Date;

import java.util.List;

import com.alibaba.rocketmq.client.exception.MQBrokerException;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;

import com.alibaba.rocketmq.client.producer.MessageQueueSelector;

import com.alibaba.rocketmq.client.producer.SendResult;

import com.alibaba.rocketmq.common.message.Message;

import com.alibaba.rocketmq.common.message.MessageQueue;

import com.alibaba.rocketmq.remoting.exception.RemotingException;

/**

* Producer,发送顺序消息

*/

public class Producer {

public static void main(String[] args) throws IOException {

try {

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

producer.setNamesrvAddr("192.168.0.104:9876");

producer.start();

String[] tags = new String[] { "TagA", "TagC", "TagD" };

Date date = new Date();

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

String dateStr = sdf.format(date);

for (int i = 0; i < 10; i++) {

// 加个时间后缀

String body = dateStr + " Hello RocketMQ " + i;

Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i, body.getBytes());

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {

@Override

public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {

Integer id = (Integer) arg;

return mqs.get(id);

}

}, 0);//0是队列的下标

System.out.println(sendResult + ", body:" + body);

}

producer.shutdown();

} catch (MQClientException e) {

e.printStackTrace();

} catch (RemotingException e) {

e.printStackTrace();

} catch (MQBrokerException e) {

e.printStackTrace();

} catch (InterruptedException e) {

e.printStackTrace();

}

System.in.read();

}

}

Consumer端:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56


import java.util.List;

import java.util.Random;

import java.util.concurrent.TimeUnit;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

import com.alibaba.rocketmq.common.message.MessageExt;

/**

* 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)

*/

public class Consumer {

public static void main(String[] args) throws MQClientException {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");

consumer.setNamesrvAddr("192.168.0.104:9876");

/**

* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>

* 如果非第一次启动,那么按照上次消费的位置继续消费

*/

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTestjjj", "TagA || TagC || TagD");

consumer.registerMessageListener(new MessageListenerOrderly() {

Random random = new Random();

@Override

public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {

context.setAutoCommit(true);

System.out.print(Thread.currentThread().getName() + " Receive New Messages: " );

for (MessageExt msg: msgs) {

System.out.println(msg + ", content:" + new String(msg.getBody()));

}

try {

//模拟业务逻辑处理中...

TimeUnit.SECONDS.sleep(random.nextInt(10));

} catch (Exception e) {

e.printStackTrace();

}

return ConsumeOrderlyStatus.SUCCESS;

}

});

consumer.start();

System.out.println("Consumer Started.");

}

}

NameServer和BrokerServer起来后,运行打印,把前面的不重要的去掉了,只看后面的几列:
content:2015-12-06 17:03:21 Hello RocketMQ 0
content:2015-12-06 17:03:21 Hello RocketMQ 1
content:2015-12-06 17:03:21 Hello RocketMQ 2
content:2015-12-06 17:03:21 Hello RocketMQ 3
content:2015-12-06 17:03:21 Hello RocketMQ 4
content:2015-12-06 17:03:21 Hello RocketMQ 5
content:2015-12-06 17:03:21 Hello RocketMQ 6
content:2015-12-06 17:03:21 Hello RocketMQ 7
content:2015-12-06 17:03:21 Hello RocketMQ 8
content:2015-12-06 17:03:21 Hello RocketMQ 9

可以看到,消息有序的。

如何在集群消费时保证消费的有序呢?

1.ConsumeMessageOrderlyService类的start()方法,如果是集群消费,则启动定时任务,定时向broker发送批量锁住当前正在消费的队列集合的消息,具体是consumer端拿到正在消费的队列集合,发送锁住队列的消息至broker,broker端返回锁住成功的队列集合。
consumer收到后,设置是否锁住标志位。
这里注意2个变量:
consumer端的RebalanceImpl里的ConcurrentHashMap processQueueTable,是否锁住设置在ProcessQueue里。
broker端的RebalanceLockManager里的ConcurrentHashMap> mqLockTable,这里维护着全局队列锁。,>

2.ConsumeMessageOrderlyService.ConsumeRequest的run方法是消费消息,这里还有个MessageQueueLock messageQueueLock,维护当前consumer端的本地队列锁。保证当前只有一个线程能够进行消费。

3.拉到消息存入ProcessQueue,然后判断,本地是否获得锁,全局队列是否被锁住,然后从ProcessQueue里取出消息,用MessageListenerOrderly进行消费。
拉到消息后调用ProcessQueue.putMessage(final List msgs) 存入,具体是存入TreeMapmsgTreeMap。
然后是调用ProcessQueue.takeMessags(final int batchSize)消费,具体是把msgTreeMap里消费过的消息,转移到TreeMap msgTreeMapTemp。,>,>

4.本地消费的事务控制,ConsumeOrderlyStatus.SUCCESS(提交),ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT(挂起一会再消费),在此之前还有一个变量ConsumeOrderlyContext context的setAutoCommit()是否自动提交。
当SUSPEND_CURRENT_QUEUE_A_MOMENT时,autoCommit设置为true或者false没有区别,本质跟消费相反,把消息从msgTreeMapTemp转移回msgTreeMap,等待下次消费。

当SUCCESS时,autoCommit设置为true时比设置为false多做了2个动作,consumeRequest.getProcessQueue().commit()和this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
ProcessQueue.commit() :本质是删除msgTreeMapTemp里的消息,msgTreeMapTemp里的消息在上面消费时从msgTreeMap转移过来的。
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset() :本质是把拉消息的偏移量更新到本地,然后定时更新到broker。

那么少了这2个动作会怎么样呢,随着消息的消费进行,msgTreeMapTemp里的消息堆积越来越多,消费消息的偏移量一直没有更新到broker导致consumer每次重新启动后都要从头开始重复消费。
就算更新了offset到broker,那么msgTreeMapTemp里的消息堆积呢?不知道这算不算bug。
所以,还是把autoCommit设置为true吧。

时间: 2024-10-31 06:20:19

RocketMQ学习(九):顺序消息的相关文章

《Adobe Premiere Pro CC完全剖析》——学习的顺序

学习的顺序 之前提到,使用这么多有用的软件听起来很吓人,特别是在Creative Cloud套件中有很多(甚至更多)你可能会用到的东西时.那么,你应该按什么顺序来学习这些软件呢?答案或许显而易见,或许又并非如此. 以下是关于你应按什么顺序学习新软件的两条规则. 基于相似性.复杂性来学习.选择一个与Adobe Premiere Pro相似的软件来轻松学习.那些有着明显不同界面的软件(如Adobe SpeedGrade)需要从零开始学起. 基于需要来学习.有时,你需要某个特性(如Foundry Ke

RocketMQ学习(八):事务消息

源代码版本是3.2.6,还是直接跑源代码.rocketmq事务消息是发生在Producer和Broker之间,是二阶段提交. 二阶段提交过程看图: 第一阶段是:步骤1,2,3. 第二阶段是:步骤4,5. 具体说明: 只有在消息发送成功,并且本地操作执行成功时,才发送提交事务消息,做事务提交. 其他的情况,例如消息发送失败,直接发送回滚消息,进行回滚,或者发送消息成功,但是执行本地操作失败,也是发送回滚消息,进行回滚. 事务消息原理实现过程: 一阶段: Producer向Broker发送1条类型为

RocketMQ学习(七):消息的生命周期下之消息的消费

源代码版本是3.2.6.接着上一篇消息的产生,这篇是消息的消费.Consumer选择DefaultMQPushConsumer为例. 1.DefaultMQPushConsumer.start()开始. 2.RebalanceService.run()方法定时调用RebalanceImpl.doRebalance()方法,该方法内部是遍历订阅的topic,执行rebalanceByTopic(topic). 3.调用RebalanceImpl.updateProcessQueueTableInR

RocketMQ学习(六):消息的生命周期上之消息的产生

源代码版本是3.2.6.消息的生命周期包括2部分,消息的产生和消息的消费,这篇先说下前者.消息的产生详细一点可以分为: a.消息产生后由Producer发送至Broker. b.Broker接收到消息做持久化. 调试代码得到这样的过程, 1.DefaultMQProducer.send()发出消息. 2.DefaultMQProducerImpl.sendDefaultImpl()发出消息. 3.DefaultMQProducerImpl.tryToFindTopicPublishInfo(),

RocketMQ学习(一):简介和QuickStart

RocketMQ是什么? 引用官方描述: RocketMQ是一款分布式.队列模型的消息中间件,具有以下特点: 支持严格的消息顺序 支持Topic与Queue两种模式 亿级消息堆积能力 比较友好的分布式特性 同时支持Push与Pull方式消费消息 历经多次天猫双十一海量消息考验 RocketMQ是纯java编写,基于通信框架Netty. 代码地址:https://github.com/alibaba/RocketMQ,目前分支是3.2.2 develop. 下载完代码后,将各个模块导入eclips

Kafka、RabbitMQ、RocketMQ消息中间件的对比—— 消息发送性能

引言 分布式系统中,我们广泛运用消息中间件进行系统间的数据交换,便于异步解耦.现在开源的消息中间件有很多,前段时间我们自家的产品 RocketMQ (MetaQ的内核) 也顺利开源,得到大家的关注. 那么,消息中间件性能究竟哪家强? 带着这个疑问,我们中间件测试组对常见的三类消息产品(Kafka.RabbitMQ.RocketMQ)做了性能比较.   Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache定级项目.Kafka主要特点是基于Pull的模式来处理消息消费,

RocketMQ学习(四):rocketmq-filtersrv介绍和filter原理

源代码版本是3.2.6,还是直接跑源代码,启动配置参照前面写的<简介和quickstart>,启动顺序是namesrv,broker,filtersrv,filter和broker有顺序要求,如果filtersrv启动后找不到broker,则会System.exit()退出程序. 看下启动图: 看rocketmq-filtersrv代码,核心processor包下的只有一个Class类且只处理2种类型的请求,即DefaultRequestProcessor.processRequest()只处

Kafka、RabbitMQ、RocketMQ 消息中间件的对比 | 消息发送性能篇

阿里云消息队列测试小组 出品 分布式系统中,我们广泛运用消息中间件进行系统间的数据交换,便于异步解耦.现在开源的消息中间件有很多,我们自家的产品 RocketMQ (阿里云消息队列(MQ)的内核) 也顺利开源,得到大家的关注. 那么,消息中间件性能究竟哪家强? 带着这个疑问,我们消息队列测试小组对常见的三类消息产品(Kafka.RabbitMQ.RocketMQ)做了性能比较. Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache定级项目.Kafka主要特点是基于P

Kafka、RabbitMQ、RocketMQ消息中间件的对比 —— 消息发送性能

引言 分布式系统中,我们广泛运用消息中间件进行系统间的数据交换,便于异步解耦.现在开源的消息中间件有很多,前段时间我们自家的产品 RocketMQ (MetaQ的内核) 也顺利开源,得到大家的关注. 那么,消息中间件性能究竟哪家强? 带着这个疑问,我们中间件测试组对常见的三类消息产品(Kafka.RabbitMQ.RocketMQ)做了性能比较. Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache定级项目.Kafka主要特点是基于Pull的模式来处理消息消费,追求