QueueingConsumer在Rabbitmq客户端3.x版本中用的如火如荼,但是在4.x版本开初就被标记为@Deprecated,这是为什么呢?本文就此展开探讨。
在我的博文《RabbitMQ之Consumer消费模式(Push & Pull)》中讲到,Consumer的消费模式有Pull 和 Push两种,而经常用到的就是Push模式,Push模式在3.x的用法demo如下:
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicQos(1);
channel.basicConsume(QUEUE_NAME, false, "consumer_zzh",consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [X] Received '" + message + "'");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
break;
}
在官方文档中推荐使用继承DefaultConsumer的方式:
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
// (process the message components here ...)
channel.basicAck(deliveryTag, false);
}
});
在源码注释中有关QueueingConsumer的介绍有这样一段:
QueueingConsumer was introduced to allow applications to overcome a limitation in the way Connection managed threads and consumer dispatching. When QueueingConsumer was introduced, callbacks to Consumers ware made on the Connection’s thread. This had two main drawbacks. Firstly, the Consumer could stall the processing of all Channels on the Connection. Secondly, if a Consumer made a recursive synchronous call into its Channel the Client would deadlock.
QueuingConsumer provided client code with an easy way to obviate the problem by queueing incoming messages and processing them on a separate, application-managed thread.
The threading behaviour of Connection and Channel has been changed so that each Channel uses a distinct thread for dispatching to Consumers. This prevents Consumers on one Channel holding up Consumers on another and it also prevents recursive calls from deadlocking the client.
As such, it is now safe to implement Consumer directly of to extend DefaultConsumer and QueueingConsumer is a lot less relevant.
上面提及了两个drawbacks:
- the Consumer could stall the processing of all Channels on the Connection. =>QueueingConsumer会拖累Connection的所有Channels的操作
- if a Consumer made a recursive synchronous call into its Channel the Client would deadlock.=>同步递归调用时会产生死锁
对于这两句简单的言辞,博主没有停下追求真理的脚步,既而去github上发问,当我咨询rabbitmq-java-client的作者时(issue @265),他是这么回复的:
Search rabbitmq-users archives. That consumer implementation was merely a workaround for the consumer operation dispatch deficiency that no longer exists. It has significant limitations in that automatic connection recovery does not support it and when deliveries happen faster than consumers actually process them, its internal j.u.c. queue data structure can grow very large. It has been deprecated for years prior to the removal.
上面提及的rabbitmq-users的链接是:https://groups.google.com/forum/#!forum/rabbitmq-users,当然在我大天朝是访问不了的。博主的翻墙软件也失效了,就没法search,有兴趣的小伙伴search到的话麻烦告知下(下方留言,私信,或者留下你的资料地址~)。不过作者也提交了两点:1. automatic connection recovery不支持QueueingConsumer的这种形式;2. 内存溢出问题。
对于QueueingConsumer的内存溢出问题,我在博文《[九]RabbitMQ-客户端源码之Consumer》中讲到QueueingConsumer内部其实是一个LinkBlockingQueue,它将从broker端接受到的信息先暂存到这个LinkBlockingQueue中,然后消费端程序在从这个LinkBlockingQueue中take出消息。试下一下,如果我们不take消息或者说take的非常慢,那么LinkBlockingQueue中的消息就会越来越多,最终造成内存溢出。
这里我们来看一段英文介绍: You have a queue in RabbitMQ. You have some clients consuming from that queue. If you don’t set a Qos setting at all (Basic.Qos), then RabbitMQ will push all the queue’s messages to the client as fast as the network and the clients will allow. 也就是说,如果由于某些原因,队列中堆积了比较多的消息,就可能导致Consumer内存溢出卡死,于是发生恶性循环,队列消息不断堆积得不到消化,彻底地悲剧了。其实这个问题可以通过设置Basic.Qos来很好的解决。
博主这里实验了下,先往一个queue里发送200MB+的消息,然后进行消费:
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, false, consumer);
在client端发送Basic.Consume帧,并设置回调函数为QueueingConsumer之后,并不真正消费QueueingConsumer中的LinkedBlockingQueue中的内容,通过JVisualVM可以看到堆内存的变化,如下图所示:
可以看到堆内存一直在增加,博主这里只测试了发送200+MB的消息,如果发送的更多,那么这个堆会变得更大,直到OutOfMemory。
在stackoverflow上也有关于QueueingConsumer的疑问,有人说QueueingConsumer不是event-driven的,也有人提及了内存溢出的问题。看来QueueingConsumer的毛病真的不少,都推荐使用继承DefaultConsumer的方式进行消费。
如果博主后面搜集到更多的证据,也会在本博文中更新相关的内容。
如果你有相关的资料,也可以留言分享一下,大家互相促进~~
参考资料
- RabbitMQ之Consumer消费模式(Push & Pull)
- [九]RabbitMQ-客户端源码之Consumer
- RabbitMQ-api-guide
- 解决RabbitMQ队列超长QueueingConsumer导致JVM内存溢出的问题