[六]RabbitMQ-客户端源码之AMQCommand

AMQCommand是用来处理AMQ命令的,其包含了Method, Content Heaeder和Content Body.
下面是通过wireshark抓包的AMQP协议

上图中的Basic.Publish命令就包含Method, Content header以及Content body。

AMQCommand不是直接包含Method等成员变量的,而是通过CommandAssembler又做了一次封装。
接下来先看下CommandAssembler类。此类中有这些成员变量:

/** Current state, used to decide how to handle each incoming frame. */
private enum CAState {
    EXPECTING_METHOD, EXPECTING_CONTENT_HEADER, EXPECTING_CONTENT_BODY, COMPLETE
}
private CAState state;

/** The method for this command */
private Method method;

/** The content header for this command */
private AMQContentHeader contentHeader;

/** The fragments of this command's content body - a list of byte[] */
private final List<byte[]> bodyN;
/** sum of the lengths of all fragments */
private int bodyLength;

/** No bytes of content body not yet accumulated */
private long remainingBodyBytes;
  • CAState state标识这此Command目前的状态,是准备处理Method(EXPECTING_METHOD),还是处理Content header(EXPECTING_CONTENT_HEADER),还是准备处理Content body(EXPECTING_CONTENT_BODY),还是以及完成了(COMPLETE)。
  • Method method代表type=Method的AMQP帧
  • AMQContentHeader contentHeader代表type=Content header的AMQP帧
  • final List<byte[]> bodyN代表type=Content body的AMQP帧,就是真正的消息体(Message body)。
  • bodyLength就是消息体大小

这个类中除了构造函数,getMethod, getContentHeader, getContentBody,isComplete这个几个方法,最关键的方法就是:

public synchronized boolean handleFrame(Frame f) throws IOException
{
    switch (this.state) {
      case EXPECTING_METHOD:          consumeMethodFrame(f); break;
      case EXPECTING_CONTENT_HEADER:  consumeHeaderFrame(f); break;
      case EXPECTING_CONTENT_BODY:    consumeBodyFrame(f);   break;

      default:
          throw new AssertionError("Bad Command State " + this.state);
    }
    return isComplete();
}

这个方法主要是处理AQMP帧的,根据CAState state来处理相应状态类型的帧,然后赋值给相应的成员变量。
采用consumeMethodFrame(Frame f)方法举个例子:

private void consumeMethodFrame(Frame f) throws IOException {
    if (f.type == AMQP.FRAME_METHOD) {
        this.method = AMQImpl.readMethodFrom(f.getInputStream());
        this.state = this.method.hasContent() ? CAState.EXPECTING_CONTENT_HEADER : CAState.COMPLETE;
    } else {
        throw new UnexpectedFrameError(f, AMQP.FRAME_METHOD);
    }
}

这个方法首先判断当前帧是否是Method帧(AMQP.FRAME_METHOD),然后调用AMQPImp.readMethodFrom的方法。就那Connection.Start这个真来将,它会从socket的输入流中读取:

public Start(MethodArgumentReader rdr) throws IOException {
    this(rdr.readOctet(), rdr.readOctet(), rdr.readTable(), rdr.readLongstr(), rdr.readLongstr());
}

对应于下图:

  • 第一个rdr.readOctet()是指Version-Magor:0
  • 第二个rdr.readOctet()是指Version-Minor:9
  • 第三个rdr.readTable()是指Server-Properties
  • 第四个rdr.readLongstr()是指Mechanisms
  • 第五个rdr.readLongstr()是指Locales

而MethodArgumentReader.readOctet()就是:

public final int readOctet()
    throws IOException
{
    clearBits();
    return in.readOctet();//in对象是DataInputStream对象
}


写到这里,思路再跳回来,知道了底层其实是Socket的DataInputStream,其上只是做了封装再封装
CommandAssembler 中的handleFrame这个方法只在AMQCommand中的:

private final CommandAssembler assembler;
public boolean handleFrame(Frame f) throws IOException {
    return this.assembler.handleFrame(f);
}

只在这个方法中调用。CommandAssembler只是对Method,Content-Header,Content-Body做了一下封装。下面继续回到AMQCommand这个类中来。
仔细阅读源码的同学会发现在handleFrame方法当遇到类似Basic.Publish时会有Method,Content-Header,Content-Body一起的报文,那么handleFrame处理完Method之后就直接返回了,没有完全处理完,这该如何是好?
这个就又要联系到AMQConnection中的MainLoop的内部类了。此类中的关键代码如下:

while (_running) {
    Frame frame = _frameHandler.readFrame();

    if (frame != null) {
        _missedHeartbeats = 0;
        if (frame.type == AMQP.FRAME_HEARTBEAT) {
            // Ignore it: we've already just reset the heartbeat counter.
        } else {
            if (frame.channel == 0) { // the special channel
                _channel0.handleFrame(frame);
            } else {
                if (isOpen()) {
                    // If we're still _running, but not isOpen(), then we
                    // must be quiescing, which means any inbound frames
                    // for non-zero channels (and any inbound commands on
                    // channel zero that aren't Connection.CloseOk) must
                    // be discarded.
                    ChannelManager cm = _channelManager;
                    if (cm != null) {
                        cm.getChannel(frame.channel).handleFrame(frame);
                    }
                }
            }
        }
    } else {
        // Socket timeout waiting for a frame.
        // Maybe missed heartbeat.
        handleSocketTimeout();
    }
}

可以看到这是一个一直轮询读取Frame并处理Frame的过程。在遇到类似Basic.Publish这种带Method, Content-Header, Content-Body的类型的报文时,会循环处理,直到处理完成。注意这里的Method, Content-Header以及Content-Body都是看成单个Frame的,也就是这个while循环要三次,而不是将Basic.Publish看成一个帧。
上面调用的handleFrame方法是AMQChannel类中的(详细可以参考([五]RabbitMQ-客户端源码之AMQChannel)):

public void handleFrame(Frame frame) throws IOException {
    AMQCommand command = _command;
    if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line
        _command = new AMQCommand(); // prepare for the next one
        handleCompleteInboundCommand(command);
    }
}

可以看到只有当AMQCommand的handleFrame方法返回true时,即执行完成之后才会继续处理。



AMQCommand也有getMethod, getContentHeader, getContentBody等方法,这些都是间接调用CommandAssembler类中相应的方法的。
AMQCommand中也有个特别重要的方法:

/**
 * Sends this command down the named channel on the channel's
 * connection, possibly in multiple frames.
 * @param channel the channel on which to transmit the command
 * @throws IOException if an error is encountered
 */
public void transmit(AMQChannel channel) throws IOException {
    int channelNumber = channel.getChannelNumber();
    AMQConnection connection = channel.getConnection();

    synchronized (assembler) {
        Method m = this.assembler.getMethod();
        connection.writeFrame(m.toFrame(channelNumber));
        if (m.hasContent()) {
            byte[] body = this.assembler.getContentBody();

            connection.writeFrame(this.assembler.getContentHeader()
                    .toFrame(channelNumber, body.length));

            int frameMax = connection.getFrameMax();
            int bodyPayloadMax = (frameMax == 0) ? body.length : frameMax
                    - EMPTY_FRAME_SIZE;

            for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
                int remaining = body.length - offset;

                int fragmentLength = (remaining < bodyPayloadMax) ? remaining
                        : bodyPayloadMax;
                Frame frame = Frame.fromBodyFragment(channelNumber, body,
                        offset, fragmentLength);
                connection.writeFrame(frame);
            }
        }
    }

    connection.flush();
}

这段主要通过传输AMQP帧的,通过AMQChannel获取到通信链路connection,然后将AMQCommand对象自身的method成员变量(或者包括content-header以及content-body)传送给broker。这段方法里还有判断payload大小是否超过broker端所设置的最大帧大小frameMax,即(frameMax == 0) ? body.length : frameMax - EMPTY_FRAME_SIZE这段代码。当frameMax=0时则没有大小限制,当frameMax不为0时则按照payload拆分成若干的payload然后发送多个FRAME_BODY帧。


附:本系列全集

  1. [Conclusion]RabbitMQ-客户端源码之总结
  2. [一]RabbitMQ-客户端源码之ConnectionFactory
  3. [二]RabbitMQ-客户端源码之AMQConnection
  4. [三]RabbitMQ-客户端源码之ChannelManager
  5. [四]RabbitMQ-客户端源码之Frame
  6. [五]RabbitMQ-客户端源码之AMQChannel
  7. [六]RabbitMQ-客户端源码之AMQCommand
  8. [七]RabbitMQ-客户端源码之AMQPImpl+Method
  9. [八]RabbitMQ-客户端源码之ChannelN
  10. [九]RabbitMQ-客户端源码之Consumer
时间: 2024-09-13 16:44:22

[六]RabbitMQ-客户端源码之AMQCommand的相关文章

memcached客户端源码分析

转载:memcached客户端源码分析 memcached的Java客户端有好几种,http://code.google.com/p/memcached/wiki/Clients 罗列了以下几种 Html代码   spymemcached          * http://www.couchbase.org/code/couchbase/java             o An improved Java API maintained by Matt Ingenthron and other

急求百度手环手机客户端源码

问题描述 急求百度手环手机客户端源码 50C dulife手环的安卓手机客户端的源码或者手机端测试的源码百度手环Android手机客户端源码 解决方案 这种源码怎么可能给你,都是商业机密.去研究研究原理.技术还是正途.

开发一个Linux调试器(六):源码级逐步执行

在前几篇博文中我们学习了 DWARF 信息以及它如何使我们将机器码和上层源码联系起来.这一次我们通过为我们的调试器添加源码级逐步调试将该知识应用于实际. 系列文章索引 随着后面文章的发布,这些链接会逐渐生效. 准备环境 断点 寄存器和内存 Elves 和 dwarves 源码和信号 源码级逐步执行 源码级断点 调用栈展开 读取变量 下一步 揭秘指令级逐步执行 我们正在超越了自我.首先让我们通过用户接口揭秘指令级单步执行.我决定将它切分为能被其它部分代码利用的 single_step_instru

求java开发的FTP服务器与客户端源码,必有重谢!

问题描述 要求界面和谐,有权限控制,各位大大手头上有的请帮个忙吧!

android-dulife安卓手机客户端的源码,急求

问题描述 dulife安卓手机客户端的源码,急求 dulife手环的安卓手机客户端的源码,或者手机端测试的源码,百度手环Android手机客户端源码

[一]RabbitMQ-客户端源码之ConnectionFactory

首先看一段amqp-client发送端的示例代码(展示出主要部分): ConnectionFactory factory = new ConnectionFactory(); factory.setHost(ip); factory.setPort(5672); factory.setUsername("root"); factory.setPassword("root"); Connection connection = factory.newConnection

[Conclusion]RabbitMQ-客户端源码之总结

RabbitMQ遵从的是AMQP协议,其broker端代码采用erlang编写,对于没有接触过erlang的同学(包括博主我)来说,想要了解其中的奥秘实在是不容易,大多只能从网上"搜刮"点散碎的知识点来充实一下.但是这样是不能究其然,更不能究其所以然.博主这里翻阅了amqp-client的java客户端的源码,通过其来学习下AMQP协议,进而更深刻的了解RabbitMQ. 注:如无特殊说明,本系列的文章采用的amqp-client版本均为3.5.3. 本系列的文章主要是来阐述客户端与b

[二]RabbitMQ-客户端源码之AMQConnection

上一篇文章([一]RabbitMQ-客户端源码之ConnectionFactory)中阐述了conn.start()方法完成之后客户端就已经和broker建立了正常的连接,而这个Connection的关键就在于这个start()方法之内,下面我们来慢慢分析. 首先来看看start()方法的源码,这个方法有点长,这里拆开来一一分析,首先是注释: /** * Start up the connection, including the MainLoop thread. * Sends the pro

[七]RabbitMQ-客户端源码之AMQPImpl+Method

AMQPImpl类包括AMQP接口(public class AMQImpl implements AMQP)主要囊括了AMQP协议中的通信帧的类别. 这里以Connection.Start帧做一个例子. public static class Connection { public static final int INDEX = 10; public static class Start extends Method implements com.rabbitmq.client.AMQP.C