转载RabbitMQ入门(3)--发布和订阅

发布和订阅

(使用java 客户端)

在先前的指南中,我们创建了一个工作队列。这工作队列后面的假想是每一个任务都被准确的传递给工作者。在这部分我们将会做一些完全不同的事情–我们将一个消息传递给多个消费者。这部分被认知为“发布和订阅”。

为了说明这个部分,我们会建立一个简单德日志系统,它是由两个程序组成–第一个发出日志消息,第二个接收和打印它们。

在我们的日志系统中,每一个运行的接收者拷贝程序将会获得信息。通过这个方式我们可以运行一个接收者,直接的把日志记录到硬盘中;在同一时间我们可以运行另一个接收者,在屏幕上看这些日志。
本质上,发布日志消息等同于广播到所有接收者。

交换

在先前指南部分,我们将消息发送到队列里,并从队列中接收消息。现在是时候介绍RabbitMQ中全消息模型。
让我们快速温习下在先前指南中我们掌握的:

一个发送消息的生产者是一个用户程序。
一个存储消息的队列是一个缓冲。
一个接收消息的消费者是一个用户程序。
在RabbitMQ消息模型中核心的思想是生产者从不直接将消息发送给队列。实际上,生产者常常甚至不知道是否一个消息会被传递到队列中。

相反,生产者仅能将消息发送到一个交换机。一个交换机是一个非常简单的事物。在它的一遍,它从生产者那里接收消息,另一边将消息推送到队列中。这个
交换所必须清楚的知道它所接收到的消息要如何处理。是否将它附加到一个特别的队列中?是否将它附加到多个队列中?或者是否它应该被丢弃。规则的定义是由交
换类型决定的。

有几个交换类型:directtopicdeadersfanout。我们来关注最后一个–fanout。让我们创建一个这种类型的交换机并且称呼它为logs:

channel.exchangeDeclare("logs", "fanout");

fanout交换机是非常简单的。通过这个名字你可能已经猜出它的用处了,它会将接收的所有消息都广播到所有它所知道的所有队列。这个真正是我们的记录器所需要的。

交换机列表
为了列出服务器中所有交换机,你可以运行着有用的rabbitmqctl

$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
        direct
amq.direct      direct
amq.fanout      fanout
amq.headers     headers
amq.match       headers
amq.rabbitmq.log        topic
amq.rabbitmq.trace      topic
amq.topic       topic
logs    fanout
...done.

在这个列表里有一些以amq.打头的交换机和默认(未命名)的交换机。这些是默认创建的,但是不太可能你会在某个时刻使用它们。
匿名交换机
在先前的指南中我们对交换机毫无了解,但是我们依旧能将消息发送到队列中。那是可能实现的,因为我们使用的是默认交换机,通过我们使用空字符串(““)标识它。
回想一下我们以前是如何发送消息的:

channel.basicPublish("", "hello", null, message.getBytes());

这第一个参数是交换机的名字。空字符串说明它是默认的或者匿名的交换机:路由关键字存在的话,消息通过路由关键字的名字路由到特定的队列上。

现在,我们可以发布我们自己命名的交换机:

channel.basicPublish( "logs", "", null, message.getBytes());

临时队列

你可能会想起先前我们使用的队列是有特定的名字的(是否记得hellotask_queue)。命名一个队列对我们来说是至关重要的–我们需要指定工作者到这相同的队列上。当你想把队列分享给生产者和消费者,给队列名是重要的。
但是那不是我们记录器的实例。我们想监听所有日志消息,不仅仅是它们中的子集。我们同样是对当前的消息流感兴趣,而不是旧的。为了解决这个我们需要两件事。
首先,无论我们什么时候连接RabbitMQ,我们需要一个新的,空的队列。为了做到这些,我们可以创建一个随机名字的队列或者更胜一筹-让服务器为我们选择一个随机的名字。
第二部,一旦我们将消费者的连接断开,队列应该自动删除。
在Java客户端里,当我们使用无参数调用queueDeclare()方法,我们创建一个自动产生的名字,不持久化,独占的,自动删除的队列。

String queueName = channel.queueDeclare().getQueue();

在这点,队列名中包含一个随机队列名。例如名字像amq.gen-JzTY20BRgKO-HjmUJj0wLg

绑定

我们已经创建了一个fanout交换机和队列。现在我们需要告诉交换机发送消息给我们的队列上。这交换机和队列之间的关系称之为一个绑定。

channel.queueBind(queueName, "logs", "");

从现在开始,日志交换所将要附加消息到我们的队列中。

绑定列表
你可以列出存在的绑定使用,使用rabbitmqctl list_bindings

把所有放在一起


这发送日志消息的生产者程序,跟以前指南中的程序没有多少不同。这最重要的改变是我们将匿名的交换机替换为我们想要消息发布到的日志交换机。当发送是我们需要申请一个路由关键字,但是在广播消息是它的值会被忽略。这是EmitLog.java程序的代码:

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
    //...
}

(EmitLog.java source)
如你所知,建立连接后我们声明一个交换机。这个步骤是必须的,因为发布到一个不存在的交换机是禁止的。

如果队列还没有绑定到交换机上,消息将会丢失,但是这个对我们来说是ok的;如果没有消费者正在监听,我们可以安全的丢弃消息。
ReceiveLogs.java代码:

                  java.lang.InterruptedException {
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogs {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv)
                  throws java.io.IOException,
                  java.lang.InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());

            System.out.println(" [x] Received '" + message + "'");
        }
    }
}

(ReceiveLogs.java source)
如以前那样编译,我们已经做了。

$ javac -cp rabbitmq-client.jar EmitLog.java ReceiveLogs.java

如果你想把日志保存到文件中,仅仅打开一个控制平台,键入:

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar ReceiveLogs > logs_from_rabbit.log

如果你想在你的屏幕上看这些日志, 新建一个终端并且运行:

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar ReceiveLogs

当然,为了发出日志键入:

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar EmitLog

使用rabbitmactl list_bindings你可以验证这代码确实创建绑定和我们想要的队列。随着两个ReceiveLogs.java程序的运行你可以看到一些如:

 $ sudo rabbitmqctl list_bindings
Listing bindings ...
logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
...done.

这结果的解释是直白简单的:来自交换机的日志流向服务器安排的两个队列中。并且那确实我们所期望的。
为了弄明白如何监听一个消息的子集,让我们移到指南的第四部分。

时间: 2024-10-02 02:13:45

转载RabbitMQ入门(3)--发布和订阅的相关文章

转载RabbitMQ入门(6)--远程调用

远程过程调用(RPC) (使用Java客户端) 在指南的第二部分,我们学习了如何使用工作队列将耗时的任务分布到多个工作者中. 但是假如我们需要调用远端计算机的函数,等待结果呢?好吧,这又是另一个故事了.这模式通常被称为远程过程调用或RPC. 在这部分,我们将会使用RabbitMQ构建一个RPC系统:一个客户端和一个可扩展的RPC服务器.由于我们还没有值得分散的耗时任务,我们将会创建一个虚拟的RPC服务,用来返回Fibonacci(斐波纳契数列). 用户接口 为了说明RPC服务如何使用,我们将会创

转载RabbitMQ入门(4)--路由

路由 (使用Java客户端) 在先前的指南中,我们建立了一个简单的日志系统.我们可以将我们的日志信息广播到多个接收者. 在这部分的指南中,我们将要往其中添加一个功能-让仅仅订阅一个消息的子集成为可能.例如,我们可以直接将关键的错误信息指向到日志文件(保存在爱硬盘空间),同时依旧能打印所有日志信息到平台上. 绑定 在之前的例子里我们已经创建绑定.你可以回顾下代码: channel.queueBind(queueName, EXCHANGE_NAME, ""); A binding is

转载RabbitMQ入门(2)--工作队列

工作队列 (使用Java客户端) 在这第一指南部分,我们写了通过同一命名的队列发送和接受消息.在这一部分,我们将会创建一个工作队列,在多个工作者之间使用分布式时间任务. 工作队列(亦称:任务队列)背后主要的思想是避免立即处理一个资源密集型任务并且不得不一直等待完成.相反我们可以计划着让任务后续执行.我们将任务封装 成消息,发送到队列中.一个工作者进程在后台运行,获取任务并最终执行任务.当你运行多个工作者,所有的任务将会被他们所共享. 在web应用程序中,这个理念是特别有用的,你无法在一个短暂的h

MS SQL Server 2008发布与订阅

本文转载:http://chaoyouzhuo.blog.163.com/blog/static/126376001201173092514498/ 参考文章:http://www.cnblogs.com/gbmf/archive/2009/06/04/1496013.html   如果选择的是"快照发布":则必须在发布服务器需要设置代理时间计划,订阅服务器可以不用设置代理时间计划. 如果选择的是"事务发布":则发布服务器和订阅服务器不需要设置代理时间计划,这样几乎

SQL 2012 发布与订阅实现数据同步 图解(解决 错误22022)

原文:SQL 2012 发布与订阅实现数据同步 图解(解决 错误22022)   概念参见:https://msdn.microsoft.com/zh-cn/library/ms151170.aspx 推送订阅  对于推送订阅,发布服务器将更改传播到订阅服务器,而无需订阅服务器发出请求.   更改可以按需.连续地或按照计划推送到订阅服务器. 分发代理或合并代理在分发服务器上运行. 通常,数据将连续同步或按照经常重复执行的计划同步.    发布要求数据近似实时地移动.   分发服务器上较高的处理器

RabbitMQ消息队列(1):RabbitMQ入门

oldriver老司机技术手册 分享 RabbitMQ消息队列(1):RabbitMQ入门

SqlServer2008 数据库同步的两种方式(发布、订阅使用方法)_mssql2008

上篇中说了通过SQL JOB的方式对数据库的同步,这一节作为上一节的延续介绍通过发布订阅的方式实现数据库之间的同步操作.发布订阅份为两个步骤:1.发布.2订阅.首先在数据源数据库服务器上对需要同步的数据进行发布,然后在目标数据库服务器上对上述发布进行订阅.发布可以发布一张表的部分数据,也可以对整张表进行发布.下面分别介绍发布.订阅的过程. 1.发布.发布需要用实际的服务器名称,不能使用服务器的IP地址进行.能发布的信息包括[表].[存储过程].[用户函数]如果使用IP会有错误,如下图:    具

《Redis设计与实现》学习笔记-发布与订阅、事务、慢查询日志

发布与订阅 Redis通过发布订阅提供一对多甚至是多对多的节点消息通信,发布订阅由PUBLISH.SUBSCRIBE.PSUBSCRIBE.PUBSUB等命令组成. SUBSCRIBE命令:订阅某频道,在redisServer结构中通过pubsub_channels字典属性保存当前服务器所有频道的订阅关系,字典键时频道名称,字典值是一个链表,记录了所有订阅这个频道的客户端. UNSUBSCRIBE命令:退订频道,调用该命令之后,会把订阅关系从pubsub_channels中删掉,如果键对应的链表

【SQL Sever】实现SQL Sever的发布。订阅。 双机热备

实现SQL Sever的发布和订阅  最大的好处就是: 可以实现读写分离,增删改操作在主数据库服务器上进行,查询在备份数据库服务器上进行.一方面提高软件执行效率,另一方面也减轻主库压力. 本次实现发布订阅  主要在两台电脑上安装了  SQL Sever2008实现的.   这里介绍发布和订阅会涉及到的几个名称: 发布服务器(winser2):主库所在的服务器.  分发服务器(winser2):用于传递当主库发生变化(增删改)时发送到订阅服务器的  订阅服务器(vclient):备份服务器