RocketMQ学习(五):Pull和Push

源代码版本是3.2.6。在rocketmq里,consumer被分为2类:MQPullConsumer和MQPushConsumer,其实本质都是拉模式(pull),即consumer轮询从broker拉取消息。

区别是:

push方式里,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。

pull方式里,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。

文字描述可能不是很清楚,前面的文章都是push方式的,所以这里只上pull方式的,贴代码:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29


<properties>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

<logback.version>1.0.13</logback.version>

<rocketmq.version>3.2.6</rocketmq.version>

</properties>

<dependencies>

<dependency>

<groupId>ch.qos.logback</groupId>

<artifactId>logback-classic</artifactId>

<version>1.0.13</version>

</dependency>

<dependency>

<groupId>ch.qos.logback</groupId>

<artifactId>logback-core</artifactId>

<version>1.0.13</version>

</dependency>

<dependency>

<groupId>com.alibaba.rocketmq</groupId>

<artifactId>rocketmq-client</artifactId>

<version>${rocketmq.version}</version>

</dependency>

<dependency>

<groupId>junit</groupId>

<artifactId>junit</artifactId>

<version>4.10</version>

<scope>test</scope>

</dependency>

</dependencies>

Producer:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37


package com.zoo.quickstart;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;

import com.alibaba.rocketmq.client.producer.SendResult;

import com.alibaba.rocketmq.common.message.Message;

/**

* Producer,发送消息

*

*/

public class Producer {

public static void main(String[] args) throws MQClientException, InterruptedException {

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

producer.setNamesrvAddr("192.168.0.104:9876");

producer.start();

for (int i = 0; i < 5; i++) {

try {

Message msg = new Message("TopicTest",// topic

"TagA",// tag

("Hello RocketMQ " + i).getBytes()// body

);

SendResult sendResult = producer.send(msg);

System.out.println(sendResult);

Thread.sleep(6000);

}

catch (Exception e) {

e.printStackTrace();

Thread.sleep(3000);

}

}

producer.shutdown();

}

}

Consumer:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71


package com.zoo.quickstart.pull;

import java.util.HashMap;

import java.util.Map;

import java.util.Set;

import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;

import com.alibaba.rocketmq.client.consumer.PullResult;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.common.message.MessageQueue;

/**

* PullConsumer,订阅消息

*/

public class PullConsumer {

private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();

public static void main(String[] args) throws MQClientException {

DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");

consumer.setNamesrvAddr("192.168.0.104:9876");

consumer.start();

Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");

for (MessageQueue mq : mqs) {

System.out.println("Consume from the queue: " + mq);

SINGLE_MQ: while (true) {

try {

PullResult pullResult =

consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);

System.out.println(pullResult);

putMessageQueueOffset(mq, pullResult.getNextBeginOffset());

switch (pullResult.getPullStatus()) {

case FOUND:

// TODO

break;

case NO_MATCHED_MSG:

break;

case NO_NEW_MSG:

break SINGLE_MQ;

case OFFSET_ILLEGAL:

break;

default:

break;

}

}

catch (Exception e) {

e.printStackTrace();

}

}

}

consumer.shutdown();

}

private static void putMessageQueueOffset(MessageQueue mq, long offset) {

offseTable.put(mq, offset);

}

private static long getMessageQueueOffset(MessageQueue mq) {

Long offset = offseTable.get(mq);

if (offset != null)

return offset;

return 0;

}

}

还有一种定时的Consumer:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58


package com.zoo.quickstart.pull;

import com.alibaba.rocketmq.client.consumer.MQPullConsumer;

import com.alibaba.rocketmq.client.consumer.MQPullConsumerScheduleService;

import com.alibaba.rocketmq.client.consumer.PullResult;

import com.alibaba.rocketmq.client.consumer.PullTaskCallback;

import com.alibaba.rocketmq.client.consumer.PullTaskContext;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.common.message.MessageQueue;

import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;

public class PullScheduleService {

public static void main(String[] args) throws MQClientException {

final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1");

scheduleService.getDefaultMQPullConsumer().setNamesrvAddr("192.168.0.104:9876");

scheduleService.setMessageModel(MessageModel.CLUSTERING);

scheduleService.registerPullTaskCallback("TopicTest", new PullTaskCallback() {

@Override

public void doPullTask(MessageQueue mq, PullTaskContext context) {

MQPullConsumer consumer = context.getPullConsumer();

try {

// 获取从哪里拉取

long offset = consumer.fetchConsumeOffset(mq, false);

if (offset < 0)

offset = 0;

PullResult pullResult = consumer.pull(mq, "*", offset, 32);

System.out.println(offset + "\t" + mq + "\t" + pullResult);

switch (pullResult.getPullStatus()) {

case FOUND:

break;

case NO_MATCHED_MSG:

break;

case NO_NEW_MSG:

case OFFSET_ILLEGAL:

break;

default:

break;

}

// 存储Offset,客户端每隔5s会定时刷新到Broker

consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());

// 设置再过100ms后重新拉取

context.setPullNextDelayTimeMillis(100);

}

catch (Exception e) {

e.printStackTrace();

}

}

});

scheduleService.start();

}

}


时间: 2024-12-10 11:34:18

RocketMQ学习(五):Pull和Push的相关文章

教你如何学习五笔

五笔不一定是最好的输入法,也不一定是最快的输入法,但我肯定,五笔有着拼音无可比拟的优势,不是难学这个借口就值得放弃的.你想试试每分钟80字的感觉吗?你想知道什么是真正的盲打吗?加入我们五笔吧. 本方法一年前已经在我朋友身上试了,现在把方法写出来大家分享,也好让五笔输入法发扬光大. 首先说明的三点: 第一.这不是什么奇法,所以你还是要努力,我只是改了人们学五笔传统的错误学习方法. 第二.如果你说两天三小时换来只是学会五笔,每分钟打不到10字,每个句子有一半要问人怎样打,这样太无聊的话你也不要看下去

艾伟:C#多线程学习(五) 多线程的自动管理(定时器)

本系列文章导航 C#多线程学习(一) 多线程的相关概念 C#多线程学习(二) 如何操纵一个线程 C#多线程学习(三) 生产者和消费者 C#多线程学习(四) 多线程的自动管理(线程池) C#多线程学习(五) 多线程的自动管理(定时器) C#多线程学习(六) 互斥对象 Timer类:设置一个定时器,定时执行用户指定的函数. 定时器启动后,系统将自动建立一个新的线程,执行用户指定的函数. 初始化一个Timer对象: Timer timer = new Timer(timerDelegate, s,10

RocketMQ学习(一):简介和QuickStart

RocketMQ是什么? 引用官方描述: RocketMQ是一款分布式.队列模型的消息中间件,具有以下特点: 支持严格的消息顺序 支持Topic与Queue两种模式 亿级消息堆积能力 比较友好的分布式特性 同时支持Push与Pull方式消费消息 历经多次天猫双十一海量消息考验 RocketMQ是纯java编写,基于通信框架Netty. 代码地址:https://github.com/alibaba/RocketMQ,目前分支是3.2.2 develop. 下载完代码后,将各个模块导入eclips

docker search, pull, login, push with Docker Hub - public registry

docker HUB是一个公共的image registry, 不注册账号的话, 可以从docker hub下载public image. docker search, pull操作不需要登录docker hub就可以对public image进行检索和下载. 如果需要将本地的image 推送到docker hub, 那么你需要注册一个docker hub的账号, 登录, 然后执行push即可. docker hub的免费用户只能保存public image. 也就是说大家都能搜到并下载你的im

Mysql 5.7 Gtid内部学习(五) mysql.gtid_executed表/gtid_executed变量/gtid_purged变量的更改时机

本节将集中讨论下面三种Gtid更新的时机,这部分相当重要,后面的故障案列会和这节有关.下面先来看一下他们的定义 mysql.gtid_executed表:Gtid持久化的介质,Mysql启动阶段会读取这个表来获取gtid_executed变量的值. gtid_executed变量(show global variables):Mysql数据库已经执行了哪些Gtid事务,处于内存中.show slave status中的Executed_Gtid_Set也取自这里. gtid_purged变量(s

RocketMQ学习(九):顺序消息

rocketmq的顺序消息需要满足2点: 1.Producer端保证发送消息有序,且发送到同一个队列. 2.consumer端保证消费同一个队列. 先看个例子,代码版本跟前面的一样. Producer类: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52

RocketMQ学习(八):事务消息

源代码版本是3.2.6,还是直接跑源代码.rocketmq事务消息是发生在Producer和Broker之间,是二阶段提交. 二阶段提交过程看图: 第一阶段是:步骤1,2,3. 第二阶段是:步骤4,5. 具体说明: 只有在消息发送成功,并且本地操作执行成功时,才发送提交事务消息,做事务提交. 其他的情况,例如消息发送失败,直接发送回滚消息,进行回滚,或者发送消息成功,但是执行本地操作失败,也是发送回滚消息,进行回滚. 事务消息原理实现过程: 一阶段: Producer向Broker发送1条类型为

RocketMQ学习(四):rocketmq-filtersrv介绍和filter原理

源代码版本是3.2.6,还是直接跑源代码,启动配置参照前面写的<简介和quickstart>,启动顺序是namesrv,broker,filtersrv,filter和broker有顺序要求,如果filtersrv启动后找不到broker,则会System.exit()退出程序. 看下启动图: 看rocketmq-filtersrv代码,核心processor包下的只有一个Class类且只处理2种类型的请求,即DefaultRequestProcessor.processRequest()只处

零基础入门深度学习(五):长短时记忆网络

在上一篇文章<零基础入门深度学习(4):循环神经网络>中,我们介绍了循环神经网络以及它的训练算法.我们也介绍了循环神经网络很难训练的原因,这导致了它在实际应用中,很难处理长距离的依赖.在本文中,我们将介绍一种改进之后的循环神经网络:长短时记忆网络(Long Short Term Memory Network, LSTM),它成功地解决了原始循环神经网络的缺陷,成为当前最流行的RNN,在语音识别.图片描述.自然语言处理等许多领域中成功应用.   但不幸的一面是,LSTM的结构很复杂,因此,我们需