RocketMQ学习(六):消息的生命周期上之消息的产生

源代码版本是3.2.6。消息的生命周期包括2部分,消息的产生和消息的消费,这篇先说下前者。消息的产生详细一点可以分为:

a.消息产生后由Producer发送至Broker。

b.Broker接收到消息做持久化。

调试代码得到这样的过程,

1.DefaultMQProducer.send()发出消息。

2.DefaultMQProducerImpl.sendDefaultImpl()发出消息。

3.DefaultMQProducerImpl.tryToFindTopicPublishInfo(),即向Namesrv发出GET_ROUTEINTO_BY_TOPIC的请求,来更新
MQProducerInner的topicPublishInfoTable和MQConsumerInner的topicSubscribeInfoTable。

4.调用topicPublishInfo.selectOneMessageQueue(),从发布的topic中轮询取出一个MessageQueue。默认一个topic对应4个MessageQueue。

5.调用mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()),获取brokerAddr(broker的地址)。

6.调用this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,// 1
mq.getBrokerName(),// 2
msg,// 3
requestHeader,// 4
timeout,// 5
communicationMode,// 6
sendCallback// 7
)发送。

7.调用MQClientAPIIImpl.sendMessageSync(addr, brokerName, msg, timeoutMillis, request)发送。

8.调用NettyRemotingClient.invokeSyncImpl()发送。

######到此Producer端发消息结束######

———我是分割线———-

######接着Request走到Broker######

9.SendMessageProcessor.processRequest(),接收到消息,封装requestHeader成broker内部的消息MessageExtBrokerInner,然后DefaultMessageStore.putMessage(msgInner),调用CommitLog.putMessage(msg)。

10.调用MapedFileQueue.getLastMapedFile()获取将要写入消息的文件,mapedFile.appendMessage(msg,this.appendMessageCallback)写入消息。

11.AppendMessageCallback.doAppend(fileFromOffset, byteBuffer,maxBlank,Object msg),用回调方法存储msg。

12.MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(),wroteOffset),用存储消息的节点ip和端口,加上准备写的偏移量(就是在前面获取的文件中)生成msgId。

13.以(topic-queueId)为key从topicQueueTable取queueOffset,queueOffset如果为null则设为0,存入topicQueueTable。

14.调用MessageSysFlag.getTransactionValue(msgInner.getSysFlag())获取tranType来判断该消息是否是事务消息,如果是TransactionPreparedType或者TransactionRollbackType,则queueOffset=0,这2种类型的消息是不会被消费的。见16,17。

15.调用byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen)写入文件。

16.构造DispatchRequest,然后DispatchMessageService.putRequest(dispatchRequest),异步DispatchMessageService.doDispatch(),分发消息位置信息到ConsumeQueue。如果是TransactionPreparedType或者TransactionRollbackType,则不处理,如果是TransactionNotType或者TransactionCommitType,则调用DefaultMessageStore.this.putMessagePostionInfo()。

17.调用ConsumeQueue.putMessagePostionInfo(),20个字节大小的buffer在内存里,offset即消息对应的在CommitLog的offset,size即消息在CommitLog存储中的大小,tagsCode即计算出来的长整数,写入buffer,this.mapedFileQueue.getLastMapedFile(expectLogicOffset)获取mapedFile,最后mapedFile.appendMessage(this.byteBufferIndex.array())写入文件,作逻辑队列持久化。

说明:当Broker接收到从Consumer发来的拉取消息的请求时,根据请求的Topic和queueId获取对应的ConsumerQueue,由于消息的类型是预备消息或者回滚消息时,不作持久化(即没有把消息体本身存储在CommitLog中的offset保存到ConsumerQueue中),那么自然也找不到该消息的逻辑存储单元(也就是前面的20个字节,根据这20个字节可以在CommitLog中定位到一条消息),最终Consumer也取不到该消息。

打个比喻,CommitLog是书的正文,消息体存在于CommitLog中,相当于是书正文中的一个章节,那么ConsumerQueue就是书的目录,记录着章节和页数的对应关系,如果是预备类型或者回滚类型的章节,目录中没有记录,即使在书的正文中存在,但是我们查找章节时都是通过目录来查找的,目录里没有,就找不到该章节。

18.DefaultMessageStore.this.indexService.putRequest(this.requestsRead.toArray()),新建索引。

时间: 2024-11-28 16:31:54

RocketMQ学习(六):消息的生命周期上之消息的产生的相关文章

RocketMQ学习(七):消息的生命周期下之消息的消费

源代码版本是3.2.6.接着上一篇消息的产生,这篇是消息的消费.Consumer选择DefaultMQPushConsumer为例. 1.DefaultMQPushConsumer.start()开始. 2.RebalanceService.run()方法定时调用RebalanceImpl.doRebalance()方法,该方法内部是遍历订阅的topic,执行rebalanceByTopic(topic). 3.调用RebalanceImpl.updateProcessQueueTableInR

MAVEN学习笔记之Maven生命周期和插件简介(3)

MAVEN学习笔记之Maven生命周期和插件简介(3) clean compile site三套生命周期相互独立. clean pre-clean 执行清理前的工作 clean 清理上一次构建生成的所有文件 post-clean 执行清理后的工作 compile validate generate-sources process-sources generate-resources process-sources process-resources 复制并处理资源文件,至目标目录,准备打包 co

java线程学习2——线程的生命周期

    该图摘自<疯狂的java讲义>,该书讲述的线程很详细.   其中新建状态和死亡状态的线程的isAlive属性均返回false,值得注意的是对死亡的线程调用start方法或 对新建的线程调用两次start 方法均会抛出illegalThreadStateException异常.  

Android四大组件之——Activity的生命周期(图文详解)

      转载请在文章开头处注明本博客网址:http://www.cnblogs.com/JohnTsai       联系方式:JohnTsai.Work@gmail.com       [Android四大组件学习系列Activity篇]       1.Android四大组件之--Activity(一)定义.状态和后退栈(图文详解)       2.Android四大组件之--Activity的生命周期(图文详解)      上一篇文章讲了Activity的定义.状态和后退栈,现在讲讲A

maven生命周期与插件

生命周期 在有关Maven的日常使用中,命令行的输入往往就对应了生命周期,如mvn package就表示执行默认生命周期阶段package.  生命周期概念 Maven的生命周期就是为了对所有的构建过程进行抽象和统一,这个生命周期包含了项目的清理.初始化.编译.测试.打包.集成测试. 验证.部署和站点生成等几乎所有构建步骤.即几乎所有项目的构建,都能映射到这样一个生命周期上.  Maven的生命周期是抽象的,这意味着生命周期本身不做任何实际的工作,在Maven的设计中,实际的任务(如编译源代码)

JavaWeb学习之Servlet(二)----Servlet的生命周期、继承结构、修改Servlet模板

一.http协议回顾: 在上一篇文章中:JavaWeb学习之Servlet(一)----MyEclipse及Tomcat的配置,我们通过在浏览器输入url,就能看到在MyEclipse中编写的Servlet资源,效果如下: 上图中,整个过程是这样的:浏览器中输入url后,会通过hosts文件/dns服务器解析为IP地址,进而找到对应ip地址的服务器. 在这期间,浏览器会通过http协议发出请求.服务器端收到请求后,做了下面这些事: (1)分析出当前请求的是哪台虚拟主机: 查看Host请求头分析出

秋色园QBlog技术原理解析:Module之页面基类-生命周期流程(六)

文章回顾: 1: 秋色园QBlog技术原理解析:开篇:整体认识(一) --介绍整体文件夹和文件的作用 2: 秋色园QBlog技术原理解析:认识整站处理流程(二) --介绍秋色园业务处理流程 3: 秋色园QBlog技术原理解析:UrlRewrite之无后缀URL原理(三) --介绍如何实现无后缀URL 4:  秋色园QBlog技术原理解析:UrlRewrite之URL重定向体系(四) --介绍URL如何定位到处理程序 5:  秋色园QBlog技术原理解析:Module之页面基类设计(五) --介绍

JAVA CDI 学习(2) - Scope 生命周期

在上一节中,我们已经知道了如何用@Inject实现基本注入,这一节研究Bean实例注入后的"生命周期",web application中有几种基本的生命周期(不管哪种编程语言都类似) 1.Application 生命周期 即:web application启动后,处于该生命周期级别的对象/变量,将一直存在,可以被所有web应用的用户共同访问,通常用来做网站计数器,实现流量访问之类.直到web 应用停止或重新启动,该对象才被销毁.简单来说:只要web application处于激活状态,

国内大批社交软件生命周期大概也就五至六年

记者调查:社交软件平均生命只有五六年 对许多手机老用户而言,当年飞信的地位绝不亚于如今的微信.因此,日前一则关于飞信将于6月30日下线停止服务的传言,一经发出就撩动了不少人的怀旧神经.面对媒体求证,重庆移动称,该传言系误读,只是关闭了其中某项功能,目前飞信业务运营正常,还于18日进行了版本更新. 随后,重庆晚报记者调查发现,国内大批社交软件从诞生到消失,其生命周期大概也就五六年. 飞信曾靠免费大杀四方 八九年前,手机短信还在很多人的话费支出中占比不低,于是当中国移动在2007年推出这项综合通信服