RabbitMQ基本概念和使用

RabbitMQ是一个消息代理,核心原理:发送消息,接收消息。

RabbitMQ主要用于组件之间的解耦,消息发送者无需知道消息使用者的存在,反之亦然。

              单向解耦                                        双向解耦(如:RPC)

例如一个日志系统,很容易使用RabbitMQ简化工作量,一个Consumer进行消息的正常处理,另一个Consumer复制对消息进行日志记录,只要在程序中指定两个Consumer所监听的queue以相同的方式绑定到同一个exchange即可,剩下的消息分发工作由RabbitMQ完成。

安装rabbitmq,请参考官网

首先通过一个非常简单的”hello world“例子介绍如何使用RabbitMQ,然后再介绍其涉及的基本概念并对交换机和队列多做点介绍。

一、helloworld例子

本例非常简单——发送一个消息”hello world“,然后获取它并输出到屏幕。

总共需要两个程序,一个发送消息叫send.py,一个接受消息并打印消息内容叫receive.py。

该图为helloworld例子的原理图:生产者send.py(Productor)把消息(”hello world“)发送到一个名为”queue“的队列中,消费者receive.py从这个队列中获取消息。接下来看代码:

send.py(发送消息)

#!/usr/bin/env python
import pika
#第一步,连接RabbitMq服务器
rabbit_username='xxx'
rabbit_password='xxx'
credentials = pika.PlainCredentials(rabbit_username, rabbit_password)
connection = pika.BlockingConnection(pika.ConnectionParameters( host='x.x.x.x',credentials=credentials))#channel是进行消息读写的通道
channel = connection.channel()

#第二步,创建一个名为queue的队列,然后把消息发送到这个队列

channel.queue_declare(queue='queue')

#第三步,现在可以发送消息,但是RabbitMQ不能把消息直接发送到队列,要发送到交换器,这个稍后介绍,这里使用默认交换器(exchange),它使用一个空字符串标
#识,routing_key参数必须指定为队列名称,这里为queue
channel.basic_publish(exchange='',
                      routing_key='queue',
                      body='hello world')
print "send.py:send message 'hello world',wait for receive.py deal with this message"

#退出程序前,通过关闭连接保证消息已经投递到RabbitMq
connection.close()

receive.py(获取数据)

#!/usr/bin/env python
import pika

#第一步,同样连接RabbitMq服务器
rabbit_username='xxx'
rabbit_password='xxx'
credentials = pika.PlainCredentials(rabbit_username, rabbit_password)
connection = pika.BlockingConnection(pika.ConnectionParameters( host='x.x.x.x',credentials=credentials))
channel = connection.channel()

#为确保队列存在,再次执行queue_declare创建一个队列,我们可以多次运行该命令,但是只有一个队列会创建
#因为不能保证send.py先执行还是receive.py先执行,所以重复声明队列来确保其存在
channel.queue_declare(queue='queue')

#第三步,定义一个回调函数,当获得消息时,Pika库调用这个回调函数来处理消息,该回调函数将消息内容打印到屏幕
def callback(ch, method, properties, body):
    print "receive.py: Received message %r" % (body,)

#第四步,告诉rabbbitMq回调函数将从queue队列接收消息
channel.basic_consume(callback,
                      queue='queue',
                      no_ack=True)

#第五步,输入一个无限循环来等待消息数据并运行回调函数
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()

现在在终端运行程序。首先,用send.py发送一条消息:

$ python send.py
send.py:send message 'hello world',wait for receive.py deal with this message

生产者(producer)程序send.py每次运行后就会停止。现在运行receive.py来接收消息:

$ python receive.py
receive.py: Received message 'hello world'
 [*] Waiting for messages. To exit press CTRL+C

成功了!现在已经通过RabbitMQ发送了第一条消息。但是receive.py程序并没有退出,它一直在准备获取消息,可以通过ctrl-c来中断它。

二、RabbitMQ基本概念

总结一下发送接收消息的过程:

通过上面例子对RabbitMQ有一个感性认识后,现在来介绍RabbitMQ中的基本概念。

Broker:消息队列服务器实体

消息:每个消息都有一个路由键(routing key)的属性。就是一个简单的字符串。

connection:应用程序与broker的网络连接。

channel:几乎所有的操作都在channel中进行,channel是进行消息读写的通道。客户端可建立多个channel,每个channel代表一个会话任务。

交换机:接收消息,根据路由键转发消息到绑定的队列

绑定:一个绑定就是基于路由键将交换机和队列连接起来的路由规则,所以交换机不过就是一个由绑定构成的路由表。

举例:一个具有路由键“key1”的消息要发送到两个队列,queueA和queueB。要做到这点就要建立两个绑定,每个绑定连接一个交换机和一个队列。两者都是由路由键“key1”触发,这种情况,交换机会复制一份消息并把它们分别发送到两个队列中。

队列:消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

三、交换机

交换机用来接收消息,转发消息到绑定的队列,是rabbitMq中的核心。

交换机共有4种类型:direct,topic,headers和fanout。

为什么不创建一种交换机来处理所有类型的路由规则?因为每种规则匹配时的CPU开销是不同的,所以根据不同需求选择合适交换机。

举例:一个"topic"类型的交换机会将消息的路由键与类似“dog.*”的模式进行匹配。一个“direct”类型的交换机会将路由键与“dogs”进行比较。匹配末端通配符比直接比较消耗更多的cpu,所以如果用不到“topic”类型交换机带来的灵活性,就通过“direct”类型交换机获得更高的处理效率。

1、Direct交换机:转发消息到routingKey指定队列(完全匹配,单播)。

routingKey与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发routingkey标记为dog的消息,不会转发dog.puppy,也不会转发dog.guard等。

2、Topic交换机:按规则转发消息(最灵活,组播)

 Topic类型交换机通过模式匹配分配消息的routing-key属性。将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。

它将routing-key和binding-key的字符串切分成单词。这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“*”。#匹配0个或多个单词,*匹配不多不少一个单词。

例如,binding key:*.stock.#匹配routing key: usd.stock和eur.stock.db,但是不匹配stock.nana。

例如,“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*”只会匹配到“audit.irs”。

3、Fanout交换机:转发消息到所有绑定队列(最快,广播)

fanout交换机不处理路由键,简单的将队列绑定到交换机上,每个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。

很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。

4、Note

  • 如果没有队列绑定在交换机上,则发送到该交换机上的消息会丢失。
  • 一个交换机可以绑定多个队列,一个队列可以被多个交换机绑定。
  • 还有一些其他类型的交换机类型,如header、failover、system等,现在在当前的RabbitMQ版本中均未实现。
  • 因为交换机是命名实体,声明一个已经存在的交换机,但是试图赋予不同类型是会导致错误。客户端需要删除这个已经存在的交换机,然后重新声明并且赋予新的类型。
  • 交换机的属性:
    • 持久性:如果启用,交换机将会在server重启前都有效。
    • 自动删除:如果启用,那么交换机将会在其绑定的队列都被删掉之后删除自身。
    • 惰性:如果没有声明交换机,那么在执行到使用的时候会导致异常,并不会主动声明。

四、队列

  • 队列的属性:

    • 持久性:如果启用,队列将在Server服务重启前都有效。
    • 自动删除:如果启用,那么队列将会在所有的消费者停止使用之后自动删除自身。
    • 惰性:如果没有声明队列,那么在执行到使用的时候会导致异常,并不会主动声明。
    • 排他性:如果启用,队列只能被声明它的消费者使用。  

五、命令

1、启动

rabbitmq-server &

2、队列重置(清空队列、用户等)

rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl stop

3、关闭

rabbitmqctl stop

4、列举出所有用户

rabbitmqctl list_users

5、列举出所有队列

rabbitmqctl list_queues

6、添加用户

rabbitmqctl add_user user_name user_passwd

7、设置用户角色为管理员

rabbitmqctl set_user_tags user administrator

8、权限设置

rabbitmqctl set_permissions -p / user ".*" ".*" ".*"

9、查看状态

rabbitmqctl status

10、安装RabbitMQWeb管理插件

rabbitmq-plugins enable rabbitmq_management 

可以利用http://ip:15672查看界面状态

以上命令参考自:http://www.cnblogs.com/kaituorensheng/p/4985767.html

 

深入了解可参考:

AMQP协议

RabbitMQ+Python入门经典 兔子和兔子窝

官网教程

Hello world

工作队列

上面两篇是翻译成中文的,剩下的都没有翻译,可参考英文版

RabbitMQ Tutorials

python采用pika库使用rabbitmq总结,多篇笔记和示例

 

本文作者starof,因知识本身在变化,作者也在不断学习成长,文章内容也不定时更新,为避免误导读者,方便追根溯源,请诸位转载注明出处:http://www.cnblogs.com/starof/p/4173413.html有问题欢迎与我讨论,共同进步。

时间: 2024-10-31 18:54:09

RabbitMQ基本概念和使用的相关文章

RabbitMQ基础概念详解

RabbitMQ简介 AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然. AMQP的主要特征是面向消息.队列.路由(包括点对点和发布/订阅).可靠性.安全. RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python.Ruby..NET.Java.JMS.C.PHP.A

消息中间件收录集

本篇主要整理工作中遇到的一些消息中间件的相关知识,包括Kafka, RabbitMQ, RocketMQ, ActiveMQ等,不排除收录其他消息中间件的可能. 这里会持续收录相关知识,包括安装.部署.使用示例.监控.运维.原理等. 所有新撰写的与中间件有关的文章都会收录与此,注意保存本文链接. Last Update Time: 2017-10-26 08:23 Update Content: RabbitMQ管理(5)--集群管理 通用 什么是Zero-Copy?(sendfile) 1.

RabbitMQ消息队列入门篇(环境配置+Java实例+基础概念)

版权声明:本文为博主原创文章,转载注明出处http://blog.csdn.net/u013142781 目录(?)[+] 一.消息队列使用场景或者其好处 消息队列一般是在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量. 在项目启动之初来预测将来项目会碰到什么需求,是极其困难的.消息队列在处理过程中间插入了一个隐含的.基于数据的接口层,两边的处理过程都要实现这一接口.这允许你独立的扩展或修改两边的处理过

RabbitMQ的transaction、confirm、ack三个概念的解释

在使用RabbitMQ的过程中,肯定会遇到这样的几个概念:transaction.confirm.ack.本文介绍一下这几个概念,以及他们之间的关系. RabbitMQ是采用的AMQP协议,AMQP协议定义了"确认"(acknowledgement),它是从consumer到RabbitMQ的确认,表示一条消息已经被客户端正确处理.RabbitMQ扩展了AMQP协议,定义了从broker到publisher的"确认",但将其称之为confirm.所以RabbitMQ

RabbitMQ的几种典型使用场景

RabbitMQ主页:https://www.rabbitmq.com/ AMQP AMQP协议是一个高级抽象层消息通信协议,RabbitMQ是AMQP协议的实现.它主要包括以下组件: 1.Server(broker): 接受客户端连接,实现AMQP消息队列和路由功能的进程. 2.Virtual Host:其实是一个虚拟概念,类似于权限控制组,一个Virtual Host里面可以有若干个Exchange和Queue,但是权限控制的最小粒度是Virtual Host 3.Exchange:接受生产

RabbitMQ集群环境生产实例部署

生产环境: CentOS 6.3 x86_64 服务器主机名与IP列表: mq136      172.28.2.136 mq137      172.28.2.137 mq164      172.28.2.164 mq165      172.28.2.165 在各节点服务器上作好hosts解析 cat >>/etc/hosts/<<EOF mq136      172.28.2.136 mq137      172.28.2.137 mq164      172.28.2.1

RabbitMQ管理(3)——Web端管理

前面讲述的都是使用rabbitmqctl工具来管理RabbitMQ,有些时候你是否会觉得这种方式是不是不太友好?而且为能够运行rabbitmqctl工具,当前的用户需要拥有访问Erlang cookie的权限,由于服务器可能是以guest或者rabbit用户身份来运行的,因此你需要获得这些文件的访问权限,这样就引申出来一些权限管理的问题. RabbitMQ的开发团队也考虑到了这种情况,并且开发了RabbitMQ management插件.RabbitMQ management插件同样是由Erla

RabbitMQ管理(1)——多租户与权限

每一个RabbitMQ服务器都能创建虚拟的消息服务器,我们称之为虚拟主机(virtual host),简称为vhost.每一个vhost本质上是一个独立的小型RabbitMQ服务器,拥有自己独立的队列.交换器以及绑定关系等待,并且它拥有自己独立的权限.vhost就像是虚拟机与物理服务器一样,它们在各个实例间提供逻辑上的分离,允许你为不同程序安全保密地运行数据,它既能将同一个RabbitMQ的中众多客户区分开,又可以避免队列和交换器等命令冲突.vhost之间是绝对隔离的,你无法将vhost1中的交

RabbitMQ之惰性队列(Lazy Queue)

RabbitMQ从3.6.0版本开始引入了惰性队列(Lazy Queue)的概念.惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储.当消费者由于各种各样的原因(比如消费者下线.宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了. 默认情况下,当生产者将消息发送到RabbitMQ的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者.