RabbitMQ原理
前一段时间研究RabbitMQ的时候,发现官网、网上博客的内容比较零散,所以整理了一下官网内容以及其他人的博客内容,整理出下面一篇RabbitMQ原理。主要内容如下:
基础概念:Exchange, Routing key, Binding, Binding key, Exchange Types, RPC, 通信过程
架构相关:Virtual host, 消息存储, GC过程, 性能优化, Message,流控
队列详解:普通队列, 镜像队列
集群原理:基础概念, 服务可用性, 集群原理
镜像队列的回复方案
其他
一、 基础概念
1. Exchange
Exchange类似于数据通信网络中的交换机,提供消息路由策略。Rabbitmq中,生产者将消息发送到Exchange(交换器,下图中的X),由Exchange将消息路由到一个或多个Queue中(或者丢弃)。
一个Exchange可以和多个Queue进行绑定,Producer发送消息的时候,会传递一个ROUTING_KEY,Exchange会根据这个ROUTING_KEY,通过绑定规则,将消息路由给指定的Queue。
和Queue一样,Exchange也可设置为持久化,临时或者自动删除。
2. Routingkey
生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。Producer通过在发送消息给Exchange时指定routing key来决定消息流向哪里。
RabbitMQ为routingkey设定的长度限制为255 bytes。
3. Binding
RabbitMQ中通过Binding将Exchange与Queue关联起来,这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了。
4. Bindingkey
在绑定Exchange与Queue的同时,一般会指定一个binding key;消费者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。在绑定多个Queue到同一个Exchange的时候,这些Binding允许使用相同的binding key。
binding key 并不是在所有情况下都生效,它依赖于Exchange Type,fanout类型的Exchange就会无视binding key,而是将消息路由到所有绑定到该Exchange的Queue。
5. ExchangeTypes
RabbitMQ常用的ExchangeType有fanout、direct(默认)、topic、headers这四种(AMQP规范里还提到两种Exchange Type,分别为system与自定义,这里不予以描述)。
1) Direct
它会把消息路由到那些binding key与routing key完全匹配的Queue中。
2) Fanout
不管消息的ROUTING_KEY设置为什么,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。
3) Topic
Exchange会将消息转发到和ROUTING KEY匹配模式相同的所有队列
模式约定:
l routing key、binding key使用句点号“.”分隔的字符串
l binding key中可以存在两种特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配零个或多个单词
4) headers
不依赖于routingkey与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配,如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。
6. RPC
实际的应用场景中,我们可能希望知道消息的处理结果,这相当于RPC。在RabbitMQ通过以下方式支持。
RabbitMQ中实现RPC的机制是:
l 客户端C发送请求(消息)时,在消息的属性(MessageProperties)中设置两个值replyTo(一个Queue名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue中)和correlationId(此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败。
l 服务器端S收到消息并处理。
l 服务器端S处理完消息后,将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性。
l 客户端之前已订阅replyTo指定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理。
7. 通信过程
假设P1和C1注册了相同的Broker,Exchange和Queue。P1发送的消息最终会被C1消费。基本的通信流程大概如下所示:
1) P1生产消息,发送给服务器端的Exchange。
2) Exchange收到消息,根据ROUTINKEY,将消息转发给匹配的Queue1。
3) Queue1收到消息,将消息发送给订阅者C1。
4) C1收到消息,发送ACK给队列确认收到消息。
5) Queue1收到ACK,删除队列中缓存的此条消息。
Consumer收到消息时需要显式的向rabbit broker发送basic.ack消息或者consumer订阅消息时设置auto_ack参数为true。在通信过程中,队列对ACK的处理有以下几种情况:
1) 如果consumer接收消息后发送ack,rabbitmq会删除队列中这个消息,发送另一条消息给consumer。
2) 如果cosumer接收了消息, 但在发送ack之前断开连接,rabbitmq会认为这条消息没有被deliver,在consumer在次连接的时候,这条消息会被redeliver。
3) 如果consumer接收了消息,但是程序中如果忘记了ack/nack,rabbitmq不会重复推此消息到consumer,但是。
4) rabbitmq2.0.0和之后的版本支持consumer reject某条(类)消息,可以通过设置requeue参数中的reject为true,那么rabbitmq将会把消息发送给下一个注册的consumer。
二、 架构相关
1. virtualhost
rabbitmqserver上可以创建多个虚拟的message broker,又叫做virtual hosts (vhosts)。每一个vhost本质上是一个mini-rabbitmq server,分别管理各自的exchange和bindings。vhost相当于物理的server,可以为不同app提供边界隔离,使得应用安全的运行在不同的vhost实例上,相互之间不会干扰。producer和consumer连接rabbit server需要指定一个vhost。
2. 消息存储
RabbitMQ对于queue中的message的保存方式有两种方式:disc和ram。如果采用disc,则需要对exchange/queue/delivery mode都要设置成durable模式。
Disc方式的好处是当RabbitMQ失效了,message仍然可以在重启之后恢复。ram方式,RabbitMQ处理message的效率要高很多,ram和disc两种方式的效率比大概是3:1。如果在有其它HA手段保障的情况下,选用ram方式是可以提高消息队列的工作效率的。如果使用ram方式,RabbitMQ能够承载的访问量则取决于可用的内存大小。
所有队列中的消息都以append的方式写到一个文件中,当这个文件的大小超过指定的限制大小后,关闭这个文件再创建一个新的文件供消息的写入。文件名(*.rdq)从0开始然后依次累加。
在进行消息的存储时,rabbitmq会在ets表中记录消息在文件中的映射,以及文件的相关信息。消息读取时,根据消息ID找到该消息所存储的文件,在文件中的偏移量,然后打开文件进行读取。
rabbitmq在启动时会创建msg_store_persistent,msg_store_transient两个进程,一个用于持久消息的存储,一个用于内存不够时,将存储在内存中的非持久化数据转存到磁盘中。所有队列的消息的写入和删除最终都由这两个进程负责处理,而消息的读取则可能是队列本身直接打开文件进行读取,也可能是发送请求由msg_store_persisteng/msg_store_transient进程进行处理。
Ets数据结构:
-record(msg_location,{ msg_id, //消息ID
ref_count, //引用计数
file, //消息存储的文件名
offset, //消息在文件中的偏移量
total_size //消息的大小
}).
日志文件数据结构:
-record(file_summary,{ file, //文件名
valid_total_size, //文件有效数据大小
left, //位于该文件左边的文件
right, //位于该文件右边的文件
file_size, //文件总的大小
locked, //上锁标记 垃圾回收时防止对文件进行操作
readers //当前读文件的队列数
})
3. GC过程
消息的删除只是从flying_ets表删除指定消息的相关信息,同时更新消息对应存储的文件的相关信息、更新文件有效数据大小。当垃圾数据超过一定比例后(默认比例为40%),rabbitmq触发垃圾回收。垃圾回收会先找到符合要求的两个文件(根据#file_summary{}中left,right找逻辑上相邻的两个文件,并且两个文件的有效数据可在一个文件中存储),然后锁定这两个文件,并先对左边文件的有效数据进行整理,再将右边文件的有效数据写入到左边文件,同时更新消息的相关信息(存储的文件,文件中的偏移量)、文件的相关信息(文件的有效数据,左边文件,右边文件),最后将右边的文件删除。
4. 性能考虑
1) 操作引用计数(flying_ets)
在进行消息的写入和删除操作前,会在flying_ets表里通过+1、-1的方式进行计数,然后投递请求给msg_store_persistent、msg_store_transient进程进行处理,进程在真正写操作或者删除之前会再次判断flying_ets中对应消息的计数,决定是否需要进行相应操作。这样对于频繁写入和删除的操作,减少实际的写入和删除。
2) 尽可能的并发读
在读取消息的时候,都先根据消息ID找到对应存储的文件,如果文件存在并且未被锁住,则直接打开文件,从指定位置读取消息的内容。
如果消息存储的文件被锁住了,或者对应的文件不存在了,则发送请求,由msg_store_persistent/msg_store_transient进程进行处理。
3) 利用flying_ets表进行缓存
对于当前正在写的文件,所有消息在写入前都会在cur_file_cache_ets表中存一份,消息读取时会优先从这里进行查找。文件关闭时,会将cur_file_cache_ets表中引用计数为0的消息进行清除。
4) 利用file_handle_cache的写缓存
rabbitmq中对文件的操作封转到了file_handle_cache模块,以写模式打开文件时,默认有1M大小的缓存,即在进行文件的写操作时,是先写入到这个缓存中,当缓存超过大小或者显式刷新,才将缓存中的内容刷入磁盘中。
5. Messageacknowledgment
在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理。这里不存在timeout概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开。
如果我们的开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ, Queue中堆积的消息会越来越多;消费者重启后会重复消费这些消息并重复执行业务逻辑。
6. 流控
RabbitMQ可以对内存和磁盘使用量设置阈值,当达到阈值后,生产者将被阻塞(block),直到对应项恢复正常。除了这两个阈值,RabbitMQ在正常情况下还用流控(Flow Control)机制来确保稳定性。
Erlang进程之间并不共享内存(binaries类型除外),而是通过消息传递来通信,每个进程都有自己的进程邮箱。Erlang默认没有对进程邮箱大小设限制,所以当有大量消息持续发往某个进程时,会导致该进程邮箱过大,最终内存溢出并崩溃。
在RabbitMQ中,如果生产者持续高速发送,而消费者消费速度较低时,如果没有流控,很快就会使内部进程邮箱大小达到内存阈值,阻塞生产者(得益于block机制,并不会崩溃)。然后RabbitMQ会进行page操作,将内存中的数据持久化到磁盘中。
为了解决该问题,RabbitMQ使用了一种基于信用证的流控机制。消息处理进程有一个信用组{InitialCredit,MoreCreditAfter},默认值为{200, 50}。消息发送者进程A向接收者进程B发消息,每发一条消息,Credit数量减1,直到为0,A被block住;对于接收者B,每接收MoreCreditAfter条消息,会向A发送一条消息,给予A MoreCreditAfter个Credit,当A的Credit>0时,A可以继续向B发送消息。
从上RabbitMQ生产消息传输路径可以看出,基于信用证的流控最终将消息发送进程的发送速度限制在消息处理进程的处理速度内。RabbitMQ中与流控有关的进程构成了一个有向无环图。
三、 MQ队列详解
1. 普通队列
1) 普通MQ的结构
通常队列由两部分组成:一部分是AMQQueue,负责AMQP协议相关的消息处理,即接收生产者发布的消息、向消费者投递消息、处理消息confirm、acknowledge等等;另一部分是BackingQueue,它提供了相关的接口供AMQQueue调用,完成消息的存储以及可能的持久化工作等。
在RabbitMQ中BackingQueue又由5个子队列组成:Q1, Q2, Delta, Q3和Q4。RabbitMQ中的消息一旦进入队列,不是固定不变的,它会随着系统的负载在队列中不断流动,消息不断发生变化。在BackingQueue中,消息的生命周期分为4个状态:
l Alpha:消息的内容和消息索引都在RAM中。Q1和Q4的状态。
l Beta:消息的内容保存在DISK上,消息索引保存在RAM中。Q2和Q3的状态。
l Gamma:消息内容保存在DISK上,消息索引在DISK和RAM都有。Q2和Q3的状态。
l Delta:消息内容和索引都在DISK上。Delta的状态。
上述就是RabbitMQ的多层队列结构的设计,我们可以看出从Q1到Q4,基本经历RAM->DISK->RAM这样的过程。这样设计的好处是:当队列负载很高的情况下,能够通过将一部分消息由磁盘保存来节省内存空间,当负载降低的时候,这部分消息又渐渐回到内存,被消费者获取,使得整个队列具有很好的弹性。
2) 消息队列的工作流程
引起消息流动主要有两方面因素:其一是消费者获取消息;其二是由于内存不足引起消息换出到磁盘。RabbitMQ在系统运行时会根据消息传输的速度计算一个当前内存中能够保存的最大消息数量(Target_RAM_Count),当内存中的消息数量大于该值时,就会引起消息的流动。进入队列的消息,一般会按照Q1->Q2->Delta->Q3->Q4的顺序进行流动,但是并不是每条消息都一定会经历所有的状态,这个取决于当前系统的负载状况。
当消费者获取消息时,首先会从Q4队列中获取消息,如果Q4获取成功,则返回。如果Q4为空,则尝试从Q3获取消息,首先系统会判断Q3是否为空,如果为空则返回队列为空,即此时队列中无消息(后续会论证)。如果不为空,则取出Q3的消息,然后判断此时Q3和Delta队列的长度,如果都为空,则可认为Q2、Delta、Q3、Q4全部为空(后续会论证),此时将Q1中消息直接转移到Q4中,下次直接从Q4中获取消息。如果Q3为空,Delta不为空,则将Delta转移到Q3中,如果Q3不为空,则直接下次从Q3中获取消息。在将Delta转移到Q3的过程中,RabbitMQ是按照索引分段读取的,首先读取某一段,直到读到的消息非空为止,然后判断读取的消息个数与Delta中的消息个数是否相等,如果相等,则断定此时Delta中已无消息,则直接将Q2和刚读到的消息一并放入Q3中。如果不相等,则仅将此次读取到的消息转移到Q3。这就是消费者引起的消息流动过程。
消息换出的条件是内存中保存的消息数量+等待ACK的消息的数量>Target_RAM_Count。当条件触发时,系统首先会判断如果当前进入等待ACK的消息的速度大于进入队列的消息的速度时,会先处理等待ACK的消息。
最后我们来分析一下前面遗留的两个问题,一个是为什么Q3队列为空即可以认定整个队列为空。试想如果Q3为空,Delta不空,则在Q3取出最后一条消息时,Delta上的消息就会被转移到Q3上,Q3空矛盾。如果Q2不空,则在Q3取出最后一条消息,如果Delta为空,则会将Q2的消息并入到Q3,与Q3为空矛盾。如果Q1不为空,则在Q3取出最后一条消息,如果Delta和Q3均为空时,则将Q1的消息转移到Q4中,与Q4为空矛盾。这也解释了另外一个问题,即为什么Q3和Delta为空,Q2就为空。
通常在负载正常时,如果消息被消费的速度不小于接收新消息的速度,对于不需要保证可靠不丢的消息极可能只会有Alpha状态。对于durable=true的消息,它一定会进入gamma状态,若开启publish confirm机制,只有到了这个阶段才会确认该消息已经被接受,若消息消费速度足够快,内存也充足,这些消息也不会继续走到下一状态。
通常在系统负载较高时,已接受到的消息若不能很快被消费掉,这些消息就会进入到很深的队列中去,增加处理每个消息的平均开销。因为要花更多的时间和资源处理“积压”的消息,所以用于处理新来的消息的能力就会降低,使得后来的消息又被积压进入很深的队列,继续加大处理每个消息的平均开销,这样情况就会越来越恶化,使得系统的处理能力大大降低。
根据官网资料,应对这一问题,有三个措施:
l 进行流量控制。
l 增加prefetch的值,即一次发送多个消息给接收者,加快消息被消费掉的速度。
l 采用multipleack,降低处理ack带来的开销。
2. 镜像队列
1) 镜像队列的结构
镜像队列就是一个特殊的BackingQueue,它内部包裹了一个普通的BackingQueue做本地消息持久化处理,在此基础上增加了将消息、ack复制到所有镜像的功能。所有对mirror_queue_master的操作,会通过组播GM的方式同步到各slave节点。GM负责消息的广播,mirror_queue_slave负责回调处理,而master上的回调处理是由coordinator负责完成。mirror_queue_slave中包含了普通的BackingQueue进行消息的存储,master节点中BackingQueue包含在mirror_queue_master中由AMQQueue进行调用。
消息的发布(除了Basic.Publish之外)与消费都是通过master节点完成。master节点对消息进行处理的同时将消息的处理动作通过GM广播给所有的slave节点,slave节点的GM收到消息后,通过回调交由mirror_queue_slave进行实际的处理。
对于Basic.Publish,消息同时发送到master和所有slave上,如果此时master宕掉了,消息还发送slave上,这样当slave提升为master的时候消息也不会丢失。
2) 组播(GM)
GM,Guarenteed Multicast. GM模块实现的一种可靠的组播通讯协议,该协议能够保证组播消息的原子性,即保证组中活着的节点要么都收到消息要么都收不到。它的实现大致如下:
将所有的节点形成一个循环链表,每个节点都会监控位于自己左右两边的节点,当有节点新增时,相邻的节点保证当前广播的消息会复制到新的节点上;当有节点失效时,相邻的节点会接管保证本次广播的消息会复制到所有的节点。在master节点和slave节点上的这些gm形成一个group,group(gm_group)的信息会记录在mnesia中。不同的镜像队列形成不同的group。从master节点发出gm后,消息顺着链表依次传送到所有的节点,由于所有节点组成一个循环链表,master节点对应的gm最终会收到自己发送的消息,这个时候master节点就知道消息已经复制到所有的slave节点了。
3) 节点的失效
如果某个slave失效了,系统除了做些记录外几乎啥都不做:master依旧是master,客户端不需要采取任何行动,或者被通知slave失效。
如果master失效了,那么slave中的一个必须被选中为master。被选中作为新的master的slave通常是最老的那个,因为最老的slave与前任master之间的同步状态应该是最好的。然而,需要注意的是,如果存在没有任何一个slave与master完全同步的情况,那么前任master中未被同步的消息将会丢失。
4) 消息的同步
将新节点加入已存在的镜像队列是,默认情况下ha-sync-mode=manual,镜像队列中的消息不会主动同步到新节点,除非显式调用同步命令。当调用同步命令后,队列开始阻塞,无法对其进行操作,直到同步完毕。当ha-sync-mode=automatic时,新加入节点时会默认同步已知的镜像队列。由于同步过程的限制,所以不建议在生产的active队列(有生产消费消息)中操作。
可以使用下面的命令来查看那些slaves已经完成同步:
rabbitmqctl list_queues
nameslave_pids synchronised_slave_pids
可以通过手动的方式同步一个queue:
rabbitmqctl sync_queue name
同样也可以取消某个queue的同步功能:
rabbitmqctl cancel_sync_queue name
当然这些都可以通过management插件来设置。
四、 集群原理
1. 基本概念
1) 镜像队列(Mirrored Queue)
RabbitMQ集群的队列(Queue)在默认的情况下只存在单一节点(node)上。我们也可以把队列配置成同时存在在多个节点上,也就是说队列可以被镜像到多个节点上。发布(publish)到镜像队列上的消息(message)会被复制(replicated)到所有的节点上。一个镜像队列包含一个主(master)和多个从(slave)。
2) 非同步的Slave(unsynchronised slave)
在rabbitmq中同步(synchronised)是用来描述master和slave之间的数据状态是否一致的。如果slave包含master中的所有message,则这个slave是synchronised,如果这个slave并没有包含master中所有的message,则这个slave是unsynchronised。
3) 在什么情况下会出现unsynchronisedslave?
当一个新slave加入到一个镜像队列时,这时这个新slave是空的,而master中这时可能包含之前接收到的消息。假设这时master包含了N条消息,这是第N+1条消息被添加到这个镜像队列中,这个新slave会从这个第N+1条消息开始接收。此时这个slave就是unsynchronised slave。随着前5条消息从镜像队列中被消费掉(consumed), 这个slave变成了synchronised。
slave 重新加入(rejoin)到镜像队列时,也会出现非同步的情况。一个slave要重新加入镜像队列之前,slave可能已经接收了一些消息,要重新加入镜像队列,就要清空自己之前已经接收的所有消息,好像自己是第一次加入队列一样。(slave在很多情况下会需要重新加入镜像队列,例如:网络分区(networkpartition))
2. 服务可用性(Availablity)与数据可靠性(Reliability)
1) 选主方式
由RabbitMQ的slave加入和重新加入队列的方式,我们得出一个结论,越早加入队列的slave,越有更大的机会是同步状态的,所以RabbitMQ通过这种方式选主:当master因为某种原因失效时,最早加入镜像队列的slave被提升成master。
2) RabbitMQ的可用性(Availablity)和数据可靠性(Reliability)
RabbitMQ通过参数配置的方式,在可用性(Availablity)和数据可靠性(Reliability)做出了一定的权衡。下面我们来看看这些参数。
l 参数ha-sync-mode
取值有automatic,manual,默认是manual。
如果镜像队列被设置成munual,当一个slave加入和重新加入队列时的行为,就是我们上面描述的行为,之所以叫manual,就是我们可以通过命令行手工(manually)进行同步。命令如下:rabbitmqctl sync_queue name
如果镜像队列被设置成automatic,当一个新slave加入时,slave会自动同步master中的所有消息,在所有消息被同步完成之前,所有的操作都会被阻塞(blocking)。
这个参数是可用性和可靠性的一个平衡,manual不保证数据可靠性,在某些情况会出现丢消息的可能,但是保证了队列的可用性。automatic提高了数据的可靠性,但是当有新slave加入时,可能会出现队列的暂时不可用。
l 参数ha-promote-on-shutdown
取值有when-synced,always,默认是when-synced。用来控制选主的行为。
当取值为when-synced时,在可控的master关闭时(比如停止RabbitMQ服务或者关闭操作系统),RabbitMQ会拒绝故障恢复(fail over)到一个非同步slave,也即拒绝把一个非同步的slave提升成新的master。只有在非可控的master关闭时(比如server crash, 断网),才会故障恢复到一个非同步的slave。
当取值为always时,则在所有情况下,都不会拒绝故障恢复到非同步的slave。
这个参数也是平衡可用性和可靠性的,当when-synced,可靠性更好,可用性降低了,因为如果所有的slave都是非同步状态,那就没有符合条件的slave可以被提升成master,这时队列就处在不可用状态。
3. 集群原理
1) 单节点
如果RabbitMQ集群只有一个broker节点,那么该节点的失效将导致整个服务临时性的不可用,并且可能会导致message的丢失(尤其是在非持久化message存储于非持久化queue中的时候)。当然可以将所有的publish的message都设置为持久化的,并且使用持久化的queue,但是这样仍然无法避免由于缓存导致的问题:因为message在发送之后和被写入磁盘并执行fsync之间存在一个虽然短暂但是会产生问题的时间窗。通过publisher的confirm机制能够确保客户端知道哪些message已经存入磁盘,尽管如此,一般不希望遇到因单点故障导致的服务不可用。
2) 普通队列
如果RabbitMQ集群是由多个broker节点构成的,那么从服务的整体可用性上来讲,该集群对于单点失效是有弹性的,但是同时也需要注意:尽管exchange和binding能够在单点失效问题上幸免于难,但是queue和其上持有的message却不行,这是因为queue及其内容仅仅存储于单个节点之上,所以一个节点的失效表现为其对应的queue不可用。
3) 镜像队列
RabbitMQ的镜像队列机制是将queue镜像到cluster中其他的节点之上。在通常的用法中,针对每一个镜像队列都包含一个master和多个slave,分别位于于不同的节点。slave会准确地按照master执行命令的顺序进行命令执行,故slave与master上维护的状态应该是相同的。所有动作都只会向master发送,然后由master将命令执行的结果广播给slave们,故看似从镜像队列中的消费操作实际上是在master上执行的。
在该实现下,如果镜像队列中的一个master失效了,集群自动选出一个slave(最老的slave)提升为master,此后message可以继续发送到队列上。
RabbitMQ的镜像队列同时支持publisher confirm和事务两种机制。在事务机制中,只有当前事务在全部镜像queue中执行之后,客户端才会收到Tx.CommitOk的消息。同样的,在publisher confirm机制中,向publisher进行当前message确认的前提是该message被全部镜像所接受了。
五、 补充要点
镜像队列不能作为负载均衡使用,因为每个操作在所有节点都要做一遍。
ha-mode参数和durabledeclare对exclusive队列都并不生效,因为exclusive队列是连接独占的,当连接断开,队列自动删除。所以实际上这两个参数对exclusive队列没有意义。
当所有slave都出在(与master)未同步状态时,并且ha-promote-on-shutdown设置为when-synced(默认)时,如果master因为主动的原因停掉,比如是通过rabbitmqctl stop命令停止或者优雅关闭OS,那么slave不会接管master,也就是此时镜像队列不可用;但是如果master因为被动原因停掉,比如VM或者OS crash了,那么slave会接管master。这个配置项隐含的价值取向是保证消息可靠不丢失,放弃可用性。如果ha-promote-on-shutdown设置为always,那么不论master因为何种原因停止,slave都会接管master,优先保证可用性。
镜像队列中最后一个停止的节点会是master,启动顺序必须是master先启动,如果slave先启动,它会有30s的等待时间,等待master的启动,然后加入cluster中(如果30s内master没有启动,slave会自动停止)。当所有节点因故(断电等)同时离线时,每个节点都认为自己不是最后一个停止的节点。要恢复镜像队列,可以尝试在30s之内启动所有节点。
对于镜像队列,客户端Basic.Publish操作会同步到所有节点(消息同时发送到master和所有slave上,如果此时master宕掉了,消息还发送slave上,这样当slave提升为master的时候消息也不会丢失),而其他操作则是通过master中转,再由master将操作作用于slave。比如一个Basic.Get操作,假如客户端与slave建立了TCP连接,首先是slave将Basic.Get请求发送至master,由master备好数据,返回至slave,投递给消费者。
当slave宕掉了,除了与slave相连的客户端连接全部断开之外,没有其他影响。
当master宕掉时,会有以下连锁反应:
l 与master相连的客户端连接全部断开;
l 选举最老的slave节点为master。若此时所有slave处于未同步状态,则未同步部分消息丢失;
l 新的master节点requeue所有unack消息,因为这个新节点无法区分这些unack消息是否已经到达客户端,亦或是ack消息丢失在老的master的链路上,亦或者是丢在master组播ack消息到所有slave的链路上。所以处于消息可靠性的考虑,requeue所有unack的消息。此时客户端可能有重复消息;
l 如果客户端连着slave,并且Basic.Consume消费时指定了x-cancel-on-ha-failover参数,那么客户端会受到一个ConsumerCancellation Notification通知,Java SDK中会回调Consumer接口的handleCancel方法,故需覆盖此方法。如果未指定x-cancal-on-ha-failover参数,那么消费者就无法感知master宕机,会一直等待下去。
Channel channel =
...;
Consumer consumer =
...;
Map<String, Object> args = new HashMap<String, Object>();
args.put(
"x-cancel-on-ha-failover", true);
channel.basicConsume(
"my-queue", false, args, consumer);
六、 镜像队列的恢复
1. 场景与解决方案
前提:两个节点A和B组成以镜像队列。
1) 场景1:A先停,B后停
该场景下B是master,只要先启动B,再启动A即可。或者先启动A,再在30s之内启动B即可恢复镜像队列。(如果没有在30s内恢复B,那么A自己就停掉自己)
2) 场景2:A,B同时停
该场景下可能是由掉电等原因造成,只需在30s内连续启动A和B即可恢复镜像队列。
3) 场景3:A先停,B后停,且A无法恢复。
因为B是master,所以等B起来后,在B节点上调用rabbitmqctl forget_cluster_node A以解除A的cluster关系,再将新的slave节点加入B即可重新恢复镜像队列。
4) 场景4:A先停,B后停,且B无法恢复
该场景比较难处理,旧版本的RabbitMQ没有有效的解决办法,在现在的版本中,因为B是master,所以直接启动A是不行的,当A无法启动时,也就没版本在A节点上调用rabbitmqctl forget_cluster_node B了,新版本中forget_cluster_node支持-offline参数,offline参数允许rabbitmqctl在离线节点上执行forget_cluster_node命令,迫使RabbitMQ在未启动的slave节点中选择一个作为master。当在A节点执行rabbitmqctl
forget_cluster_node-offline B时,RabbitMQ会mock一个节点代表A,执行forget_cluster_node命令将B提出cluster,然后A就能正常启动了。最后将新的slave节点加入A即可重新恢复镜像队列
5) 场景5:A先停,B后停,且A和B均无法恢复,但是能得到A或B的磁盘文件
这个场景更加难以处理。将A或B的数据库文件($RabbitMQ_HOME/var/lib目录中)copy至新节点C的目录下,再将C的hostname改成A或者B的hostname。如果copy过来的是A节点磁盘文件,按场景4处理,如果拷贝过来的是B节点的磁盘文件,按场景3处理。最后将新的slave节点加入C即可重新恢复镜像队列。
6) 场景6:A先停,B后停,且A和B均无法恢复,且无法得到A和B的磁盘文件
无解。
七、 参考博客
官网文档
RabbitMQ 队列镜像配置主机挂掉之后自动切换 另一台
http://blog.csdn.net/csethcrm/article/details/53928313
RabbitMQ不同Confirm模式下的性能对比
http://ju.outofmemory.cn/entry/177937
其他博客(当初整理的时候,忘记将所有参考文档都记录下来,所以此处就只有两个)