博学,切问,近思--詹子知 (https://jameszhan.github.io)
发布/订阅(Publish/Subscribe)模式:发布/订阅功能使消息的分发可以突破目的队列地理指向的限制,使消息按照特定的主题甚至内容进行分发,用户或应用程序可以根据主题或内容接收到所需要的消息。发布/订阅功能使得发送者和接收者之间的耦合关系变得更为松散,发送者不必关心接收者的目的地址,而接收者也不必关心消息的发送地址,而只是根据消息的主题进行消息的收发。在MQ家族产品中,MQ Event Broker是专门用于使用发布/订阅技术进行数据通讯的产品,它支持基于队列和直接基于TCP/IP两种方式的发布和订阅。
在开始编程之前,我们先看一下点对点和发布/订阅接口的关系:
JMS 公共 | PTP 域 | Pub/Sub 域 |
ConnectionFactory | QueueConnectionFactory | TopicConnectionFactory |
Connection | QueueConnection | TopicConnection |
Destination | Queue | Topic |
Session | QueueSession | TopicSession |
MessageProducer | QueueSender | TopicPublisher |
MessageConsumer | QueueReceiver | TopicSubscriber |
JMS 1.1 通过统一的域简化了消息传递,在编程中,我们实际上只需要使用JMS公共域编程即可,对于P2P模式和Pub/Sub模式在编程方式上几乎毫无区别。我们再看一下上文提到的那个JMS类关系图:
按照JMS规范,发布消息的步骤如下:
- 从连接工厂中拿出Connecion对象。
- 和服务器建立连接(Connection.start())。
- 创建会话(Session)对象。
- 通过Session,在指定的Topic创建消息发布者(MessageProducer)。
- 使用Session创建消息。
- 使用消息生产者发布消息。
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
public class Producer {
private String name;
private String dest;
private Connection conn;
private MessageProducer producer;
private Session session;
public Producer(Connection conn, String dest, String name) {
this.conn = conn;
this.dest = dest;
this.name = name;
}
public void start() throws JMSException {
//conn 可以不连接,当发送消息是会自动建立连接。
conn.start();
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(session.createTopic(dest));
}
public void send(String text) throws JMSException{
TextMessage msg = session.createTextMessage(name + ": " + text);
producer.send(msg);
}
}
按照JMS规范,订阅消息的步骤如下:
- 从连接工厂中拿出Connecion对象。
- 和服务器建立连接(Connection.start())。
- 创建会话(Session)对象。
- 通过Session,在指定的Topic创建消息订阅者(MessageConsumer)。
- 订阅消息:
- 5.1.调用messageConsumer.receive方法接受消息,如果队列上有消息,则receive方法返回该消息对象,如果队列上无消息,则该方法阻塞。
- 5.2.也可以以为Session指定MessageListener对象的方式来订阅消息,该方法的好处在于,一旦有新消息到来,会自动触发该对象的onMessage方法执行。
下类描述了以5.1的方式接受消息。 import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
public class Consumer {
private String name;
private String dest;
private Connection conn;
private MessageConsumer consumer;
private Session session;
private Executor executor = Executors.newFixedThreadPool(10);
public Consumer(Connection conn, String dest, String name){
this.conn = conn;
this.dest = dest;
this.name = name;
}
public void start() throws JMSException{
//使用Consumer之前,必须调用conn的start方法建立连接。
conn.start();
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(session.createTopic(dest));
}
public void receive() {
executor.execute(new Runnable() {
@Override
public void run() {
while (true) {
try {
Message msg = consumer.receive();
if (msg instanceof TextMessage) {
System.out.println(name + " receive message {" + ((TextMessage)msg).getText() + "}");
} else {
System.out.println("msg: " + msg);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
}
} 下类描述了以5.2的方式接受消息。 import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
public class Consumer2 implements MessageListener{
private String name;
private String dest;
private Connection conn;
private MessageConsumer consumer;
private Session session;
public Consumer2(Connection conn, String dest, String name){
this.conn = conn;
this.dest = dest;
this.name = name;
}
public void start() throws JMSException{
//使用Consumer之前,必须调用conn的start方法建立连接。
conn.start();
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(session.createTopic(dest));
consumer.setMessageListener(this);
}
@Override
public void onMessage(Message msg) {
try {
System.out.println(name + " receive message {" + ((TextMessage)msg).getText() + "}");
} catch (JMSException e) {
e.printStackTrace();
}
}
}
发布/订阅者模式的特点是:
- 可以多个发布者对同一个Topic发布消息。
- 可以多个订阅者监听同一个Topic。
- 消息将被所有的订阅者接收。默认情况下,消息只会发送给所有在线的订阅者,一旦消息发送给了所有在线的订阅者,消息就会从Topic中移除。
- 可以特别地为主题创建持久的订阅者,只要消息不被该消费者消费,消息就会一直保留在Topic中,一旦该持久订阅者上线,消息会自动发送给该订阅者。
在文章 消息中间件ActiveMQ(2)--创建连接对象 中,我们介绍了创建连接对象的不同方法,这里我们把这两种方式做一个包装: public class ConnFactory {
private ConnectionFactory factory;
public ConnFactory(){
try {
Context context = new JndiFactory().getJndiContext();
this.factory = (ConnectionFactory) context.lookup("con1");
} catch (NamingException e) {
this.factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
}
}
public Connection createConnection() throws JMSException{
return factory.createConnection();
}
}
创建不同的发布者对同一Topic发送消息。 public static void main(String[] args) throws JMSException {
ConnFactory cf = new ConnFactory();
Producer producer1 = new Producer(cf.createConnection(), "Topic1", "Product1");
Producer producer2 = new Producer(cf.createConnection(), "Topic1", "Product2");
producer1.start();
producer2.start();
for(int i = 0; i < 6; i++){
producer1.send("message " + i);
producer2.send("message " + i);
}
}
创建不同的订阅者监听同一Topic。
public static void main(String[] args) throws JMSException {
ConnFactory cf = new ConnFactory();
Consumer consumer1 = new Consumer(cf.createConnection(), "Topic1", "Consumer1");
Consumer consumer2 = new Consumer(cf.createConnection(), "Topic1", "Consumer2");
Consumer2 consumer3 = new Consumer2(cf.createConnection(), "Topic1", "Consumer3");
consumer1.start();
consumer2.start();
consumer3.start();
consumer1.receive();
consumer2.receive();
}
实验结果如下(注意事项,应先启动订阅者监听Topic,再使用发布者发布消息。): Consumer3 receive message {Product1: message 0}
Consumer1 receive message {Product1: message 0}
Consumer2 receive message {Product1: message 0}
Consumer3 receive message {Product2: message 0}
Consumer1 receive message {Product2: message 0}
Consumer2 receive message {Product2: message 0}
Consumer2 receive message {Product1: message 1}
Consumer3 receive message {Product1: message 1}
Consumer1 receive message {Product1: message 1}
Consumer3 receive message {Product2: message 1}
Consumer2 receive message {Product2: message 1}
Consumer1 receive message {Product2: message 1}
Consumer3 receive message {Product1: message 2}
Consumer2 receive message {Product1: message 2}
Consumer1 receive message {Product1: message 2}
Consumer1 receive message {Product2: message 2}
Consumer2 receive message {Product2: message 2}
Consumer3 receive message {Product2: message 2}
Consumer3 receive message {Product1: message 3}
Consumer2 receive message {Product1: message 3}
Consumer3 receive message {Product2: message 3}
Consumer1 receive message {Product1: message 3}
Consumer1 receive message {Product2: message 3}
Consumer3 receive message {Product1: message 4}
Consumer1 receive message {Product1: message 4}
Consumer2 receive message {Product2: message 3}
Consumer3 receive message {Product2: message 4}
Consumer1 receive message {Product2: message 4}
Consumer3 receive message {Product1: message 5}
Consumer2 receive message {Product1: message 4}
Consumer3 receive message {Product2: message 5}
Consumer1 receive message {Product1: message 5}
Consumer2 receive message {Product2: message 4}
Consumer1 receive message {Product2: message 5}
Consumer2 receive message {Product1: message 5}
Consumer2 receive message {Product2: message 5}