RabbitMQ之mandatory和immediate

1. 概述

mandatory和immediate是AMQP协议中basic.publish方法中的两个标识位,它们都有当消息传递过程中不可达目的地时将消息返回给生产者的功能。对于刚开始接触RabbitMQ的朋友特别容易被这两个参数搞混,这里博主整理了写资料,简单讲解下这两个标识位。

mandatory
当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返回给生产者(Basic.Return + Content-Header + Content-Body);当mandatory设置为false时,出现上述情形broker会直接将消息扔掉。

immediate
当immediate标志位设置为true时,如果exchange在将消息路由到queue(s)时发现对于的queue上么有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或者多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。

概括来说,mandatory标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。


2. mandatory

在生产者通过channle的basicPublish方法发布消息时,通常有几个参数需要设置,为此我们有必要了解清楚这些参数代表的具体含义及其作用,查看channel接口,会发现存在3个重载的basicPublish方法:

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;

mandatory和immediate上面已经解释过了,其余的参数分别是:
exchange:交换机名称
routingkey:路由键
props:消息属性字段,比如消息头部信息等等
body:消息主体部分

本节主要讲述mandatory, 下面我们写一个demo,在RabbitMQ broker中有:
exchange : exchange.mandatory.test
queue: queue.mandatory.test
exchange路由到queue的routingkey是mandatory
这里先不讲当前的exchange绑定到queue中,即:

channel.basicPublish(exchangeName, "", mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes());

详细代码如下:

package com.vms.test.zzh.rabbitmq.self;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Created by hidden on 2017/2/7.
 */
public class RBmandatoryTest {
    public static final String ip = "10.198.197.73";
    public static final int port = 5672;
    public static final String username = "root";
    public static final String password = "root";

    public static final String queueName = "queue.mandatory.test";
    public static final String exchangeName = "exchange.mandatory.test";
    public static final String routingKey = "mandatory";
    public static final Boolean mandatory = true;
    public static final Boolean immediate = false;

    public static void main(String[] args) {

        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(ip);
            factory.setPort(port);
            factory.setUsername(username);
            factory.setPassword(password);

            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.basicQos(1);
            channel.basicPublish(exchangeName, "", mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes());
//            channel.close();
//            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

运行,之后通过wireshark抓包工具可以看到如下图所示:

这里可以看到最后执行了basic.return方法,将发布者发出的消息返回给了发布者,查看协议的arguments参数部分可以看到:reply-text字段值为NO_ROUTE,表示消息并没有路由到合适的队列中;

那么我们该怎么获取到没有被正确路由到合适队列的消息呢?这时候可以通过为channel信道设置ReturnListener监听器来实现,具体代码(main函数部分):

try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(ip);
            factory.setPort(port);
            factory.setUsername(username);
            factory.setPassword(password);

            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.basicQos(1);
            channel.basicPublish(exchangeName, "", mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes());
            channel.addReturnListener(new ReturnListener() {
                public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties basicProperties, byte[] body) throws IOException {
                    String message = new String(body);
                    System.out.println("Basic.return返回的结果是:"+message);
                }
            });

//            channel.close();
//            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

运行结果:

Basic.return返回的结果是:===mandatory===

下面我们来看一下,设置mandatory标志且exchange路由到queue中,代码部分只需要将:

channel.basicPublish(exchangeName, "", mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes());

改为

channel.basicPublish(exchangeName, routingKey, mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes());

即可。
通过wireshark抓包如下:

可以看到并不会有basic.return方法被调用。查看RabbitMQ管理界面发现消息已经到达了队列。


3. immediate

在RabbitMQ3.0以后的版本里,去掉了immediate参数的支持,发送带immediate标记的publish会返回如下错误:
“{amqp_error,not_implemented,”immediate=true”,’basic.publish’}”

为什么移除immediate标记,参见如下版本变化描述:
Removal of “immediate” flag
What changed? We removed support for the rarely-used “immediate” flag on AMQP’s basic.publish.
Why on earth did you do that? Support for “immediate” made many parts of the codebase more complex, particularly around mirrored queues. It also stood in the way of our being able to deliver substantial performance improvements in mirrored queues.
What do I need to do? If you just want to be able to publish messages that will be dropped if they are not consumed immediately, you can publish to a queue with a TTL of 0.
If you also need your publisher to be able to determine that this has happened, you can also use the DLX feature to route such messages to another queue, from which the publisher can consume them.
这段解释的大概意思是:immediate标记会影响镜像队列性能,增加代码复杂性,并建议采用“TTL”和“DLX”等方式替代。


参考资料

  1. 深入学习RabbitMQ(一):mandatory标志的作用
  2. RabbitMQ(二)AMQP协议mandatory和immediate标志位区别
时间: 2024-11-03 01:40:35

RabbitMQ之mandatory和immediate的相关文章

【原创】RabbitMQ 之 mandatory

1.什么情况会导致 blackholed? 两种情况: 声明 exchange 后未绑定任何 queue ,此时发送到该 exchange 上的 message 均被 blackholed : 声明 exchange 后绑定了 queue ,但发送到该 exchange 上的 message 所使用的 routing_key 不匹配任何 binding_key ,则 blackholed . 2.mandatory 的作用?       决定 message 将被如何处理,是被 exchange

【原创】RabbitMQ之PublisherConfirm实战问题总结

如何理解Publisher Confirm机制       Publisher Confirm机制(又称为Confirms或Publisher Acknowledgements)是作为解决事务机制性能开销大(导致吞吐量下降)而提出的另外一种保证消息不会丢失的方式. Publisher Confirm的协议交互过程 (补充说明:上图中未显示出info信息的两条交互对应的就是 Confirm.Select 和 Confirm.Select-ok ,显示不出来是因为 wireshark 没有对该扩展进

(RabbitMQ) Java Client API Guide

本篇翻译的是RabbitMQ官方文档关于API的内容,原文链接:http://www.rabbitmq.com/api-guide.html.博主对其内容进行大体上的翻译,有些许部分会保留英文,个人觉得这样更加有韵味,如果全部翻译成中文,会存在偏差,文不达意(主要是功力浅薄~~).文章也对部分内容进行一定的解释,增强对相关知识点的理解. Overview RabbitMQ java client uses com.rabbitmq.client as its top-level package,

RabbitMQ之死信队列

DLX, Dead-Letter-Exchange.利用DLX, 当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX.消息变成死信一向有一下几种情况: 消息被拒绝(basic.reject/ basic.nack)并且requeue=false 消息TTL过期(参考:RabbitMQ之TTL(Time-To-Live 过期时间)) 队列达到最大长度 DLX也是一个正常的Exchange,和一般的Exchan

RabbitMQ之消息持久化

消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢--消息持久化. 为了保证RabbitMQ在退出或者crash等异常情况下数据没有丢失,需要将queue,exchange和Message都持久化. queue的持久化 queue的持久化是通过durable=true来实现的. 一般程序中这么使用: Connection connection = connectionFactory.newConnection(); Channel channel = conn

消息中间件收录集

本篇主要整理工作中遇到的一些消息中间件的相关知识,包括Kafka, RabbitMQ, RocketMQ, ActiveMQ等,不排除收录其他消息中间件的可能. 这里会持续收录相关知识,包括安装.部署.使用示例.监控.运维.原理等. 所有新撰写的与中间件有关的文章都会收录与此,注意保存本文链接. Last Update Time: 2017-10-26 08:23 Update Content: RabbitMQ管理(5)--集群管理 通用 什么是Zero-Copy?(sendfile) 1.

【整理】RabbitMQ publish方法中的immediate和mandatory属性

      鉴于在 RabbitMQ 的使用过程中,很多同学搞不清楚 basic.publish 接口中 mandatory 和 immediate 的背后含义,特搜集整理网上相关文章,以便彻底理解该问题. ===== 我是三体分隔线 =====  在 RabbitMQ 3.0.0 版本的 README 中如是说:  ? 1 2 3 4 ... feature removal 23896 remove support for AMQP's "immediate" publish mod

【原创】RabbitMQ 相关问题汇总

[面向对象和免责声明]      本文不是面向初级 RabbitMQ 的使用者,本文面向的是对 RabbitMQ 有一定的研究,对使用中的细节问题有一定的思考,对各种模型背后的原因有强烈的探究欲望的人.本文的所有内容不保证 100% 正确,但至少是我目前为止认为正确的结论,如果您有任何高见,敬请赐教,不甚感激. [RabbitMQ 问答]       本章节主要解答一些在 RabbitMQ 使用过程中,经常被问到的问题.其实很多问题的答案都可以在各类文档里找到,建议多翻阅参考资料中给出的文档. 

RabbitMQ 使用参考

1. 安装 从网站 http://www.rabbitmq.com/install-generic-unix.html 下载到二进制源码, 进入sbin 目录, 直接运行 server 即可. 默认服务监听在 5672 端口上(带上 SSL 默认在 5671 上). 2. 基本概念 RabbitMQ , 是一个使用 erlang 编写的 AMQP (高级消息队列协议) 的服务实现. 简单来说, 就是一个功能强大的消息队列服务. 通常我们谈到队列服务, 会有三个概念, 发消息者 , 队列 , 收消