[九]RabbitMQ-客户端源码之Consumer

[八]RabbitMQ-客户端源码之ChannelN中讲述basicConsume的方法时设计到Consumer这个回调函数,Consumer其实是一个接口,真正实现它的是QueueingConsumer和DefaultConsumer,且DefaultConsumer是QueueingConsumer的父类,里面都是空方法。在用户使用时可以简单的采用QueueingConsumer或者采用DefaultConsumer来重写某些方法。

这里先来看下消费者客户端的关键代码:

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicQos(32);
        channel.basicConsume(QUEUE_NAME, false, "consumer_zzh",consumer)

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [X] Received '" + message + "'");
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        }

可以看到QueueingConsumer作为channel.basicConsume的回调函数,之后再进行处理。

在AMQConnection中有关MainLoop的主线程,专门用来”第一线”的处理Broker发送回客户端从帧。当Basic.Consume/.ConsumeOk开启消费模式之后,Broker主动的向客户端发送Basic.Delivery帧,MainLoop线程一步步的调用,最后到ChannelN的processAsync()方法中有:

if (method instanceof Basic.Deliver) {
    processDelivery(command, (Basic.Deliver) method);
    return true;
} 

之后调用processDelivery方法:

protected void processDelivery(Command command, Basic.Deliver method) {
    Basic.Deliver m = method;

    Consumer callback = _consumers.get(m.getConsumerTag());
    if (callback == null) {
        if (defaultConsumer == null) {
            throw new IllegalStateException("Unsolicited delivery -" + " see Channel.setDefaultConsumer to handle this" + " case.");
        }
        else {
            callback = defaultConsumer;
        }
    }

    Envelope envelope = new Envelope(m.getDeliveryTag(), m.getRedelivered(),m.getExchange(),m.getRoutingKey());
    try {
        this.dispatcher.handleDelivery(callback, m.getConsumerTag(),envelope, (BasicProperties) command.getContentHeader(),command.getContentBody());
    } catch (Throwable ex) {
        getConnection().getExceptionHandler().handleConsumerException(this, ex,callback,m.getConsumerTag(), "handleDelivery");
    }
}

这个方法首先根据consumerTag从ChannelN中的_consumer这个HashMap中获取相应的Consumer回调函数,然后调用这个回调函数的handleDeliver()方法进行处理,这里有些同学会有疑问,明明是调用ConsumerDispatcher dispatcher的handleDeliver()方法,其实这里只是包了一层皮,ConsumerDispatcher的handleDeliver()方法就是调用了Consumer的handleDeliver()方法。

我们接下去看看QueueingConsumer这个实现Consumer接口的类是怎么处理的:

@Override public void handleDelivery(String consumerTag,
                           Envelope envelope,
                           AMQP.BasicProperties properties,
                           byte[] body)
    throws IOException
{
    checkShutdown();
    this._queue.add(new Delivery(envelope, properties, body));
}

这里的queue就是一个LinkedBlockingQueue,客户端程序通过调用nextDelivery()方法来获取数据:

public Delivery nextDelivery()
    throws InterruptedException, ShutdownSignalException, ConsumerCancelledException
{
    return handle(_queue.take());
}

private Delivery handle(Delivery delivery) {
    if (delivery == POISON ||
        delivery == null && (_shutdown != null || _cancelled != null)) {
        if (delivery == POISON) {
            _queue.add(POISON);
            if (_shutdown == null && _cancelled == null) {
                throw new IllegalStateException(
                    "POISON in queue, but null _shutdown and null _cancelled. " +
                    "This should never happen, please report as a BUG");
            }
        }
        if (null != _shutdown)
            throw Utility.fixStackTrace(_shutdown);
        if (null != _cancelled)
            throw Utility.fixStackTrace(_cancelled);
    }
    return delivery;
}

这个nextDelivery方法说白就是一个LinkedBlockingQueue的take()操作,也就是一个可能会阻塞等待的操作。


附:本系列全集

  1. [Conclusion]RabbitMQ-客户端源码之总结
  2. [一]RabbitMQ-客户端源码之ConnectionFactory
  3. [二]RabbitMQ-客户端源码之AMQConnection
  4. [三]RabbitMQ-客户端源码之ChannelManager
  5. [四]RabbitMQ-客户端源码之Frame
  6. [五]RabbitMQ-客户端源码之AMQChannel
  7. [六]RabbitMQ-客户端源码之AMQCommand
  8. [七]RabbitMQ-客户端源码之AMQPImpl+Method
  9. [八]RabbitMQ-客户端源码之ChannelN
  10. [九]RabbitMQ-客户端源码之Consumer
时间: 2025-01-30 12:58:29

[九]RabbitMQ-客户端源码之Consumer的相关文章

memcached客户端源码分析

转载:memcached客户端源码分析 memcached的Java客户端有好几种,http://code.google.com/p/memcached/wiki/Clients 罗列了以下几种 Html代码   spymemcached          * http://www.couchbase.org/code/couchbase/java             o An improved Java API maintained by Matt Ingenthron and other

急求百度手环手机客户端源码

问题描述 急求百度手环手机客户端源码 50C dulife手环的安卓手机客户端的源码或者手机端测试的源码百度手环Android手机客户端源码 解决方案 这种源码怎么可能给你,都是商业机密.去研究研究原理.技术还是正途.

求java开发的FTP服务器与客户端源码,必有重谢!

问题描述 要求界面和谐,有权限控制,各位大大手头上有的请帮个忙吧!

android-dulife安卓手机客户端的源码,急求

问题描述 dulife安卓手机客户端的源码,急求 dulife手环的安卓手机客户端的源码,或者手机端测试的源码,百度手环Android手机客户端源码

[一]RabbitMQ-客户端源码之ConnectionFactory

首先看一段amqp-client发送端的示例代码(展示出主要部分): ConnectionFactory factory = new ConnectionFactory(); factory.setHost(ip); factory.setPort(5672); factory.setUsername("root"); factory.setPassword("root"); Connection connection = factory.newConnection

[Conclusion]RabbitMQ-客户端源码之总结

RabbitMQ遵从的是AMQP协议,其broker端代码采用erlang编写,对于没有接触过erlang的同学(包括博主我)来说,想要了解其中的奥秘实在是不容易,大多只能从网上"搜刮"点散碎的知识点来充实一下.但是这样是不能究其然,更不能究其所以然.博主这里翻阅了amqp-client的java客户端的源码,通过其来学习下AMQP协议,进而更深刻的了解RabbitMQ. 注:如无特殊说明,本系列的文章采用的amqp-client版本均为3.5.3. 本系列的文章主要是来阐述客户端与b

[二]RabbitMQ-客户端源码之AMQConnection

上一篇文章([一]RabbitMQ-客户端源码之ConnectionFactory)中阐述了conn.start()方法完成之后客户端就已经和broker建立了正常的连接,而这个Connection的关键就在于这个start()方法之内,下面我们来慢慢分析. 首先来看看start()方法的源码,这个方法有点长,这里拆开来一一分析,首先是注释: /** * Start up the connection, including the MainLoop thread. * Sends the pro

[七]RabbitMQ-客户端源码之AMQPImpl+Method

AMQPImpl类包括AMQP接口(public class AMQImpl implements AMQP)主要囊括了AMQP协议中的通信帧的类别. 这里以Connection.Start帧做一个例子. public static class Connection { public static final int INDEX = 10; public static class Start extends Method implements com.rabbitmq.client.AMQP.C

[五]RabbitMQ-客户端源码之AMQChannel

AMQChannel是一个抽象类,是ChannelN的父类.其中包含唯一的抽象方法: /** * Protected API - called by nextCommand to check possibly handle an incoming Command before it is returned to the caller of nextCommand. If this method * returns true, the command is considered handled a