消息中间件ActiveMQ(4)--Publisher/Subscriber实验

博学,切问,近思--詹子知 (https://jameszhan.github.io)

发布/订阅(Publish/Subscribe)模式:发布/订阅功能使消息的分发可以突破目的队列地理指向的限制,使消息按照特定的主题甚至内容进行分发,用户或应用程序可以根据主题或内容接收到所需要的消息。发布/订阅功能使得发送者和接收者之间的耦合关系变得更为松散,发送者不必关心接收者的目的地址,而接收者也不必关心消息的发送地址,而只是根据消息的主题进行消息的收发。在MQ家族产品中,MQ Event Broker是专门用于使用发布/订阅技术进行数据通讯的产品,它支持基于队列和直接基于TCP/IP两种方式的发布和订阅。

在开始编程之前,我们先看一下点对点和发布/订阅接口的关系:

JMS 公共 PTP 域 Pub/Sub 域
ConnectionFactory QueueConnectionFactory TopicConnectionFactory
Connection QueueConnection TopicConnection
Destination Queue Topic
Session QueueSession TopicSession
MessageProducer QueueSender TopicPublisher
MessageConsumer QueueReceiver TopicSubscriber

JMS 1.1 通过统一的域简化了消息传递,在编程中,我们实际上只需要使用JMS公共域编程即可,对于P2P模式和Pub/Sub模式在编程方式上几乎毫无区别。我们再看一下上文提到的那个JMS类关系图:

按照JMS规范,发布消息的步骤如下:

  1. 从连接工厂中拿出Connecion对象。
  2. 和服务器建立连接(Connection.start())。
  3. 创建会话(Session)对象。
  4. 通过Session,在指定的Topic创建消息发布者(MessageProducer)。
  5. 使用Session创建消息。
  6. 使用消息生产者发布消息。

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

public class Producer {

private String name;
private String dest;
private Connection conn;
private MessageProducer producer;
private Session session;

public Producer(Connection conn, String dest, String name) {
this.conn = conn;
this.dest = dest;
this.name = name;
}

public void start() throws JMSException {
//conn 可以不连接,当发送消息是会自动建立连接。
conn.start();
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(session.createTopic(dest));
}

public void send(String text) throws JMSException{
TextMessage msg = session.createTextMessage(name + ": " + text);
producer.send(msg);
}
}

按照JMS规范,订阅消息的步骤如下:

  1. 从连接工厂中拿出Connecion对象。
  2. 和服务器建立连接(Connection.start())。
  3. 创建会话(Session)对象。
  4. 通过Session,在指定的Topic创建消息订阅者(MessageConsumer)。
  5. 订阅消息:
    • 5.1.调用messageConsumer.receive方法接受消息,如果队列上有消息,则receive方法返回该消息对象,如果队列上无消息,则该方法阻塞。
    • 5.2.也可以以为Session指定MessageListener对象的方式来订阅消息,该方法的好处在于,一旦有新消息到来,会自动触发该对象的onMessage方法执行。

下类描述了以5.1的方式接受消息。 import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

public class Consumer {
private String name;
private String dest;
private Connection conn;
private MessageConsumer consumer;
private Session session;
private Executor executor = Executors.newFixedThreadPool(10);

public Consumer(Connection conn, String dest, String name){
this.conn = conn;
this.dest = dest;
this.name = name;
}

public void start() throws JMSException{
//使用Consumer之前,必须调用conn的start方法建立连接。
conn.start();

session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(session.createTopic(dest));
}

public void receive() {
executor.execute(new Runnable() {
@Override
public void run() {
while (true) {
try {
Message msg = consumer.receive();
if (msg instanceof TextMessage) {
System.out.println(name + " receive message {" + ((TextMessage)msg).getText() + "}");
} else {
System.out.println("msg: " + msg);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
}

} 下类描述了以5.2的方式接受消息。 import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

public class Consumer2 implements MessageListener{

private String name;
private String dest;
private Connection conn;
private MessageConsumer consumer;
private Session session;

public Consumer2(Connection conn, String dest, String name){
this.conn = conn;
this.dest = dest;
this.name = name;
}

public void start() throws JMSException{
//使用Consumer之前,必须调用conn的start方法建立连接。
conn.start();

session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(session.createTopic(dest));
consumer.setMessageListener(this);
}

@Override
public void onMessage(Message msg) {
try {
System.out.println(name + " receive message {" + ((TextMessage)msg).getText() + "}");
} catch (JMSException e) {
e.printStackTrace();
}
}

}

发布/订阅者模式的特点是:

  1. 可以多个发布者对同一个Topic发布消息。
  2. 可以多个订阅者监听同一个Topic。
  3. 消息将被所有的订阅者接收。默认情况下,消息只会发送给所有在线的订阅者,一旦消息发送给了所有在线的订阅者,消息就会从Topic中移除。
  4. 可以特别地为主题创建持久的订阅者,只要消息不被该消费者消费,消息就会一直保留在Topic中,一旦该持久订阅者上线,消息会自动发送给该订阅者。

在文章 消息中间件ActiveMQ(2)--创建连接对象 中,我们介绍了创建连接对象的不同方法,这里我们把这两种方式做一个包装: public class ConnFactory {

private ConnectionFactory factory;

public ConnFactory(){
try {
Context context = new JndiFactory().getJndiContext();
this.factory = (ConnectionFactory) context.lookup("con1");
} catch (NamingException e) {
this.factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
}
}

public Connection createConnection() throws JMSException{
return factory.createConnection();
}
}
创建不同的发布者对同一Topic发送消息。 public static void main(String[] args) throws JMSException {

ConnFactory cf = new ConnFactory();

Producer producer1 = new Producer(cf.createConnection(), "Topic1", "Product1");
Producer producer2 = new Producer(cf.createConnection(), "Topic1", "Product2");

producer1.start();
producer2.start();

for(int i = 0; i < 6; i++){
producer1.send("message " + i);
producer2.send("message " + i);
}

}
创建不同的订阅者监听同一Topic。
public static void main(String[] args) throws JMSException {

ConnFactory cf = new ConnFactory();

Consumer consumer1 = new Consumer(cf.createConnection(), "Topic1", "Consumer1");
Consumer consumer2 = new Consumer(cf.createConnection(), "Topic1", "Consumer2");
Consumer2 consumer3 = new Consumer2(cf.createConnection(), "Topic1", "Consumer3");

consumer1.start();
consumer2.start();
consumer3.start();

consumer1.receive();
consumer2.receive();

}
实验结果如下(注意事项,应先启动订阅者监听Topic,再使用发布者发布消息。): Consumer3 receive message {Product1: message 0}
Consumer1 receive message {Product1: message 0}
Consumer2 receive message {Product1: message 0}
Consumer3 receive message {Product2: message 0}
Consumer1 receive message {Product2: message 0}
Consumer2 receive message {Product2: message 0}
Consumer2 receive message {Product1: message 1}
Consumer3 receive message {Product1: message 1}
Consumer1 receive message {Product1: message 1}
Consumer3 receive message {Product2: message 1}
Consumer2 receive message {Product2: message 1}
Consumer1 receive message {Product2: message 1}
Consumer3 receive message {Product1: message 2}
Consumer2 receive message {Product1: message 2}
Consumer1 receive message {Product1: message 2}
Consumer1 receive message {Product2: message 2}
Consumer2 receive message {Product2: message 2}
Consumer3 receive message {Product2: message 2}
Consumer3 receive message {Product1: message 3}
Consumer2 receive message {Product1: message 3}
Consumer3 receive message {Product2: message 3}
Consumer1 receive message {Product1: message 3}
Consumer1 receive message {Product2: message 3}
Consumer3 receive message {Product1: message 4}
Consumer1 receive message {Product1: message 4}
Consumer2 receive message {Product2: message 3}
Consumer3 receive message {Product2: message 4}
Consumer1 receive message {Product2: message 4}
Consumer3 receive message {Product1: message 5}
Consumer2 receive message {Product1: message 4}
Consumer3 receive message {Product2: message 5}
Consumer1 receive message {Product1: message 5}
Consumer2 receive message {Product2: message 4}
Consumer1 receive message {Product2: message 5}
Consumer2 receive message {Product1: message 5}
Consumer2 receive message {Product2: message 5}

时间: 2025-01-08 16:08:33

消息中间件ActiveMQ(4)--Publisher/Subscriber实验的相关文章

消息中间件ActiveMQ(3)--P2P实验

博学,切问,近思--詹子知 (https://jameszhan.github.io) 点对点方式是最为传统和常见的通讯方式,它支持一对一.一对多.多对多.多对一等多种配置方式,支持树状.网状等多种拓扑结构.  按照JMS规范,发送消息的步骤如下: 1.从连接工厂中拿出Connecion对象. 2.和服务器建立连接(Connection.start()). 3.创建会话(Session)对象. 4.通过Session,在指定的Queue创建消息生产者(MessageProducer). 5.使用

消息中间件ActiveMQ(1)--创建服务

博学,切问,近思--詹子知(http://blog.csdn.net/zhiqiangzhan)  消息中间件(MOM)在SOA架构和ESB产品中常常扮演着核心的角色,现在的消息中间产品也有很多,商业的产品有IBM 的 MQSeries.BEA的 Weblogic JMS service和 Progress 的 SonicMQ,开源的则有Sun 的 OpenMQ,ActiveMQ,JbossMQ等.尽管消息中间件产品纷繁复杂,但是其使用方式却毫无二致,原因是它们都遵循JMS规范.本文着重就Act

消息中间件ActiveMQ(2)--创建连接对象

博学,切问,近思--詹子知 (https://jameszhan.github.io) 1.直接使用应用程序创建. public static void main(String[] args) throws JMSException { String uri = "tcp://tcp://localhost:61616"; // 创建连接工厂. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(uri

ActiveMQ消息队列侦听的问题

问题描述 实现MessageListener接口,重写onMessage()方法来侦听信息,只是不明白为什么ActiveMQ自带的例子程序侦听程序启动之后不会停止,除非我手动停止该线程.我自己仿照的却会很快停止,自己写的源代码如下(仿照ActiveMQ自带的点对点通讯的例子),主方法是最下面的testReceiveMessage()希望高手分析分析:packagecom.sjhr.jms.activemq;importjava.util.ArrayList;importjava.util.Arr

JMS性能测试方案

JMS综述 1.相关概念 1)JMS jms即Java消息服务(Java Message Service) 是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信.Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持,它提供标准的产生.发送.接收消息的接口简化企业应用的开发.是一组接口和相关语义的集合,定义了JMS客户端如何获取企业消息产品的功能.JMS并不是一个MOM.它是一个API,抽象了客户端和

Android简明开发教程四:Android应用基本概念

Android平台的一个显著的特点是"低耦合".Activity是Android应用的一个最基本的用户UI模块.如果采用Windows Form 应用作为参照,Activity相当于Windows中的WinForm.和Windows 应用不同的是,运行一个Activity或是Activity之间的交互是 通过消息来实现的.也就是说如果想在起动一个Activity或是在一个Activity中启动另一个Activity,是通过发送Intent消息来 触发,而不像Windows WinFor

Java设计模式之Observer模式

Observer模式也叫观察者模式,是由GoF提出的23种软件设计模式的一种.Observer模式是行为模式之一,它的作用是当一个对象的状态发生变化时,能够自动通知其他关联对象,自动刷新对象状态. 本文介绍设计模式中的(Observer)模式的概念,用法,以及实际应用中怎么样使用Observer模式进行开发. Observer模式的概念 Observer模式是行为模式之一,它的作用是当一个对象的状态发生变化时,能够自动通知其他关联对象,自动刷新对象状态. Observer模式提供给关联对象一种同

观察者模式的java实现

Java事件模型 在我的前两篇介绍C#事件和委托的blog 发表之后,大家响应特别热烈,点击率很高,看来事件/委托机制是很多同仁比较模糊的地方,借此东风,加上最近自己转战java,于是决定写这篇介绍java事件机制的blog. 其实,不管哪种语言的事件机制,毫无例外都逃不出三点:事件源/发送者,事件的接受者/处理者/侦听者,以及事件源向事件接受者传递的事件信息.对应在java中,事件源 (event source),事件倾听者 (event listener),事件消息称为eventobject

浅谈RxJava与2.0的新特性

简介 说起 RxJava ,相信诸多 Android 开发者都不会陌生.作为一个知名的响应式编程库,从前年开始逐渐变得火热,从小众到被众多 Android 开发者们广泛引入与流传,其在 GitHub 的 仓库 截止笔者写这篇文章时,已经有16400+个 star .甚至有一些大牛专门为 Android 写了 RxJava 的适配库,如 RxAndroid RxBinding RxLifecycle 为什么 RxJava 如此受到 Android 开发者们的欢迎.我想不外乎两个原因. 1. 异步