RabbitMQ遵从的是AMQP协议,其broker端代码采用erlang编写,对于没有接触过erlang的同学(包括博主我)来说,想要了解其中的奥秘实在是不容易,大多只能从网上“搜刮”点散碎的知识点来充实一下。但是这样是不能究其然,更不能究其所以然。博主这里翻阅了amqp-client的java客户端的源码,通过其来学习下AMQP协议,进而更深刻的了解RabbitMQ.
注:如无特殊说明,本系列的文章采用的amqp-client版本均为3.5.3。
本系列的文章主要是来阐述客户端与broker交互需要经历那些具体步骤,需要涉及那些重要的类以及方法,整体的轮廓又是如何。
本文主要涉及的类有(本系列的blog地址):
[一]RabbitMQ-客户端源码之ConnectionFactory
[二]RabbitMQ-客户端源码之AMQConnection
[三]RabbitMQ-客户端源码之ChannelManager
[四]RabbitMQ-客户端源码之Frame
[五]RabbitMQ-客户端源码之AMQChannel
[六]RabbitMQ-客户端源码之AMQCommand
[七]RabbitMQ-客户端源码之AMQPImpl+Method
[八]RabbitMQ-客户端源码之ChannelN
[九]RabbitMQ-客户端源码之Consumer
以发送消息来看看从源码级的逻辑流转情况。
首先看看发送消息的业务代码(部分主要的代码):
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(ip);
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String message = "RabbitMQ Demo Test:" + System.currentTimeMillis();
channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
channel.close();
connection.close();
从上面来看,主要牵涉ConnectionFactory, Connection, Channel这个几个类(有关Connection和Channel的AMQP流转流程可以参考文中最后部分)。实际情况是怎么样的呢,我们来分析下。
首先流转过程如下:
ConnectionFactory.newConnection()
-- AMQConnection.start()
-- MainLoop
-- Frame frame = SocketFrameHandler.readFrame()
-- AMQChannel.handleFrame(Frame frame)
- ConnectionFactory主要用来配置一些参数,并初始化AMQConnection, 这个版本的客户端与broker底层通信用的是java的原生Socket, 处理模块为SocketFrameHandler,SocketFrameHandler也在ConnectionFactory调用newConnection()时创建。之后根据参数以及SocketFrameHandler初始化了AMQConnection对象。
- AMQConnection的核心在于这个start()方法。包括Protocol-Header, Connection.Start/.StartOk, Connection.Tune/.TuneOk, Connection.Open/.OpenOk,以及启动MainLoop线程
- MainLoop是AMQConnection的内部私有类,主要用来(循环)读取(SocketFrameHandler.readFrame)并封装Socket中的帧Frame, 并进行进一步的处理,这个由AMQChannel来完成
- ChannelManager,这个在上面并没有展示出来,但是这里也需要说明下,这个是在MainLoop中处理Frame所使用的,用来管理Channel的,确切的来说是ChannelN.
AMQChannel.handleFrame(Frame frame)
-- AMQCommand.handleFrame(Frame frame)
-- AMQChannel.handleCompleteInboundCommand(AMQCommand command)
-- ChannelN.processAsync(AMQCommand command)
这个接着上面的流程继续:
AMQChannel.handleFrame(Frame frame)首先调用AMQCommand的handleFrame(Frame frame)方法来处理,AMQCommand内部其实是调用了CommandAssember(对Method, Content-Header以及Content-Body做了一下封装,其实忽略这个类也是可以的)的handleFrame, 说白了作用就是处理下Frame帧,方法返回值是boolean, 当一个AMQComand处理完毕后返回true,否则返回false。这里就有疑问了,什么叫做处理完毕?这里就又要说到MainLoop了,MainLoop线程主要循环读取Frame帧,像Connection.Start/.StartOk这种命令(AMQCommand)一般只包括Method类型的帧(Frame),AMQCommand的handleFrame方法直接返回true,但是像Basic.Publish这种命令一般包括Method帧,Content-Header帧,以及若干Content-Body帧,就需要handleFrame多次才能返回true。
AMQP技术术语
Method: 用于在节点之间传递特定类型的AMQP命令帧。
Content: 服务器和应用程序之间传送的数据.这个术语是“message”的同义词。
Content header:描述内容属性特定类型帧。
Content body: 包含原始应用程序数据的特定类型帧.内容体帧完全不透明-服务器不以任何方式检查或修改其body内容。
public void handleFrame(Frame frame) throws IOException {
AMQCommand command = _command;
if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line
_command = new AMQCommand(); // prepare for the next one
handleCompleteInboundCommand(command);
}
}
上面这个是AMQChannel中的handleFrame方法,当内部的AMQCommand的handleFrame方法返回true,即表示处理完毕一条AMQCommand之后再调用handleCompleteInboundCommand方法进行进一步处理。而这个handleCompleteInboundCommand方法的精髓在于processAsync方法,这个processAsync方法在AMQChannel中是一个抽象方法,真正的实现要看ChannelN这个类。
说到这里有一个点我没有提及,但是这个不影响主流程的阐述,这个点就是rpc的概念,具体的可以详细参考对AMQChannel类的介绍——[五]RabbitMQ-客户端源码之AMQChannel。
文中开篇的demo示例,采用wireshark工具抓包可得:
这里用来参考,以便更好的阐述Connection类和Channel类。
Connection类
AMQP是一个连接协议. 连接设计为长期的,且可运载多个通道. 连接生命周期是这样的:
- client打开与服务器的TCP/IP连接并发送一个协议头(protocol header).这只是client发送的数据,而不是作为方法格式的数据.
- server使用其协议版本和其它属性,包括它支持安全机制列表(Start方法)进行响应.
- client选择一种安全机制(Start-Ok).
- server开始认证过程, 它使用SASL的质询-响应模型(challenge-response model). 它向客户端发送一个质询(Secure).
- client向server发送一个认证响应(Secure-Ok). 例如,对于使用”plain”机制,响应会包含登录用户名和密码.
(server 重复质询(Secure) 或转到协商,发送一系列参数,如最大帧大小(Tune).) - client接受或降低这些参数(Tune-Ok).
- client 正式打开连接并选择一个虚拟主机(Open).
- 服务器确认虚拟主机是一个有效的选择 (Open-Ok).
- 客户端现在使用希望的连接.
- 一个节点(client 或 server) 结束连接(Close).
- 另一个节点对连接结束握手(Close-Ok).
- server 和 client关闭它们的套接字连接.
没有为不完全打开的连接上的错误进行握手. 根据成功协议头协商(后面有详细定义),在发送或收到Open 或Open-Ok之前,如果一个节点检测到错误,这个节点必须关闭socket,而不需要发送任何进一步的数据。
Channel类
AMQP是一个多通道协议. 通道提供了一种方式来将一个重量级TCP/IP连接分成多个轻量级连接。这使得协议对于防火墙更加友好,因为端口使用是可预测的. 这也意味着传输调整和网络服务质量可以得到更好的利用。通道是独立的,它们可以同时执行不同的功能,可用带宽会在当前活动之间共享。这是令人期待的,我们鼓励多线程客户端应用程序经常使用”每个通道一个线程”编程模型.。然而,从单个client打开一个或多个AMQP servers连接也是完全可以接受的.。
通道生命周期如下:
- client打开一个新通道(Open).
- server确认新通道准备就绪(Open-Ok).
- client和server按预期来使用通道.
- 一个节点(client或server) 关闭了通道(Close).
- 另一个节点对通道关闭进行握手(Close-Ok).
附:本系列全集
- [Conclusion]RabbitMQ-客户端源码之总结
- [一]RabbitMQ-客户端源码之ConnectionFactory
- [二]RabbitMQ-客户端源码之AMQConnection
- [三]RabbitMQ-客户端源码之ChannelManager
- [四]RabbitMQ-客户端源码之Frame
- [五]RabbitMQ-客户端源码之AMQChannel
- [六]RabbitMQ-客户端源码之AMQCommand
- [七]RabbitMQ-客户端源码之AMQPImpl+Method
- [八]RabbitMQ-客户端源码之ChannelN
- [九]RabbitMQ-客户端源码之Consumer