RabbitMQ(三) -- Publish/Subscribe

`rabbitmq`支持一对多的模式,一般称为发布/订阅。也就是说,生产者产生一条消息后,`rabbitmq`会把该消息分发给所有的消费者。

Exchanges

之前的教程中,仅仅使用了基本的消息模型:

  • 生产者产生消息
  • 把消息添加到消息队列
  • 消费者接收消息

而在`rabbitmq完整的消息模型`中,并不是这样的。事实上,生产者并不知道消息是否发送到队列,而是把消息直接发送给`Exchanges`。

`Exchanges`的功能理解起来非常简单,它只负责接收生产者发送的数据并把这些数据添加到消息队列。但是,在存在多个消息队列的情况下,`Exchanges`必须知道每条消息要添加到哪一个消息队列。

`rabbitmq`提供了几种`Exchanges`,包括:`direct`, `topic`, `headers` and `fanout`。

这里,仅仅介绍fanout的使用。

channel.exchange_declare(exchange='news', type='fanout')

那么,发布消息:

channel.basic_publish(exchange='news', routing_key='', body=message)

Temporary queues

由于在生产者和消费者中需要指定相同的消息队列才能实现消息通信,那么如果不特殊指定某个消息队列会如何呢?
那么需要使用默认参数让系统给生成一个特定的消息队列。

result = channel.queue_declare()

Bindings

为了发送指定发送的消息队列,必须创建exchange和消息队列之间的关系:

channel.queue_bind(exchange='news', queue=result.method.queue)

例子

作为生产者的publish:

#!/usr/bin/env python
# coding=utf-8

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='news',
type='fanout')
for i in range(100):
    message = str(i) + 'Hello World!'
    channel.basic_publish(exchange='news', routing_key='', body=message)
    print " [x] Sent %r" % (message,)
    import time
    time.sleep(2)
connection.close()

作为消费者的subscribe:

#!/usr/bin/env python
# coding=utf-8
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='news', type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='news',
queue=queue_name)

print ' [*] Waiting for news. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r" % (body,)

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()

 


本文 由 cococo点点 创作,采用 知识共享 署名-非商业性使用-相同方式共享 3.0 中国大陆 许可协议进行许可。欢迎转载,请注明出处:
转载自:cococo点点 http://www.cnblogs.com/coder2012

时间: 2024-10-17 23:19:28

RabbitMQ(三) -- Publish/Subscribe的相关文章

RabbitMQ消息队列(四):分发到多Consumer(Publish/Subscribe)

 <=== RabbitMQ消息队列(三):任务分发机制            上篇文章中,我们把每个Message都是deliver到某个Consumer.在这篇文章中,我们将会将同一个Message deliver到多个Consumer中.这个模式也被成为 "publish / subscribe".     这篇文章中,我们将创建一个日志系统,它包含两个部分:第一个部分是发出log(Producer),第二个部分接收到并打印(Consumer). 我们将构建两个Consum

使用Publish/Subscribe 设计模式达到对象间数据同步

对象|设计|数据|数据同步 使用Publish/Subscribe 设计模式达到对象间数据同步 应用程序经常需要更改和交换数据,必须传送这些更改后数据以达到对象的同步,尤其在多窗口用户界面应用程序中更要求这种数据的同步协调,在这一类应用程序中,潜在的数据更新信息一定要反映到所有被包含的子窗体中. 例如一个人员信息管理的应用程序.一次可以打开多个包含一个人名字的窗口,如果你在其中一个窗口中修改并报存了这个人的名字,你将期望对名字改变应立即显示在其它全部窗体内.可以通过使用Publish/Subsc

使用Publish/Subscribe 设计模式达到对象间数据同步(二)

对象|设计|数据|数据同步 在注册处理期间,subscriber被分配一个独特的标记,用来在event channel中标识subscriber.event channel也使用这个标记索引那些subscriber. 虽然样品应用作为标记目标的杂乱脉冲干扰电码使用,我推荐在你的自己的程序里使用另一个方法产生一个独特的标识符 ( 例如产生一GUID). 使用目录菜单建立3到4个frmList窗口实例.使用新的目录菜单选项创作frmList 的3 或者4 个实例,然后在其中一个窗口中选择一个条目,双

RabbitMQ的几种典型使用场景

RabbitMQ主页:https://www.rabbitmq.com/ AMQP AMQP协议是一个高级抽象层消息通信协议,RabbitMQ是AMQP协议的实现.它主要包括以下组件: 1.Server(broker): 接受客户端连接,实现AMQP消息队列和路由功能的进程. 2.Virtual Host:其实是一个虚拟概念,类似于权限控制组,一个Virtual Host里面可以有若干个Exchange和Queue,但是权限控制的最小粒度是Virtual Host 3.Exchange:接受生产

如何优雅的使用RabbitMQ

RabbitMQ无疑是目前最流行的消息队列之一,对各种语言环境的支持也很丰富,作为一个.NET developer有必要学习和了解这一工具.消息队列的使用场景大概有3种: 1.系统集成,分布式系统的设计.各种子系统通过消息来对接,这种解决方案也逐步发展成一种架构风格,即"通过消息传递的架构". 2.当系统中的同步处理方式严重影响了吞吐量,比如日志记录.假如需要记录系统中所有的用户行为日志,如果通过同步的方式记录日志势必会影响系统的响应速度,当我们将日志消息发送到消息队列,记录日志的子系

【原创】RabbitMQ 之 Plugins(翻译)

      为了方便工作中使用,对 RabbitMQ 的[插件]相关文档进行了翻译,鉴于自己水平有限,翻译中难免有纰漏产生,如果疑问,欢迎指出探讨.此文以中英对照方式呈现.官方原文:http://www.rabbitmq.com/plugins.html ========== 我是分割线 =============  Plugins RabbitMQ supports a variety of plugins. This page documents the plugins that ship

ActiveMQ(三)消息机制

消息结构 消息头 消息头包含消息的识别信息和路由信息,消息头包含一些标准的属性如:JMSDestination,JMSMessageID等.  消息属性 如果需要除消息头字段以外的值,那么可以使用消息属性.这种新属性包含以下几种: 应用需要用到的属性; 消息头中原有的一些可选属性; JMS Provider 需要用到的属性.  消息体 JMS定义的消息类型有TextMessage.MapMessage.BytesMessage.StreamMessage和ObjectMessage. TextM

RabbitMQ之消息确认机制(事务+Confirm)

概述 在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有正确到达broker的,如果在消息到达broker之前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器,你怎么进行持久化,那么这个问题该怎

ActiveMQ, Qpid, HornetQ and RabbitMQ in Comparison

  国内私募机构九鼎控股打造APP,来就送 20元现金领取地址:http://jdb.jiudingcapital.com/phone.html内部邀请码:C8E245J (不写邀请码,没有现金送)国内私募机构九鼎控股打造,九鼎投资是在全国股份转让系统挂牌的公众公司,股票代码为430719,为"中国PE第一股",市值超1000亿元.        ----------------------------------------------------------------------