消息中间件ActiveMQ(3)--P2P实验

博学,切问,近思--詹子知 (https://jameszhan.github.io)

点对点方式是最为传统和常见的通讯方式,它支持一对一、一对多、多对多、多对一等多种配置方式,支持树状、网状等多种拓扑结构。 

按照JMS规范,发送消息的步骤如下:

1.从连接工厂中拿出Connecion对象。

2.和服务器建立连接(Connection.start())。

3.创建会话(Session)对象。

4.通过Session,在指定的Queue创建消息生产者(MessageProducer)。

5.使用Session创建消息。

6.使用消息生产者发送消息。 import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

public class Producer {

private String name;
private String dest;
private Connection conn;
private MessageProducer producer;
private Session session;

public Producer(Connection conn, String dest, String name) {
this.conn = conn;
this.dest = dest;
this.name = name;
}

public void start() throws JMSException {
//conn 可以不连接,当发送消息是会自动建立连接。
conn.start();
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(session.createQueue(dest));
}

public void send(String text) throws JMSException{
TextMessage msg = session.createTextMessage(name + ": " + text);
producer.send(msg);
}
}
   

接收消息的步骤如下: 

1.从连接工厂中拿出Connecion对象。

2.和服务器建立连接(Connection.start())。 

3.创建会话(Session)对象。

4.通过Session,在指定的Queue创建消息接受者(MessageConsumer)。

5.1.调用messageConsumer.receive方法接受消息,如果队列上有消息,则receive方法返回该消息对象,如果队列上无消息,则该方法阻塞。

5.2.也可以以为Session指定MessageListener对象的方式来接受消息,该方法的好处在于,一旦有新消息到来,会自动触发该对象的onMessage方法执行。 

下类描述了以5.1的方式接受消息。 import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

public class Consumer {
private String name;
private String dest;
private Connection conn;
private MessageConsumer consumer;
private Session session;
private Executor executor = Executors.newFixedThreadPool(10);

public Consumer(Connection conn, String dest, String name){
this.conn = conn;
this.dest = dest;
this.name = name;
}

public void start() throws JMSException{
//使用Consumer之前,必须调用conn的start方法建立连接。
conn.start();

session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(session.createQueue(dest));
}

public void receive() {
executor.execute(new Runnable() {
@Override
public void run() {
while (true) {
try {
Message msg = consumer.receive();
if (msg instanceof TextMessage) {
System.out.println(name + " receive message {" + ((TextMessage)msg).getText() + "}");
} else {
System.out.println("msg: " + msg);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
}

}

下类描述了以5.2的方式接受消息。
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

public class Consumer2 implements MessageListener{

private String name;
private String dest;
private Connection conn;
private MessageConsumer consumer;
private Session session;

public Consumer2(Connection conn, String dest, String name){
this.conn = conn;
this.dest = dest;
this.name = name;
}

public void start() throws JMSException{
//使用Consumer之前,必须调用conn的start方法建立连接。
conn.start();

session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(session.createQueue(dest));
consumer.setMessageListener(this);
}

@Override
public void onMessage(Message msg) {
try {
System.out.println(name + " receive message {" + ((TextMessage)msg).getText() + "}");
} catch (JMSException e) {
e.printStackTrace();
}
}

}消息队列的特点是:

1.可以多个生产者对同一个消息队列发送消息。

2.可以多个接受者监听同一个消息对列。

3.消息只能一次性被消费,一旦消息被Consumer1消费了,则Consumer2不可能再拿到这一消息,并且同时该消息被消息队列移除。

4.持久性存储,一旦消息没有被消费,消息会一直保留在消息队列中。 

利用消息队列的这一特点,我们可以实现简单的负载均衡,比如,我们可以部署几个相同的Service到不同的机器上,让他们监听同一个Queue,那么客户的请求到来后,消息中间件会动态分配其到某一个Service处理。 

上一篇文章,我们介绍了创建连接对象的不同方法,这里我们把这两种方式做一个包装:public class ConnFactory {

private ConnectionFactory factory;

public ConnFactory(){
try {
Context context = new JndiFactory().getJndiContext();
this.factory = (ConnectionFactory) context.lookup("con1");
} catch (NamingException e) {
this.factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
}
}

public Connection createConnection() throws JMSException{
return factory.createConnection();
}

}  
创建不同的生产者对同一队列发送消息。

public static void main(String[] args) throws JMSException {
ConnFactory cf = new ConnFactory();

Producer producer1 = new Producer(cf.createConnection(), "Queue1", "Product1");
Producer producer2 = new Producer(cf.createConnection(), "Queue1", "Product2");

producer1.start();
producer2.start();

for(int i = 0; i < 50; i++){
producer1.send("message " + i);
producer2.send("message " + i);
}

}

创建不同的消费者监听同一对列。

public static void main(String[] args) throws JMSException {

ConnFactory cf = new ConnFactory();

Consumer consumer1 = new Consumer(cf.createConnection(), "Queue1", "Consumer1");
Consumer consumer2 = new Consumer(cf.createConnection(), "Queue1", "Consumer2");
Consumer2 consumer3 = new Consumer2(cf.createConnection(), "Queue1", "Consumer3");

consumer1.start();
consumer2.start();
consumer3.start();

consumer1.receive();
consumer2.receive();
} 实验结果如下(事实上,不同的生产者生产的消息被那个消费者接收到是不确定的):Consumer3 receive message {Product1: message 1}
Consumer3 receive message {Product2: message 2}
Consumer2 receive message {Product2: message 0}
Consumer1 receive message {Product1: message 0}
Consumer1 receive message {Product2: message 1}
Consumer2 receive message {Product1: message 2}
Consumer1 receive message {Product1: message 3}
Consumer2 receive message {Product2: message 3}
Consumer3 receive message {Product1: message 4}
Consumer1 receive message {Product2: message 4}
Consumer2 receive message {Product1: message 5}
Consumer3 receive message {Product2: message 5}
Consumer1 receive message {Product1: message 6}
Consumer2 receive message {Product2: message 6}
Consumer3 receive message {Product1: message 7}
Consumer1 receive message {Product2: message 7}
Consumer2 receive message {Product1: message 8}
Consumer3 receive message {Product2: message 8}
Consumer1 receive message {Product1: message 9}
Consumer2 receive message {Product2: message 9}
Consumer3 receive message {Product1: message 10}
Consumer1 receive message {Product2: message 10}
Consumer2 receive message {Product1: message 11}
Consumer1 receive message {Product1: message 12}
Consumer3 receive message {Product2: message 11}
Consumer2 receive message {Product2: message 12}
Consumer3 receive message {Product1: message 13}
Consumer3 receive message {Product2: message 14}
Consumer3 receive message {Product1: message 16}
Consumer3 receive message {Product2: message 17}
Consumer3 receive message {Product1: message 19}
Consumer3 receive message {Product2: message 20}
Consumer3 receive message {Product1: message 22}
Consumer2 receive message {Product1: message 14}
Consumer1 receive message {Product2: message 13}
Consumer1 receive message {Product1: message 15}
Consumer1 receive message {Product2: message 16}
Consumer2 receive message {Product2: message 15}
Consumer2 receive message {Product1: message 17}
Consumer1 receive message {Product1: message 18}
Consumer1 receive message {Product2: message 19}
Consumer1 receive message {Product1: message 21}
Consumer1 receive message {Product2: message 22}
Consumer2 receive message {Product2: message 18}
Consumer2 receive message {Product1: message 20}
Consumer2 receive message {Product2: message 21}
Consumer2 receive message {Product1: message 23}
Consumer3 receive message {Product2: message 23}
Consumer1 receive message {Product1: message 24}
Consumer3 receive message {Product1: message 25}
Consumer2 receive message {Product2: message 24}
Consumer2 receive message {Product1: message 26}
Consumer1 receive message {Product2: message 25}
Consumer3 receive message {Product2: message 26}
Consumer2 receive message {Product2: message 27}
Consumer1 receive message {Product1: message 27}
Consumer3 receive message {Product1: message 28}
Consumer1 receive message {Product2: message 28}
Consumer2 receive message {Product1: message 29}
Consumer3 receive message {Product2: message 29}
Consumer1 receive message {Product1: message 30}
Consumer3 receive message {Product1: message 31}
Consumer1 receive message {Product2: message 31}
Consumer2 receive message {Product2: message 30}
Consumer2 receive message {Product1: message 32}
Consumer3 receive message {Product2: message 32}
Consumer1 receive message {Product1: message 33}
Consumer3 receive message {Product1: message 34}
Consumer2 receive message {Product2: message 33}
Consumer2 receive message {Product1: message 35}
Consumer3 receive message {Product2: message 35}
Consumer1 receive message {Product2: message 34}
Consumer1 receive message {Product1: message 36}
Consumer2 receive message {Product2: message 36}
Consumer3 receive message {Product1: message 37}
Consumer1 receive message {Product2: message 37}
Consumer2 receive message {Product1: message 38}
Consumer1 receive message {Product1: message 39}
Consumer3 receive message {Product2: message 38}
Consumer2 receive message {Product2: message 39}
Consumer3 receive message {Product1: message 40}
Consumer1 receive message {Product2: message 40}
Consumer2 receive message {Product1: message 41}
Consumer3 receive message {Product2: message 41}
Consumer2 receive message {Product2: message 42}
Consumer3 receive message {Product1: message 43}
Consumer1 receive message {Product1: message 42}
Consumer2 receive message {Product1: message 44}
Consumer1 receive message {Product2: message 43}
Consumer3 receive message {Product2: message 44}
Consumer1 receive message {Product1: message 45}
Consumer2 receive message {Product2: message 45}
Consumer3 receive message {Product1: message 46}
Consumer1 receive message {Product2: message 46}
Consumer2 receive message {Product1: message 47}
Consumer3 receive message {Product2: message 47}
Consumer1 receive message {Product1: message 48}
Consumer3 receive message {Product1: message 49}
Consumer2 receive message {Product2: message 48}
Consumer1 receive message {Product2: message 49}如果你先执行发送消息的程序,在启动接受消息的程序,所有的消息都有可能被同一消费者消费,这是ActiveMQ为了提高效率,重用了同一个连接传输了所有的消息。其他的MQ产品未必会这么做,SnoicMQ它就会以一种随机的方式分发给不同的消费者。一旦你创建好消费者先监听消息队列,然后,再发送消息,由于这个时候,消费者与JMS
Server之间的连接都已经建立,所以消息会随机的分发到不同的消费者。 

时间: 2024-08-27 21:37:42

消息中间件ActiveMQ(3)--P2P实验的相关文章

消息中间件ActiveMQ(4)--Publisher/Subscriber实验

博学,切问,近思--詹子知 (https://jameszhan.github.io) 发布/订阅(Publish/Subscribe)模式:发布/订阅功能使消息的分发可以突破目的队列地理指向的限制,使消息按照特定的主题甚至内容进行分发,用户或应用程序可以根据主题或内容接收到所需要的消息.发布/订阅功能使得发送者和接收者之间的耦合关系变得更为松散,发送者不必关心接收者的目的地址,而接收者也不必关心消息的发送地址,而只是根据消息的主题进行消息的收发.在MQ家族产品中,MQ Event Broker

消息中间件ActiveMQ(1)--创建服务

博学,切问,近思--詹子知(http://blog.csdn.net/zhiqiangzhan)  消息中间件(MOM)在SOA架构和ESB产品中常常扮演着核心的角色,现在的消息中间产品也有很多,商业的产品有IBM 的 MQSeries.BEA的 Weblogic JMS service和 Progress 的 SonicMQ,开源的则有Sun 的 OpenMQ,ActiveMQ,JbossMQ等.尽管消息中间件产品纷繁复杂,但是其使用方式却毫无二致,原因是它们都遵循JMS规范.本文着重就Act

消息中间件ActiveMQ(2)--创建连接对象

博学,切问,近思--詹子知 (https://jameszhan.github.io) 1.直接使用应用程序创建. public static void main(String[] args) throws JMSException { String uri = "tcp://tcp://localhost:61616"; // 创建连接工厂. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(uri

JMS性能测试方案

JMS综述 1.相关概念 1)JMS jms即Java消息服务(Java Message Service) 是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信.Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持,它提供标准的产生.发送.接收消息的接口简化企业应用的开发.是一组接口和相关语义的集合,定义了JMS客户端如何获取企业消息产品的功能.JMS并不是一个MOM.它是一个API,抽象了客户端和

消息中间件kafka与activemq、rabbitmq、zeromq、rocketmq的比较

  消息队列(Message Queue,简称 MQ)是阿里巴巴集团中间件技术部自主研发的专业消息中间件. 分布式消息系统作为实现分布式系统可扩展.可伸缩性的关键组件,需要具有高吞吐量.高可用等特点.而谈到消息系统的设计,就回避不了两个问题: 常用消息队列有:kafka.activemq.rabbitmq等.   一.kafka: 1.不完全符合jms规范,注重吞吐量,类似udp 和 tcp: 2.一般做大数据吞吐的管道 我们现在的用途就是负责在各个idc之间通信: 3.量大 对数据不是百分之百

activemq-利用ajax客户端就行activeMQ消息中间件的消息收发,如何设置topic的持久订阅啊?

问题描述 利用ajax客户端就行activeMQ消息中间件的消息收发,如何设置topic的持久订阅啊? 利用ajax客户端进行activeMQ消息中间件的消息收发,如何设置topic的持久订阅啊?具体语法怎么写啊? 解决方案 var amq = org.activemq.Amq; amq.init({ uri: 'amq.servlet', logging: true, timeout: 20, clientId:'topicClientA' }); var myHandler = { rcvM

消息中间件-平台使用Activemq传递消息在机器上部署应用后,如果在机器端删除应用,如何通知平台更改其状态

问题描述 平台使用Activemq传递消息在机器上部署应用后,如果在机器端删除应用,如何通知平台更改其状态 我是一名学生,最近在学习Activemq这个消息中间件,简单的单向传递消息什么的已经弄明白了,但是我知道那些应用托管平台也都是使用activemq来传递消息进行自己应用的部署,那么当平台使用Activemq传递消息在机器上部署应用后,如果在机器端删除应用,如何通知平台更改其状态呢? 我的猜想: 1.activemq自身有什么传递的信息同步或反馈机制我不了解可以实现部署应用的状态同步. 2.

ActiveMQ消息中间件Producer和Consumer

生产者代码: package com.java1234.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessa

开源 VS 商业,消息中间件你不知道的那些事

11月23日,新炬网络中间件技术专家刘拓老师在DBA+社群中间件用户组进行了一次主题为"开源 VS 商业,消息中间件你不知道的那些事"的线上分享.小编特别整理出其中精华内容,供大家学习交流.  嘉宾简介   新炬网络中间件技术专家 曾任职于IBM华南GTS 4年,IBM WebSphere.MQ.CICS产品线技术专家 5年移动运营商(广东移动.浙江移动)运维经验,3年JAVA开发及售后经验 演讲实录   随着云计算的兴起,Docker.微服务的流行,分布式消息队列技术成为云计算平台中