RabbitMQ之消息持久化

消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢——消息持久化。
为了保证RabbitMQ在退出或者crash等异常情况下数据没有丢失,需要将queue,exchange和Message都持久化。


queue的持久化

queue的持久化是通过durable=true来实现的。
一般程序中这么使用:

Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("queue.persistent.name", true, false, false, null);

关键的是第二个参数设置为true,即durable=true.

Channel类中queueDeclare的完整定义如下:

    /**
     * Declare a queue
     * @see com.rabbitmq.client.AMQP.Queue.Declare
     * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
     * @param queue the name of the queue
     * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
     * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
     * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
     * @param arguments other properties (construction arguments) for the queue
     * @return a declaration-confirm method to indicate the queue was successfully declared
     * @throws java.io.IOException if an error is encountered
     */
    Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments) throws IOException;

参数说明:

queue:queue的名称

exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次申明它的连接可见,并在连接断开时自动删除。这里需要注意三点:1. 排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一连接创建的排他队列;2.“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;3.即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的,这种队列适用于一个客户端发送读取消息的应用场景。

autoDelete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。

queueDeclare相关的有4种方法,分别是:

Queue.DeclareOk queueDeclare() throws IOException;
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments) throws IOException;
void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                            Map<String, Object> arguments) throws IOException;
Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;

其中需要说明的是queueDeclarePassive(String queue)可以用来检测一个queue是否已经存在。如果该队列存在,则会返回true;如果不存在,就会返回异常,但是不会创建新的队列。


消息的持久化

如过将queue的持久化标识durable设置为true,则代表是一个持久的队列,那么在服务重启之后,也会存在,因为服务会把持久化的queue存放在硬盘上,当服务重启的时候,会重新什么之前被持久化的queue。队列是可以被持久化,但是里面的消息是否为持久化那还要看消息的持久化设置。也就是说,重启之前那个queue里面还没有发出去的消息的话,重启之后那队列里面是不是还存在原来的消息,这个就要取决于发生着在发送消息时对消息的设置了。
如果要在重启后保持消息的持久化必须设置消息是持久化的标识。

设置消息的持久化:

channel.basicPublish("exchange.persistent", "persistent", MessageProperties.PERSISTENT_TEXT_PLAIN, "persistent_test_message".getBytes());

这里的关键是:MessageProperties.PERSISTENT_TEXT_PLAIN
首先看一下basicPublish的方法:

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
        throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
        throws IOException;

exchange表示exchange的名称
routingKey表示routingKey的名称
body代表发送的消息体
有关mandatory和immediate的详细解释可以参考:RabbitMQ之mandatory和immediate.

这里关键的是BasicProperties props这个参数了,这里看下BasicProperties的定义:

public BasicProperties(
            String contentType,//消息类型如:text/plain
            String contentEncoding,//编码
            Map<String,Object> headers,
            Integer deliveryMode,//1:nonpersistent 2:persistent
            Integer priority,//优先级
            String correlationId,
            String replyTo,//反馈队列
            String expiration,//expiration到期时间
            String messageId,
            Date timestamp,
            String type,
            String userId,
            String appId,
            String clusterId)

这里的deliveryMode=1代表不持久化,deliveryMode=2代表持久化。

上面的实现代码使用的是MessageProperties.PERSISTENT_TEXT_PLAIN,那么这个又是什么呢?

public static final BasicProperties PERSISTENT_TEXT_PLAIN =
    new BasicProperties("text/plain",
                        null,
                        null,
                        2,
                        0, null, null, null,
                        null, null, null, null,
                        null, null);

可以看到这其实就是讲deliveryMode设置为2的BasicProperties的对象,为了方便编程而出现的一个东东。
换一种实现方式:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2);
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("exchange.persistent", "persistent",properties, "persistent_test_message".getBytes());

设置了队列和消息的持久化之后,当broker服务重启的之后,消息依旧存在。单只设置队列持久化,重启之后消息会丢失;单只设置消息的持久化,重启之后队列消失,既而消息也丢失。单单设置消息持久化而不设置队列的持久化显得毫无意义。


exchange的持久化

上面阐述了队列的持久化和消息的持久化,如果不设置exchange的持久化对消息的可靠性来说没有什么影响,但是同样如果exchange不设置持久化,那么当broker服务重启之后,exchange将不复存在,那么既而发送方rabbitmq producer就无法正常发送消息。这里博主建议,同样设置exchange的持久化。exchange的持久化设置也特别简单,方法如下:

Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,
                                   Map<String, Object> arguments) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange,
                                          String type,
                                          boolean durable,
                                          boolean autoDelete,
                                          boolean internal,
                                          Map<String, Object> arguments) throws IOException;
void exchangeDeclareNoWait(String exchange,
                           String type,
                           boolean durable,
                           boolean autoDelete,
                           boolean internal,
                           Map<String, Object> arguments) throws IOException;
Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;

一般只需要:channel.exchangeDeclare(exchangeName, “direct/topic/header/fanout”, true);即在声明的时候讲durable字段设置为true即可。


进一步讨论

1.将queue,exchange, message等都设置了持久化之后就能保证100%保证数据不丢失了嚒?
答案是否定的。
首先,从consumer端来说,如果这时autoAck=true,那么当consumer接收到相关消息之后,还没来得及处理就crash掉了,那么这样也算数据丢失,这种情况也好处理,只需将autoAck设置为true(方法定义如下),然后在正确处理完消息之后进行手动ack(channel.basicAck).

String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

其次,关键的问题是消息在正确存入RabbitMQ之后,还需要有一段时间(这个时间很短,但不可忽视)才能存入磁盘之中,RabbitMQ并不是为每条消息都做fsync的处理,可能仅仅保存到cache中而不是物理磁盘上,在这段时间内RabbitMQ broker发生crash, 消息保存到cache但是还没来得及落盘,那么这些消息将会丢失。那么这个怎么解决呢?首先可以引入RabbitMQ的mirrored-queue即镜像队列,这个相当于配置了副本,当master在此特殊时间内crash掉,可以自动切换到slave,这样有效的保障了HA, 除非整个集群都挂掉,这样也不能完全的100%保障RabbitMQ不丢消息,但比没有mirrored-queue的要好很多,很多现实生产环境下都是配置了mirrored-queue的。还有要在producer引入事务机制或者Confirm机制来确保消息已经正确的发送至broker端,有关RabbitMQ的事务机制或者Confirm机制可以参考:RabbitMQ之消息确认机制(事务+Confirm). 幸亏本文的主题是讨论RabbitMQ的持久化而不是可靠性,不然就一发不可收拾了。RabbitMQ的可靠性涉及producer端的确认机制、broker端的镜像队列的配置以及consumer端的确认机制,要想确保消息的可靠性越高,那么性能也会随之而降,鱼和熊掌不可兼得,关键在于选择和取舍。

2.消息什么时候刷到磁盘?
写入文件前会有一个Buffer,大小为1M,数据在写入文件时,首先会写入到这个Buffer,如果Buffer已满,则会将Buffer写入到文件(未必刷到磁盘)。
有个固定的刷盘时间:25ms,也就是不管Buffer满不满,每个25ms,Buffer里的数据及未刷新到磁盘的文件内容必定会刷到磁盘。
每次消息写入后,如果没有后续写入请求,则会直接将已写入的消息刷到磁盘:使用Erlang的receive x after 0实现,只要进程的信箱里没有消息,则产生一个timeout消息,而timeout会触发刷盘操作。



欲了解更多消息中间件的内容,可以关注:消息中间件收录集


参考资料

  1. RabbitMQ消息队列(三):任务分发机制
  2. RabbitMQ之mandatory和immediate
  3. RabbitMQ之消息确认机制(事务+Confirm)
  4. RabbitMQ持久化机制
时间: 2024-08-24 18:49:23

RabbitMQ之消息持久化的相关文章

轻松搞定RabbitMQ(三)——消息应答与消息持久化

       这个官网的第二个例子中的消息应答和消息持久化部分.我把它摘出来作为单独的一块儿来分享. Message acknowledgment(消息应答)        执行一个任务可能需要花费几秒钟,你可能会担心如果一个消费者在执行任务过程中挂掉了.基于现在的代码,一旦RabbitMQ将消息分发给了消费者,就会从内存中删除.在这种情况下,如果杀死正在执行任务的消费者,会丢失正在处理的消息,也会丢失已经分发给这个消费者但尚未处理的消息.        但是,我们不想丢失任何任务,如果有一个消

EQueue - 详细谈一下消息持久化以及消息堆积的设计

前言 之前写了一篇文章,总体介绍了EQueue.在看这篇文章之前如果还没看过那篇文章,可能会看不懂这篇文章.所以建议没看过的朋友务必先看一下那篇文章中所提到的各种概念,这样才能更好的理解本文所说的内容.说实话我当初写EQueue也是抱着一种玩的态度的,就是想尝试写一个分布式消息队列,用来为ENode提供分布式消息通信的能力.后来写着写着,发现越来越好玩,因为觉得这个队列以后应该会很实用,所以就花了更多的时间去设计它,完善它.希望它最终能被更多的人使用.到目前为止,我觉得目前基本实现了以下特性:轻

RabbitMQ之消息确认机制(事务+Confirm)

概述 在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有正确到达broker的,如果在消息到达broker之前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器,你怎么进行持久化,那么这个问题该怎

【转载】关于RabbitMQ的消息持久性

      在消息队列 RabbitMQ 入门介绍里,描述了 RabbitMQ 的持久性设置.在设置持久化后,消息保存在磁盘上,即使 RabbitMQ 重启或服务器重启,消息都不会丢失. RabbitMQ 支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化.消息队列持久化包括3个部分:(1)exchange 持久化,在声明时指定 durable => 1(2)queue 持久化,在声明时指定 durable => 1(3)消息持久化,在投递时指定 deliv

ActiveMQ的几种消息持久化机制

为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一般都会采用持久化机制. ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的. 就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件.内存数据库或者远程数据库等,然后试图将消息发送给接收者,发送成功则将消息从存储中删除,失败则继续尝试. 消息中心启动以后首先要检查指定的存储位置,如果有未发送成功的消息,则需要把消息发送出去. >>

【转载】关于RabbitMQ的消息确认

      RabbitMQ 将消息投递到客户端后,客户端如果没处理完这个消息就死掉了,这个消息还会不会存在?这取决于 RabbitMQ 的消息确认机制(Message acknowledgment)是否打开.       为了确保消息不会丢失,RabbitMQ 支持消息确认机制.(在开启了消息确认机制后)客户端需要在收到消息并处理完后,发送一个 ack 消息给 RabbitMQ,告诉它该消息可以被安全删除了.假如客户端在发送 ack 之前意外死掉了,那么 RabbitMQ 会将消息投递到下一个

RabbitMQ .NET消息队列使用详解_实用技巧

本文实例为大家分享了RabbitMQ .NET消息队列使用方法,供大家参考,具体内容如下 首先下载安装包,我都环境是win7 64位: 去官网下载 otp_win64_19.0.exe  和rabbitmq-server-3.6.3.exe安装好 然后开始编程了: (1)创建生产者类: class Program { private static void Main() { //建立RabbitMQ连接和通道 var connectionFactory = new ConnectionFacto

rabbitmq php 消息队列安装教程

RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统.他遵循Mozilla Public License开源协议. 1,安装rabbitmq,php扩展 # yum install php-pecl-amqp rabbitmq-server epel-release  2,启动 # /etc/init.d/rabbitmq-server start  如果要改什么配置的话,在/etc/rabbitmq 目录下新增文件:rabbitmq-env.conf,注意:文件名是固定的 RAB

WebAPi的可视化输出模式(RabbitMQ、消息补偿相关)——所有webapi似乎都缺失的一个功能

最近的工作我在做一个有关于消息发送和接受封装工作.大概流程是这样的,消息中间件是采用rabbitmq,为了保证消息的绝对无丢失,我们需要在发送和接受前对消息进行DB落地.在发送前我会先进行DB的插入,单表插入,所以在性能上也是能接受的,单表插入做了压测基本上是一到两毫秒的时间,加上消息的发送(有ACK)再加上集群是两个节点的高可用(一个磁盘持久化节点),单台TPS基本上是在2000-3000左右.这对于我们的业务场景来说是够用了.一旦当消息丢失或者由于网络问题.集群问题业务不会中断,消息就算发不