RabbitMQ之死信队列

DLX, Dead-Letter-Exchange。利用DLX, 当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。消息变成死信一向有一下几种情况:

DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性,当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列,可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ 3.0以前支持的immediate参数(可以参考RabbitMQ之mandatory和immediate)的功能。

核心代码实现:通过在queueDeclare方法中加入“x-dead-letter-exchange”实现。

channel.exchangeDeclare("some.exchange.name", "direct");

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
channel.queueDeclare("myqueue", false, false, false, args);

你也可以为这个DLX指定routing key,如果没有特殊指定,则使用原队列的routing key

args.put("x-dead-letter-routing-key", "some-routing-key");

还可以使用policy来配置:

rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues

修改RabbitMQ之TTL(Time-To-Live 过期时间)中的例子:

public static void createQueue(){
    try {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(ip);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        Map<String, Object>  argss = new HashMap<String, Object>();
        argss.put("vhost", "/");
        argss.put("username","root");
        argss.put("password", "root");
        argss.put("x-message-ttl",6000);
        argss.put("x-dead-letter-exchange","exchange.dlx.test");
 argss.put("x-dead-letter-routing-key","queue.dlx.test");
        channel.queueDeclare("queue.dlx.test", durable, exclusive, autoDelete, argss);
    } catch (IOException e) {
        e.printStackTrace();
    } catch (TimeoutException e) {
        e.printStackTrace();
    }
}

通过RabbitMQ的管理界面可以看到:

queue.dlx.test这个queue中有个“DLX”和“DLK”的标记. DLX关联的是exchangeName, DLK关联的是routingKey.

详细说明:
在RabbitMQ中有两个exchange: exchange.dlx.self和exchange.dlx.test,两个queue:queue.dlx.test和%DLX%queue.dlx.test
exchange.dlx.self是正常情况下,生产者发送消息到此exchange中,绑定关系如图:

exchang.dlx.test是产生死信之后,原queue[queue.dlx.test]的死信发送到此exchange中,绑定关系如图:

数据首先发送到 exchange[exchange.dlx.self],根据routingkey[dlx]路由到queue.dlx.test,如果正常情况下,消费者可以消费queue.dlx.test的内容。但是如果queue.dlx.test中有消息变成了dead message即死信了,那么这个死信则会通过exchangeName=exchange.dlx.test, routingKey=”queue.dlx.test”路由到死信队列%DLX%queue.dlx.test中,如果要消费这个dead message, 此时消费者必须消费%DLX%queue.dlx.test中的内容而不是queue.dlx.test中的内容。

如果不指定x-dead-letter-routing-key参数,则使用原来的routingkey


参考资料

  1. RabbitMQ之TTL(Time-To-Live 过期时间)
  2. RabbitMQ之mandatory和immediate
  3. RabbitMQ(四)RabbitMQ死信邮箱(DLX)
  4. RabbitMQ Dead Letter Exchanges
时间: 2025-01-20 20:39:59

RabbitMQ之死信队列的相关文章

RabbitMQ之镜像队列

概述 如果RabbitMQ集群只有一个broker节点,那么该节点的失效将导致整个服务临时性的不可用,并且可能会导致message的丢失(尤其是在非持久化message存储于非持久化queue中的时候).当然可以将所有的publish的message都设置为持久化的,并且使用持久化的queue,但是这样仍然无法避免由于缓存导致的问题:因为message在发送之后和被写入磁盘并执行fsync之间存在一个虽然短暂但是会产生问题的时间窗.通过publisher的confirm机制能够确保客户端知道哪些

RabbitMQ .NET消息队列使用详解_实用技巧

本文实例为大家分享了RabbitMQ .NET消息队列使用方法,供大家参考,具体内容如下 首先下载安装包,我都环境是win7 64位: 去官网下载 otp_win64_19.0.exe  和rabbitmq-server-3.6.3.exe安装好 然后开始编程了: (1)创建生产者类: class Program { private static void Main() { //建立RabbitMQ连接和通道 var connectionFactory = new ConnectionFacto

RabbitMQ之惰性队列(Lazy Queue)

RabbitMQ从3.6.0版本开始引入了惰性队列(Lazy Queue)的概念.惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储.当消费者由于各种各样的原因(比如消费者下线.宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了. 默认情况下,当生产者将消息发送到RabbitMQ的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者.

rabbitmq php 消息队列安装教程

RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统.他遵循Mozilla Public License开源协议. 1,安装rabbitmq,php扩展 # yum install php-pecl-amqp rabbitmq-server epel-release  2,启动 # /etc/init.d/rabbitmq-server start  如果要改什么配置的话,在/etc/rabbitmq 目录下新增文件:rabbitmq-env.conf,注意:文件名是固定的 RAB

RabbitMQ如何实现延迟队列?

什么是延迟队列 延迟队列存储的对象肯定是对应的延迟消息,所谓"延迟消息"是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费. 场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理.这是就可以使用延迟队列将订单信息发送到延迟队列. 场景二:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作.这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到

消息中间件收录集

本篇主要整理工作中遇到的一些消息中间件的相关知识,包括Kafka, RabbitMQ, RocketMQ, ActiveMQ等,不排除收录其他消息中间件的可能. 这里会持续收录相关知识,包括安装.部署.使用示例.监控.运维.原理等. 所有新撰写的与中间件有关的文章都会收录与此,注意保存本文链接. Last Update Time: 2017-10-26 08:23 Update Content: RabbitMQ管理(5)--集群管理 通用 什么是Zero-Copy?(sendfile) 1.

RabbitMQ之TTL(Time-To-Live 过期时间)

1. 概述 RabbitMQ可以对消息和队列设置TTL. 目前有两种方法可以设置.第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间.第二种方法是对消息进行单独设置,每条消息TTL可以不同.如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准.消息在队列的生存时间一旦超过设置的TTL值,就称为dead message, 消费者将无法再收到该消息. 2. 设置队列属性 通过队列属性设置消息TTL的方法是在queue.declare方法中加入x-message-t

spring-boot + rabbitmq消息手动确认模式的几点说明(重试机制)

前提:使用rabbitmq的手动确认消息的模式 消息手动确认模式的几点说明 监听的方法内部必须使用channel进行消息确认,包括消费成功或消费失败 如果不手动确认,也不抛出异常,消息不会自动重新推送(包括其他消费者),因为对于rabbitmq来说始终没有接收到消息消费是否成功的确认,并且Channel是在消费端有缓存的,没有断开连接 如果rabbitmq断开,连接后会自动重新推送(不管是网络问题还是宕机) 如果消费端应用重启,消息会自动重新推送 如果消费端处理消息的时候宕机,消息会自动推给其他

消息队列在VB.NET数据库开发中的应用

数据|数据库 我们先简单的了解一下什么是消息队列(MSMQ)?消息队列是 Windows 2000(NT也有MSMQ,WIN95/98/me/xp不含消息队列服务但是支持客户端的运行)操作系统中通讯的基础,也是用于创建分布式.松散连接通讯应用程序的工具.这些应用程序可以通过不同种类的网络进行通讯,也可以与脱机的计算机通讯.消息队列分为用户创建队列和系统队列,用户队列分为: · "公共队列"在整个可传递消息的"消息队列"网络中复制并传输,并且有可能由网络连接的所有站点