博学,切问,近思--詹子知 (https://jameszhan.github.io)
点对点方式是最为传统和常见的通讯方式,它支持一对一、一对多、多对多、多对一等多种配置方式,支持树状、网状等多种拓扑结构。
按照JMS规范,发送消息的步骤如下:
1.从连接工厂中拿出Connecion对象。
2.和服务器建立连接(Connection.start())。
3.创建会话(Session)对象。
4.通过Session,在指定的Queue创建消息生产者(MessageProducer)。
5.使用Session创建消息。
6.使用消息生产者发送消息。 import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
public class Producer {
private String name;
private String dest;
private Connection conn;
private MessageProducer producer;
private Session session;
public Producer(Connection conn, String dest, String name) {
this.conn = conn;
this.dest = dest;
this.name = name;
}
public void start() throws JMSException {
//conn 可以不连接,当发送消息是会自动建立连接。
conn.start();
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(session.createQueue(dest));
}
public void send(String text) throws JMSException{
TextMessage msg = session.createTextMessage(name + ": " + text);
producer.send(msg);
}
}
接收消息的步骤如下:
1.从连接工厂中拿出Connecion对象。
2.和服务器建立连接(Connection.start())。
3.创建会话(Session)对象。
4.通过Session,在指定的Queue创建消息接受者(MessageConsumer)。
5.1.调用messageConsumer.receive方法接受消息,如果队列上有消息,则receive方法返回该消息对象,如果队列上无消息,则该方法阻塞。
5.2.也可以以为Session指定MessageListener对象的方式来接受消息,该方法的好处在于,一旦有新消息到来,会自动触发该对象的onMessage方法执行。
下类描述了以5.1的方式接受消息。 import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
public class Consumer {
private String name;
private String dest;
private Connection conn;
private MessageConsumer consumer;
private Session session;
private Executor executor = Executors.newFixedThreadPool(10);
public Consumer(Connection conn, String dest, String name){
this.conn = conn;
this.dest = dest;
this.name = name;
}
public void start() throws JMSException{
//使用Consumer之前,必须调用conn的start方法建立连接。
conn.start();
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(session.createQueue(dest));
}
public void receive() {
executor.execute(new Runnable() {
@Override
public void run() {
while (true) {
try {
Message msg = consumer.receive();
if (msg instanceof TextMessage) {
System.out.println(name + " receive message {" + ((TextMessage)msg).getText() + "}");
} else {
System.out.println("msg: " + msg);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
}
}
下类描述了以5.2的方式接受消息。
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
public class Consumer2 implements MessageListener{
private String name;
private String dest;
private Connection conn;
private MessageConsumer consumer;
private Session session;
public Consumer2(Connection conn, String dest, String name){
this.conn = conn;
this.dest = dest;
this.name = name;
}
public void start() throws JMSException{
//使用Consumer之前,必须调用conn的start方法建立连接。
conn.start();
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(session.createQueue(dest));
consumer.setMessageListener(this);
}
@Override
public void onMessage(Message msg) {
try {
System.out.println(name + " receive message {" + ((TextMessage)msg).getText() + "}");
} catch (JMSException e) {
e.printStackTrace();
}
}
}消息队列的特点是:
1.可以多个生产者对同一个消息队列发送消息。
2.可以多个接受者监听同一个消息对列。
3.消息只能一次性被消费,一旦消息被Consumer1消费了,则Consumer2不可能再拿到这一消息,并且同时该消息被消息队列移除。
4.持久性存储,一旦消息没有被消费,消息会一直保留在消息队列中。
利用消息队列的这一特点,我们可以实现简单的负载均衡,比如,我们可以部署几个相同的Service到不同的机器上,让他们监听同一个Queue,那么客户的请求到来后,消息中间件会动态分配其到某一个Service处理。
上一篇文章,我们介绍了创建连接对象的不同方法,这里我们把这两种方式做一个包装:public class ConnFactory {
private ConnectionFactory factory;
public ConnFactory(){
try {
Context context = new JndiFactory().getJndiContext();
this.factory = (ConnectionFactory) context.lookup("con1");
} catch (NamingException e) {
this.factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
}
}
public Connection createConnection() throws JMSException{
return factory.createConnection();
}
}
创建不同的生产者对同一队列发送消息。
public static void main(String[] args) throws JMSException {
ConnFactory cf = new ConnFactory();
Producer producer1 = new Producer(cf.createConnection(), "Queue1", "Product1");
Producer producer2 = new Producer(cf.createConnection(), "Queue1", "Product2");
producer1.start();
producer2.start();
for(int i = 0; i < 50; i++){
producer1.send("message " + i);
producer2.send("message " + i);
}
}
创建不同的消费者监听同一对列。
public static void main(String[] args) throws JMSException {
ConnFactory cf = new ConnFactory();
Consumer consumer1 = new Consumer(cf.createConnection(), "Queue1", "Consumer1");
Consumer consumer2 = new Consumer(cf.createConnection(), "Queue1", "Consumer2");
Consumer2 consumer3 = new Consumer2(cf.createConnection(), "Queue1", "Consumer3");
consumer1.start();
consumer2.start();
consumer3.start();
consumer1.receive();
consumer2.receive();
} 实验结果如下(事实上,不同的生产者生产的消息被那个消费者接收到是不确定的):Consumer3 receive message {Product1: message 1}
Consumer3 receive message {Product2: message 2}
Consumer2 receive message {Product2: message 0}
Consumer1 receive message {Product1: message 0}
Consumer1 receive message {Product2: message 1}
Consumer2 receive message {Product1: message 2}
Consumer1 receive message {Product1: message 3}
Consumer2 receive message {Product2: message 3}
Consumer3 receive message {Product1: message 4}
Consumer1 receive message {Product2: message 4}
Consumer2 receive message {Product1: message 5}
Consumer3 receive message {Product2: message 5}
Consumer1 receive message {Product1: message 6}
Consumer2 receive message {Product2: message 6}
Consumer3 receive message {Product1: message 7}
Consumer1 receive message {Product2: message 7}
Consumer2 receive message {Product1: message 8}
Consumer3 receive message {Product2: message 8}
Consumer1 receive message {Product1: message 9}
Consumer2 receive message {Product2: message 9}
Consumer3 receive message {Product1: message 10}
Consumer1 receive message {Product2: message 10}
Consumer2 receive message {Product1: message 11}
Consumer1 receive message {Product1: message 12}
Consumer3 receive message {Product2: message 11}
Consumer2 receive message {Product2: message 12}
Consumer3 receive message {Product1: message 13}
Consumer3 receive message {Product2: message 14}
Consumer3 receive message {Product1: message 16}
Consumer3 receive message {Product2: message 17}
Consumer3 receive message {Product1: message 19}
Consumer3 receive message {Product2: message 20}
Consumer3 receive message {Product1: message 22}
Consumer2 receive message {Product1: message 14}
Consumer1 receive message {Product2: message 13}
Consumer1 receive message {Product1: message 15}
Consumer1 receive message {Product2: message 16}
Consumer2 receive message {Product2: message 15}
Consumer2 receive message {Product1: message 17}
Consumer1 receive message {Product1: message 18}
Consumer1 receive message {Product2: message 19}
Consumer1 receive message {Product1: message 21}
Consumer1 receive message {Product2: message 22}
Consumer2 receive message {Product2: message 18}
Consumer2 receive message {Product1: message 20}
Consumer2 receive message {Product2: message 21}
Consumer2 receive message {Product1: message 23}
Consumer3 receive message {Product2: message 23}
Consumer1 receive message {Product1: message 24}
Consumer3 receive message {Product1: message 25}
Consumer2 receive message {Product2: message 24}
Consumer2 receive message {Product1: message 26}
Consumer1 receive message {Product2: message 25}
Consumer3 receive message {Product2: message 26}
Consumer2 receive message {Product2: message 27}
Consumer1 receive message {Product1: message 27}
Consumer3 receive message {Product1: message 28}
Consumer1 receive message {Product2: message 28}
Consumer2 receive message {Product1: message 29}
Consumer3 receive message {Product2: message 29}
Consumer1 receive message {Product1: message 30}
Consumer3 receive message {Product1: message 31}
Consumer1 receive message {Product2: message 31}
Consumer2 receive message {Product2: message 30}
Consumer2 receive message {Product1: message 32}
Consumer3 receive message {Product2: message 32}
Consumer1 receive message {Product1: message 33}
Consumer3 receive message {Product1: message 34}
Consumer2 receive message {Product2: message 33}
Consumer2 receive message {Product1: message 35}
Consumer3 receive message {Product2: message 35}
Consumer1 receive message {Product2: message 34}
Consumer1 receive message {Product1: message 36}
Consumer2 receive message {Product2: message 36}
Consumer3 receive message {Product1: message 37}
Consumer1 receive message {Product2: message 37}
Consumer2 receive message {Product1: message 38}
Consumer1 receive message {Product1: message 39}
Consumer3 receive message {Product2: message 38}
Consumer2 receive message {Product2: message 39}
Consumer3 receive message {Product1: message 40}
Consumer1 receive message {Product2: message 40}
Consumer2 receive message {Product1: message 41}
Consumer3 receive message {Product2: message 41}
Consumer2 receive message {Product2: message 42}
Consumer3 receive message {Product1: message 43}
Consumer1 receive message {Product1: message 42}
Consumer2 receive message {Product1: message 44}
Consumer1 receive message {Product2: message 43}
Consumer3 receive message {Product2: message 44}
Consumer1 receive message {Product1: message 45}
Consumer2 receive message {Product2: message 45}
Consumer3 receive message {Product1: message 46}
Consumer1 receive message {Product2: message 46}
Consumer2 receive message {Product1: message 47}
Consumer3 receive message {Product2: message 47}
Consumer1 receive message {Product1: message 48}
Consumer3 receive message {Product1: message 49}
Consumer2 receive message {Product2: message 48}
Consumer1 receive message {Product2: message 49}如果你先执行发送消息的程序,在启动接受消息的程序,所有的消息都有可能被同一消费者消费,这是ActiveMQ为了提高效率,重用了同一个连接传输了所有的消息。其他的MQ产品未必会这么做,SnoicMQ它就会以一种随机的方式分发给不同的消费者。一旦你创建好消费者先监听消息队列,然后,再发送消息,由于这个时候,消费者与JMS
Server之间的连接都已经建立,所以消息会随机的分发到不同的消费者。