轻松搞定RabbitMQ(五)——路由选择

       翻译地址:http://www.rabbitmq.com/tutorials/tutorial-four-java.html

       在前篇博文中,我们建立了一个简单的日志系统。可以广播消息给多个消费者。本篇博文,我们将添加新的特性——我们可以只订阅部分消息。比如:我们可以接收Error级别的消息写入文件。同时仍然可以在控制台打印所有日志。

Bindings(绑定)

       在上一篇博客中我们已经使用过绑定。类似下面的代码:

channel.queueBind(queueName, EXCHANGE_NAME, "");

       绑定表示转换器与队列之间的关系。可以简单的人为:队列对该转发器上的消息感兴趣。

       绑定可以设定额外的routingKey参数。为了与避免basicPublish方法(发布消息的方法)的参数混淆,我们准备把它称作绑定键(binding key)。下面展示如何使用绑定键(binding key)来创建一个绑定:

channel.queueBind(queueName, EXCHANGE_NAME, "black");

       绑定键关键取决于转换器的类型。对于fanout类型,忽略此参数。

Direct exchange(直接转发)

       前面讲到我们的日志系统广播消息给所有的消费者。我们想对其扩展,根据消息的严重性来过滤消息。例如:我们希望将致命错误的日志消息记录到文件,而不是把磁盘空间浪费在warn和info类型的日志上。我们使用的fanout转发器,不能给我们太多的灵活性。它仅仅只是盲目的广播而已。我们使用direct转发器进行代替,其背后的算法很简单——消息会被推送至绑定键(binding
key)和消息发布附带的选择键(routing key)完全匹配的队列。

       在上图中,我们可以看到direct类型的转发器与2个队列进行了绑定。第一个队列使用的绑定键是orange,第二个队列绑定键为black和green。这样当消息发布到转发器是,附带orange绑定键的消息将被路由到队列Q1中去。附带black和green绑定键的消息被路由到Q2中去。其他消息全部丢弃。

Multiple bindings(多重绑定)

       使用一个绑定键绑定多个队列是完全合法的。如上图,绑定键black绑定了2个队列——Q1和Q2。

Emitting logs(发送日志)

       我们将这种模式用于日志系统,发送消息给direct类型的转发器。我们将 提供日志严重性做为绑定键。那样,接收程序可以选择性的接收严重性的消息。首先关注发送日志的代码:

像往常一样首先创建一个转换器:

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

       然后为发送消息做准备:

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

       为了简化代码,我们假定日志的严重性是‘info’,‘warning’,‘error’中之一。

Subscribing(订阅)

       接收消息跟前面博文中的一样。我们仅需要修改一个地方:为每一个我们感兴趣的严重性的消息,创建一个新的绑定。

String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){
  channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

完整的例子


发送端代码(EmitLogDirect.java)

public class EmitLogDirect {
	private final static String EXCHANGE_NAME = "direct_logs";

	public static void main(String[] args) throws IOException {
		/**
		 * 创建连接连接到MabbitMQ
		 */
		ConnectionFactory factory = new ConnectionFactory();
		// 设置MabbitMQ所在主机ip或者主机名
		factory.setHost("127.0.0.1");
		// 创建一个连接
		Connection connection = factory.newConnection();
		// 创建一个频道
		Channel channel = connection.createChannel();
		// 指定转发——广播
		channel.exchangeDeclare(EXCHANGE_NAME, "direct");

		//所有日志严重性级别
		String[] severities={"error","info","warning"};
		for(int i=0;i<3;i++){
			String severity = severities[i%3];//每一次发送一条不同严重性的日志

			// 发送的消息
			String message = "Hello World"+Strings.repeat(".", i+1);
			//参数1:exchange name
			//参数2:routing key
			channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
			System.out.println(" [x] Sent '" + severity +"':'"+ message + "'");
		}
		// 关闭频道和连接
		channel.close();
		connection.close();
	}
}

消费者1(ReceiveLogs2Console.java)

public class ReceiveLogs2Console {
	private static final String EXCHANGE_NAME = "direct_logs";

	public static void main(String[] argv) throws IOException, InterruptedException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		// 打开连接和创建频道,与发送端一样
		Connection connection = factory.newConnection();
		final Channel channel = connection.createChannel();

		channel.exchangeDeclare(EXCHANGE_NAME, "direct");
		// 声明一个随机队列
		String queueName = channel.queueDeclare().getQueue();

		//所有日志严重性级别
		String[] severities={"error","info","warning"};
		for (String severity : severities) {
			//关注所有级别的日志(多重绑定)
			channel.queueBind(queueName, EXCHANGE_NAME, severity);
		}
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

		// 创建队列消费者
		final Consumer consumer = new DefaultConsumer(channel) {
			  @Override
			  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
			    String message = new String(body, "UTF-8");
			    System.out.println(" [x] Received '"  + envelope.getRoutingKey() + "':'" + message + "'");
			  }
			};
			channel.basicConsume(queueName, true, consumer);
	}
}

消费者2(ReceiveLogs2File.java)

public class ReceiveLogs2File {
	private static final String EXCHANGE_NAME = "direct_logs";

	public static void main(String[] argv) throws IOException, InterruptedException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		// 打开连接和创建频道,与发送端一样
		Connection connection = factory.newConnection();
		final Channel channel = connection.createChannel();

		channel.exchangeDeclare(EXCHANGE_NAME, "direct");
		// 声明一个随机队列
		String queueName = channel.queueDeclare().getQueue();

	    String severity="error";//只关注error级别的日志,然后记录到文件中去。
	    channel.queueBind(queueName, EXCHANGE_NAME, severity);

		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

		// 创建队列消费者
		final Consumer consumer = new DefaultConsumer(channel) {
			  @Override
			  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
			    String message = new String(body, "UTF-8");
			    //记录日志到文件:
			    print2File( "["+ envelope.getRoutingKey() + "] "+message);
			  }
			};
			channel.basicConsume(queueName, true, consumer);
	}

	private static void print2File(String msg) {
		try {
			String dir = ReceiveLogs2File.class.getClassLoader().getResource("").getPath();
			String logFileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
			File file = new File(dir, logFileName + ".log");
			FileOutputStream fos = new FileOutputStream(file, true);
			fos.write((new SimpleDateFormat("HH:mm:ss").format(new Date())+" - "+msg + "\r\n").getBytes());
			fos.flush();
			fos.close();
		} catch (FileNotFoundException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}

       最终结果:

       罗哩罗嗦的说这么多,其实就是说了这么一件事:我们可以使用Direct exchange+routingKey来过滤自己感兴趣的消息。一个队列可以绑定多个routingKey。这就是我们今天的主题——路由选择。

时间: 2024-11-02 08:25:14

轻松搞定RabbitMQ(五)——路由选择的相关文章

轻松搞定RabbitMQ(六)——主题

       翻译地址:http://www.rabbitmq.com/tutorials/tutorial-five-java.html        在上一篇博文中,我们进一步改良了日志系统.使用Direct类型的转换器,使得接收者有能力进行选择性的接收日志,,而非fanout那样,只能够无脑的转发,如果你还不了解,请阅读:轻松搞定RabbitMQ(四)--发布/订阅.        虽然使用Direct类型的转换器改进了日志系统.但它仍然有一定的局限性--不能根据多重条件进行路由选择.  

轻松搞定RabbitMQ(一)——RabbitMQ基础知识+HelloWorld

       本文是简单介绍一下RabbitMQ,参考官网上的教程.同时加入了一些自己的理解.官网教程详见:"Hello World!". 引言        你是否遇到过多个系统间需要通过定时任务来同步某些数据?        你是否在为异构系统的不同进程间相互调用.通讯的问题而苦恼.挣扎?        如果是,那么恭喜你,消息服务让你可以很轻松地解决这些问题.消息服务擅长于解决多系统.异构系统间的数据交换(消息通知/通讯)问题.        本文将要介绍的RabbitMQ就是当

轻松搞定RabbitMQ(四)——发布/订阅

       翻译地址:http://www.rabbitmq.com/tutorials/tutorial-three-java.html        在前面的教程中,我们创建了一个工作队列,都是假设一个任务只交给一个消费者.这次我们做一些完全不同的事儿--将消息发送给多个消费者.这种模式叫做"发布/订阅".        为了说明这个模式,我们将构建一个简单日志系统.它包含2段程序:第一个将发出日志消息,第二个接受并打印消息.        如果在日志系统中每一个接受者(订阅者)

轻松搞定RabbitMQ(七)——远程过程调用RPC

       翻译:http://www.rabbitmq.com/tutorials/tutorial-six-java.html       在第二篇博文中,我们已经了解到了如何使用工作队列来向多个消费者分散耗时任务.       但是付过我们需要在远程电脑上运行一个方法然后等待结果,该怎么办?这是不同的需求.这个模式通常叫做RPC.        本文我们将使用RabbitMQ构建一个RPC系统:一个客户端和一个可扩展的RPC服务器端.由于我们没有任何真实的耗时任务需要分配,所以我们将创建

轻松搞定RabbitMQ(三)——消息应答与消息持久化

       这个官网的第二个例子中的消息应答和消息持久化部分.我把它摘出来作为单独的一块儿来分享. Message acknowledgment(消息应答)        执行一个任务可能需要花费几秒钟,你可能会担心如果一个消费者在执行任务过程中挂掉了.基于现在的代码,一旦RabbitMQ将消息分发给了消费者,就会从内存中删除.在这种情况下,如果杀死正在执行任务的消费者,会丢失正在处理的消息,也会丢失已经分发给这个消费者但尚未处理的消息.        但是,我们不想丢失任何任务,如果有一个消

SQL Server数据汇总五招轻松搞定_MsSql

本文我们将讨论如何使用GROUPBY子句来汇总数据. 使用单独列分组 GROUP BY子句通过设置分组条件来汇总数据,在第一个例子中,我在数据库AdventureWork2012中的表 Sales.SalesOrderDetail.中的一列上进行数据分组操作.这个例子以及其他例子都使用数据库AdventureWorks2012,如果你想使用它运行我的代码,你可以点击下载. 下面是第一个示例的源码,在CarrierTrackingNumber列上使用group by子句进行数据分组操作 USE A

五步轻松搞定网站数据分析收集工作

中介交易 http://www.aliyun.com/zixun/aggregation/6858.html">SEO诊断 淘宝客 云主机 技术大厅 网站及产品运营不能凭空而谈,纸上谈兵,要用实实在在的数据说话.如果我们手里有一份针对我们需求的非常完美的数据的话,那对于我们在进行推广的时候,就会有目标,有针对性地去进行,节省了大量的工作量,也能让我们很好地了解竞争对手的情况,从而来达到一个知己知彼,掌控全局的目的.万丈高楼平地起,再完美的数据分析也是建立在数据的基础上的,只有在足够多的数据

一步一步轻松搞定您的个人数码免冠照

应用目标:制作证件照片 使用软件:Photoshop 7.0(其他版本操作类似) 实现难度:即学即会 相信许多朋友都有这样的经历:填写应聘表.考个驾照或是办个工作证经常会用到免冠照,可每次都想不起来上回留下的底(照)片放在哪里,回到家里翻天覆地地一阵好找,结果常常是无功而返,最后只好一边抱怨自己记性不好一边梳妆打扮走进照相馆,一番讨价还价之后,然后极不情愿地把银子数给照相馆老板,到了下回这样的情形依然会重现.要是我们做好一个无需底片的数码免冠照存在个人电脑或软盘里,随用随扩,岂不是要省掉许多麻烦

wps计算试卷总分 文字窗体域+书签计算轻松搞定教程

  大家都知道,一套试卷由多道试题组成,而每道题又有不同的分值,那么老师在出题时如何快速计算试卷总分呢?当然excel能轻松搞定这个问题,但是老师出题肯定是文档的,其实可以利用wps的书签功能来计算,有时候由于题量很大,书签就显得繁琐了,接下来小编就告诉大家用文字型窗体域添加书签的方法来解决这个问题.具体操作如下: 一.文字窗体域 1. 新建空白文档,输入试卷标题的几大题型,在需要插入分值的文字后插入文字型窗体域 :单击"插入"选项卡,单击窗体分组里的"文字型窗体域"