轻松搞定RabbitMQ(六)——主题

       翻译地址:http://www.rabbitmq.com/tutorials/tutorial-five-java.html

       在上一篇博文中,我们进一步改良了日志系统。使用Direct类型的转换器,使得接收者有能力进行选择性的接收日志,,而非fanout那样,只能够无脑的转发,如果你还不了解,请阅读:轻松搞定RabbitMQ(四)——发布/订阅

       虽然使用Direct类型的转换器改进了日志系统。但它仍然有一定的局限性——不能根据多重条件进行路由选择。

       在我们的日志系统中,我们可能不仅仅根据日志严重性订阅日志,也想根据发送源订阅。你可能从unix工具syslog了解过这个概念,它可以根据严重性(info/warning/crit…)和设备(auth/cron/kern…)转发日志。

       这将给我们更多的灵活性——我们可以订阅来自元‘cron’的致命错误日志,同时也可以订阅‘kern’的所有日志。

       为了在我们的日志系统中实现上述需求,我们需要了解更复杂的主题类型的转发器——Topic Exchange。

Topic exchange(主题转发器)

       发送给主题转发器的消息不能是任意设置的选择键,必须是用小数点隔开的一系列的标识符。这些标识符可以是随意,但是通常跟消息的某些特性相关联。一些合法的路由选择键比如“socket.usd.nyse”,"nyse.vmw","quick.orange.rabbit",你愿意用多少单词都可以,只要不超过上限的255个字节。

       绑定键也必须以相同的格式。主题转发器的逻辑类似于direct类型的转发器。消息通过一个特定的路由键发送到所有与绑定键匹配的队列中。需要注意的是,关于绑定键有两种特殊的情况:*(星号)可以代替任意一个标识符 ;#(井号)可以代替零个或多个标识符。

       举一个简单例子,如下图:

       在上图例子中,我们发送描述动物的消息。消息会转发给包含3个单词(2个小数点)的路由键绑定的队列中。绑定键中的第一个单词描述的是速度,第二个是颜色,第三个是物种:“速度.颜色.物种”。

       我们创建3个绑定:Q1绑定键是“*.orange.*”,Q2绑定键是“*.*.rabbit”,Q3绑定键是“lazy.#”。这些绑定可以概括为:Q1只对橙色的动物感兴趣。Q2则是关注兔子和所有懒的动物。

       路由键为“quick.orange.rabbit”的消息会被路由到2个队列中去。而“lazy.orange.elephant”的消息同样会发往2个队列。另外“quick.orange.fox” 仅仅发往第一个队列,而"lazy.brown.fox"则只发往第二个队列。“quick.brown.fox”则所有的绑定键都不匹配而被丢弃。

       如果我们违反约定,发送了只带1个或者4个标识符的选择键,像“orange”或者“quick.orange.male.rabbit”,会发生什么呢?这些消息都不匹配任何绑定,所以将被丢弃。

       另外,“lazy.orange.male.rabbit”,尽管有4个标识符,但是仍然匹配最后一个绑定键,所以会发送到第二个队列中。

       注:主题类型的转发器非常强大,可以实现其他类型的转发器。当队列绑定#绑定键,可以接受任何消息,类似于fanout转发器。当特殊字符*和#不包含在绑定键中,这个主题转发器就像一个direct类型的转发器。

完整实例

       我们将主题类型的转发器应用到日志系统中,路由格式为:“<设备>.<严重级别>”。

发送端(EmitLogTopic.java)

public class EmitLogDirect {
	private final static String EXCHANGE_NAME = "topic_logs";

	public static void main(String[] args) throws IOException {
		/**
		 * 创建连接连接到MabbitMQ
		 */
		ConnectionFactory factory = new ConnectionFactory();
		// 设置MabbitMQ所在主机ip或者主机名
		factory.setHost("127.0.0.1");
		// 创建一个连接
		Connection connection = factory.newConnection();
		// 创建一个频道
		Channel channel = connection.createChannel();
		// 指定转发——广播
		channel.exchangeDeclare(EXCHANGE_NAME, "topic");

		//所有设备和日志级别
		String[] facilities ={"auth","cron","kern","auth.A"};
		String[] severities={"error","info","warning"};

		for(int i=0;i<4;i++){
			for(int j=0;j<3;j++){
			//每一个设备,每种日志级别发送一条日志消息
			String routingKey = facilities[i]+"."+severities[j%3];

			// 发送的消息
			String message =" Hello World!"+Strings.repeat(".", i+1);
			//参数1:exchange name
			//参数2:routing key
			channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
			System.out.println(" [x] Sent [" + routingKey +"] : '"+ message + "'");
			}
		}
		// 关闭频道和连接
		channel.close();
		connection.close();
	}
}

消费者1(ReceiveLogs2Console.java)

public class ReceiveLogs2Console {
	private static final String EXCHANGE_NAME = "topic_logs";

	public static void main(String[] argv) throws IOException, InterruptedException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		// 打开连接和创建频道,与发送端一样
		Connection connection = factory.newConnection();
		final Channel channel = connection.createChannel();

		//声明exchange
		channel.exchangeDeclare(EXCHANGE_NAME, "topic");
		// 声明一个随机队列
		String queueName = channel.queueDeclare().getQueue();

		String[] routingKeys ={"auth.*","*.info","#.warning"};//关注所有的授权日志、所有info和waring级别的日志
		for (String routingKey : routingKeys) {
			//关注所有级别的日志(多重绑定)
			channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
		}
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

		// 创建队列消费者
		final Consumer consumer = new DefaultConsumer(channel) {
			  @Override
			  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
			    String message = new String(body, "UTF-8");
			    System.out.println(" [x] Received ["  + envelope.getRoutingKey() + "] :'" + message + "'");
			  }
			};
			channel.basicConsume(queueName, true, consumer);
	}
}

消费者2(ReceiveLogs2File.java)

public class ReceiveLogs2File {
	private static final String EXCHANGE_NAME = "topic_logs";

	public static void main(String[] argv) throws IOException, InterruptedException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		// 打开连接和创建频道,与发送端一样
		Connection connection = factory.newConnection();
		final Channel channel = connection.createChannel();

		channel.exchangeDeclare(EXCHANGE_NAME, "topic");
		// 声明一个随机队列
		String queueName = channel.queueDeclare().getQueue();
	    channel.queueBind(queueName, EXCHANGE_NAME, "");

	    String severity="kern.error";//只关注核心错误级别的日志,然后记录到文件中去。
	    channel.queueBind(queueName, EXCHANGE_NAME, severity);

		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

		// 创建队列消费者
		final Consumer consumer = new DefaultConsumer(channel) {
			  @Override
			  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
			    String message = new String(body, "UTF-8");
			    //记录日志到文件:
			    print2File( "["+ envelope.getRoutingKey() + "] "+message);
			  }
			};
			channel.basicConsume(queueName, true, consumer);
	}

	private static void print2File(String msg) {
		try {
			String dir = ReceiveLogs2File.class.getClassLoader().getResource("").getPath();
			String logFileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
			File file = new File(dir, logFileName + ".log");
			FileOutputStream fos = new FileOutputStream(file, true);
			fos.write((new SimpleDateFormat("HH:mm:ss").format(new Date())+" - "+msg + "\r\n").getBytes());
			fos.flush();
			fos.close();
		} catch (FileNotFoundException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}

       最终结果:

时间: 2024-08-03 07:35:26

轻松搞定RabbitMQ(六)——主题的相关文章

轻松搞定RabbitMQ(一)——RabbitMQ基础知识+HelloWorld

       本文是简单介绍一下RabbitMQ,参考官网上的教程.同时加入了一些自己的理解.官网教程详见:"Hello World!". 引言        你是否遇到过多个系统间需要通过定时任务来同步某些数据?        你是否在为异构系统的不同进程间相互调用.通讯的问题而苦恼.挣扎?        如果是,那么恭喜你,消息服务让你可以很轻松地解决这些问题.消息服务擅长于解决多系统.异构系统间的数据交换(消息通知/通讯)问题.        本文将要介绍的RabbitMQ就是当

轻松搞定RabbitMQ(五)——路由选择

       翻译地址:http://www.rabbitmq.com/tutorials/tutorial-four-java.html        在前篇博文中,我们建立了一个简单的日志系统.可以广播消息给多个消费者.本篇博文,我们将添加新的特性--我们可以只订阅部分消息.比如:我们可以接收Error级别的消息写入文件.同时仍然可以在控制台打印所有日志. Bindings(绑定)        在上一篇博客中我们已经使用过绑定.类似下面的代码: channel.queueBind(queu

轻松搞定RabbitMQ(四)——发布/订阅

       翻译地址:http://www.rabbitmq.com/tutorials/tutorial-three-java.html        在前面的教程中,我们创建了一个工作队列,都是假设一个任务只交给一个消费者.这次我们做一些完全不同的事儿--将消息发送给多个消费者.这种模式叫做"发布/订阅".        为了说明这个模式,我们将构建一个简单日志系统.它包含2段程序:第一个将发出日志消息,第二个接受并打印消息.        如果在日志系统中每一个接受者(订阅者)

轻松搞定RabbitMQ(七)——远程过程调用RPC

       翻译:http://www.rabbitmq.com/tutorials/tutorial-six-java.html       在第二篇博文中,我们已经了解到了如何使用工作队列来向多个消费者分散耗时任务.       但是付过我们需要在远程电脑上运行一个方法然后等待结果,该怎么办?这是不同的需求.这个模式通常叫做RPC.        本文我们将使用RabbitMQ构建一个RPC系统:一个客户端和一个可扩展的RPC服务器端.由于我们没有任何真实的耗时任务需要分配,所以我们将创建

轻松搞定RabbitMQ(三)——消息应答与消息持久化

       这个官网的第二个例子中的消息应答和消息持久化部分.我把它摘出来作为单独的一块儿来分享. Message acknowledgment(消息应答)        执行一个任务可能需要花费几秒钟,你可能会担心如果一个消费者在执行任务过程中挂掉了.基于现在的代码,一旦RabbitMQ将消息分发给了消费者,就会从内存中删除.在这种情况下,如果杀死正在执行任务的消费者,会丢失正在处理的消息,也会丢失已经分发给这个消费者但尚未处理的消息.        但是,我们不想丢失任何任务,如果有一个消

几行代码轻松搞定网页的简繁转换

简繁转换|网页 对网页进行简繁字体转换的方法一般有两种:一是使用<简繁通>这样的专业软件,另外一种是制作两套版本的网页.显然,这两种方法都较为麻烦,而且专业软件一般不能用于免费的空间.笔者在这里给大家提供一个非常简单的方法,只须在页面上添加几行代码就可以轻松搞定网页的简繁转换了.首先在http://www.knowsky.com/download/transform.js处下载用于简繁转换的js文件transform.js,复制到网站目录下,然后使用网页制作工具打开需要进行简繁转换的网页,

一步一步轻松搞定您的个人数码免冠照

应用目标:制作证件照片 使用软件:Photoshop 7.0(其他版本操作类似) 实现难度:即学即会 相信许多朋友都有这样的经历:填写应聘表.考个驾照或是办个工作证经常会用到免冠照,可每次都想不起来上回留下的底(照)片放在哪里,回到家里翻天覆地地一阵好找,结果常常是无功而返,最后只好一边抱怨自己记性不好一边梳妆打扮走进照相馆,一番讨价还价之后,然后极不情愿地把银子数给照相馆老板,到了下回这样的情形依然会重现.要是我们做好一个无需底片的数码免冠照存在个人电脑或软盘里,随用随扩,岂不是要省掉许多麻烦

用Photoshop滤镜三分钟轻松搞定拼图

滤镜 女儿最喜欢玩拼图游戏,每次买拼图玩具的理由就是它们的画面不一样.家里的拼图玩具堆成山,可是她的兴致依然不减. 一天,我上网时无意中发现了Photoshop的一款制作拼图的滤镜--AV Bros.公司的Puzzle Pro滤镜.下载回来一试,果然效果不错!女儿喜欢的拼图玩具仅用三分钟就轻松搞定了.好东西不敢独享,下面我就给大家详细介绍一下. 首先,到http://download.sina.com.cn/cgi-bin/detail.cgi?s_id=8779去下载并安装这款滤镜,关于如何安

轻松搞定数据访问层[续]

访问|数据 数据库表 Tbl_Teacher 数据字段名称 类型 说明 teaID Int 自动编号 teaCode Char(20) 教师员工号 teaName Nchar(10) ? teaGender Bit ? teaNation Nchar(6) ? teaAge TinyInt ? Tbl_Student 数据字段名称 类型 说明 stuID Int 自动编号 stuCode Char(20) 学生证号 stuTeacherCode Char(20) 班主任的员工号 stuName