RocketMQ学习(一):简介和QuickStart

RocketMQ是什么?

引用官方描述:
RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点:

支持严格的消息顺序
支持Topic与Queue两种模式
亿级消息堆积能力
比较友好的分布式特性
同时支持Push与Pull方式消费消息
历经多次天猫双十一海量消息考验

RocketMQ是纯java编写,基于通信框架Netty。

代码地址:https://github.com/alibaba/RocketMQ,目前分支是3.2.2 develop。

下载完代码后,将各个模块导入eclipse,本地尝试启动看看。

1.启动nameServer,运行rocketmq-namesrv的NamesrvStartup,运行之前需设置环境变量ROCKETMQ_HOME为RocketMQ项目的根目录,这样有一个作用是,指向logback的配置文件路径,保证在nameServer启动时,logback的正常初始化。我本机设置的是:ROCKETMQ_HOME=C:\Users\Administrator\git\RocketMQ。
The Name Server boot success. 表示启动成功。

2.启动brokerServer,运行rocketmq-broker的BrokerStartup,同样,运行之前需设置环境变量ROCKETMQ_HOME,然后启动参数需要带上【-n “192.168.0.109:9876″】,我本机的ip是192.168.0.109。如果不带-n的参数,那么broker会去访问http://jmenv.tbsite.net:8080/rocketmq/nsaddr获取nameServer的地址,这个地址不是我们自己的nameServer。
The broker[LENOVO-PC, 192.168.0.109:10911] boot success. and name server is 192.168.0.109:9876表示成功。

3.这个非必选项,不运行也可以。还可以启动rocketmq-srvutil的FiltersrvStartup,这是Consumer使用Java代码,在服务器做消息过滤。启动方式和broker一样,具体的过滤原理以后再详细的说。

到此就可以运行demo了。

pom.xml依赖:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23


<dependencies>

<dependency>

<groupId>ch.qos.logback</groupId>

<artifactId>logback-classic</artifactId>

<version>1.0.13</version>

</dependency>

<dependency>

<groupId>ch.qos.logback</groupId>

<artifactId>logback-core</artifactId>

<version>1.0.13</version>

</dependency>

<dependency>

<groupId>com.alibaba.rocketmq</groupId>

<artifactId>rocketmq-client</artifactId>

<version>3.2.2</version>

</dependency>

<dependency>

<groupId>junit</groupId>

<artifactId>junit</artifactId>

<version>4.10</version>

<scope>test</scope>

</dependency>

</dependencies>

如果依赖包下载不下来,再给个仓库地址,开源中国的:


1

2

3

4

5

6

7

8

9

10

11

12

13


<repositories>

<repository>

<id>nexus</id>

<name>Nexus</name>

<url>http://maven.oschina.net/content/groups/public/</url>

<releases>

<enabled>true</enabled>

</releases>

<snapshots>

<enabled>true</enabled>

</snapshots>

</repository>

</repositories>

贴代码:
Producer


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37


package com.zoo.quickstart;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;

import com.alibaba.rocketmq.client.producer.SendResult;

import com.alibaba.rocketmq.common.message.Message;

/**

* Producer,发送消息

*

*/

public class Producer {

public static void main(String[] args) throws MQClientException, InterruptedException {

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

producer.setNamesrvAddr("192.168.0.109:9876");

producer.start();

for (int i = 0; i < 1000; i++) {

try {

Message msg = new Message("TopicTest",// topic

"TagA",// tag

("Hello RocketMQ " + i).getBytes()// body

);

SendResult sendResult = producer.send(msg);

System.out.println(sendResult);

Thread.sleep(3000);

}

catch (Exception e) {

e.printStackTrace();

Thread.sleep(3000);

}

}

producer.shutdown();

}

}

Consumer


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45


package com.zoo.quickstart;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

import com.alibaba.rocketmq.common.message.MessageExt;

/**

* Consumer,订阅消息

*/

public class Consumer {

public static void main(String[] args) throws InterruptedException, MQClientException {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

consumer.setNamesrvAddr("192.168.0.109:9876");

/**

* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>

* 如果非第一次启动,那么按照上次消费的位置继续消费

*/

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTest", "*");

consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,

ConsumeConcurrentlyContext context) {

System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);

System.out.println(" Receive Message Size: " + msgs.size());

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});

consumer.start();

System.out.println("Consumer Started.");

}

}

因为demo代码来自于rocketmq-example,所以没有上传Github。

ps:以前rocketmq在Github开源的时候没有学习,后来突然有一天发现Github上404了,心里后悔莫急,这次rocketmq重新开源出来,一定不能错过了。

时间: 2024-10-26 07:29:06

RocketMQ学习(一):简介和QuickStart的相关文章

RocketMQ学习(四):rocketmq-filtersrv介绍和filter原理

源代码版本是3.2.6,还是直接跑源代码,启动配置参照前面写的<简介和quickstart>,启动顺序是namesrv,broker,filtersrv,filter和broker有顺序要求,如果filtersrv启动后找不到broker,则会System.exit()退出程序. 看下启动图: 看rocketmq-filtersrv代码,核心processor包下的只有一个Class类且只处理2种类型的请求,即DefaultRequestProcessor.processRequest()只处

JavaWeb学习----JSP简介及入门(含Eclipse for Java EE及Tomcat的配置)

[前言] JSP本身是JavaWeb中的知识,但是在学习Android网络时,必然要涉及到与服务器之间的交互,所以学一下JSP以及其他JavaWeb的内容还是很有必要的,至少能明白程序在访问服务器时,整个过程的原理. 其实,在学习Android之前,Java和JavaWeb的知识都是要先学习的.本人是在2014年7月正式开始Android方向的研究学习,在这之前没有接触任何和计算机软件相关的知识(唯一相关的是,本科学过一门C语言课程,不过现在已经忘光了). 我们来看下面的这张图就知道了: 两张图

RocketMQ学习(八):事务消息

源代码版本是3.2.6,还是直接跑源代码.rocketmq事务消息是发生在Producer和Broker之间,是二阶段提交. 二阶段提交过程看图: 第一阶段是:步骤1,2,3. 第二阶段是:步骤4,5. 具体说明: 只有在消息发送成功,并且本地操作执行成功时,才发送提交事务消息,做事务提交. 其他的情况,例如消息发送失败,直接发送回滚消息,进行回滚,或者发送消息成功,但是执行本地操作失败,也是发送回滚消息,进行回滚. 事务消息原理实现过程: 一阶段: Producer向Broker发送1条类型为

RocketMQ学习(五):Pull和Push

源代码版本是3.2.6.在rocketmq里,consumer被分为2类:MQPullConsumer和MQPushConsumer,其实本质都是拉模式(pull),即consumer轮询从broker拉取消息. 区别是: push方式里,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的. pull方式里,取消息的过程需要用户自己写,首先通过打算

RocketMQ学习(九):顺序消息

rocketmq的顺序消息需要满足2点: 1.Producer端保证发送消息有序,且发送到同一个队列. 2.consumer端保证消费同一个队列. 先看个例子,代码版本跟前面的一样. Producer类: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52

[EXtJS5学习笔记]第一节 Sencha Cmd 学习笔记 简介 Sencha Cmd是什么

本文作者:sushengmiyan -------------------------------------------------------------资源链接----------------------------------------------------------------------- 翻译来源  Sencha Cmd官方网站:    http://www.sencha.com/products/extjs/up-and-running/cmd-introduction J

RocketMQ学习(二):依赖关系和模块功能介绍

现在看的代码版本还是3.2.2 develop.先看张内部结构代码图: 从依赖层次再来看,越是被依赖的,越在底层: rocketmq包含9个子模块: rocketmq-common:通用的常量枚举.基类方法或者数据结构,按描述的目标来分包通俗易懂.包名有:admin,consumer,filter,hook,message等. rocketmq-remoting:用Netty4写的客户端和服务端,fastjson做的序列化,自定义二进制协议. rocketmq-srvutil:只有一个Serve

深度学习---tensorflow简介

什么是深度学习? 在机器学习流行之前,都是基于规则的系统,因此做语音的需要了解语音学,做NLP的需要很多语言学知识,做深蓝需要很多国际象棋大师. 而到后来统计方法成为主流之后,领域知识就不再那么重要,但是我们还是需要一些领域知识或者经验来提取合适的feature(特征),feature的好坏往往决定了机器学习算法的成败.对于NLP来说,feature还相对比较好提取,因为语言本身就是高度的抽象:而对于Speech或者Image来说,我们人类自己也很难描述我们是怎么提取feature的.比如我们识

RocketMQ学习(七):消息的生命周期下之消息的消费

源代码版本是3.2.6.接着上一篇消息的产生,这篇是消息的消费.Consumer选择DefaultMQPushConsumer为例. 1.DefaultMQPushConsumer.start()开始. 2.RebalanceService.run()方法定时调用RebalanceImpl.doRebalance()方法,该方法内部是遍历订阅的topic,执行rebalanceByTopic(topic). 3.调用RebalanceImpl.updateProcessQueueTableInR