ActiveMQ消息中间件Producer和Consumer

生产者代码:

package com.java1234.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息生产者
 * @author Administrator
 *
 */
public class JMSProducer {

    private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
    private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
    private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
    private static final int SENDNUM=10; // 发送的消息数量
    
    public static void main(String[] args) {
        
        ConnectionFactory connectionFactory; // 连接工厂
        Connection connection = null; // 连接
        Session session; // 会话 接受或者发送消息的线程
        Destination destination; // 消息的目的地
        MessageProducer messageProducer; // 消息生产者
        
        // 实例化连接工厂
        connectionFactory=new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);
        
        try {
            connection=connectionFactory.createConnection(); // 通过连接工厂获取连接
            connection.start(); // 启动连接
            session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建Session
            destination=session.createQueue("FirstQueue1"); // 创建消息队列
            messageProducer=session.createProducer(destination); // 创建消息生产者
            sendMessage(session, messageProducer); // 发送消息
            session.commit();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally{
            if(connection!=null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
    }
    
    /**
     * 发送消息
     * @param session
     * @param messageProducer
     * @throws Exception
     */
    public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{
        for(int i=0;i<JMSProducer.SENDNUM;i++){
            TextMessage message=session.createTextMessage("ActiveMQ 发送的消息"+i);
            System.out.println("发送消息:"+"ActiveMQ 发送的消息"+i);
            messageProducer.send(message);
        }
    }
}

消费者分两种:

1.主动接受消息receive()

package com.java1234.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息消费者
 * @author Administrator
 *
 */
public class JMSConsumer {

    private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
    private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
    private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
    
    public static void main(String[] args) {
        ConnectionFactory connectionFactory; // 连接工厂
        Connection connection = null; // 连接
        Session session; // 会话 接受或者发送消息的线程
        Destination destination; // 消息的目的地
        MessageConsumer messageConsumer; // 消息的消费者
        
        // 实例化连接工厂
        connectionFactory=new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
                
        try {
            connection=connectionFactory.createConnection();  // 通过连接工厂获取连接
            connection.start(); // 启动连接
            session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
            destination=session.createQueue("FirstQueue1");  // 创建连接的消息队列
            messageConsumer=session.createConsumer(destination); // 创建消息消费者
            while(true){
                TextMessage textMessage=(TextMessage)messageConsumer.receive(100000);
                if(textMessage!=null){
                    System.out.println("收到的消息:"+textMessage.getText());
                }else{
                    break;
                }
            }
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } 
    }
}

2.使用listener被动监听消息(非阻塞的)(观察者模式)

Listener代码

package com.java1234.activemq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 消息监听
 * @author Administrator
 *
 */
public class Listener implements MessageListener{

    @Override
    public void onMessage(Message message) {
        // TODO Auto-generated method stub
        try {
            System.out.println("收到的消息:"+((TextMessage)message).getText());
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}


package com.java1234.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息消费者
 * @author Administrator
 *
 */
public class JMSConsumer2 {

    private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
    private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
    private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
    
    public static void main(String[] args) {
        ConnectionFactory connectionFactory; // 连接工厂
        Connection connection = null; // 连接
        Session session; // 会话 接受或者发送消息的线程
        Destination destination; // 消息的目的地
        MessageConsumer messageConsumer; // 消息的消费者
        
        // 实例化连接工厂
        connectionFactory=new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL);
                
        try {
            connection=connectionFactory.createConnection();  // 通过连接工厂获取连接
            connection.start(); // 启动连接
            session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
            destination=session.createQueue("FirstQueue1");  // 创建连接的消息队列
            messageConsumer=session.createConsumer(destination); // 创建消息消费者
            messageConsumer.setMessageListener(new Listener()); // 注册消息监听
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } 
    }
}

如果想主动的去接受消息,而不用异步消息监听的话,把consumer.setMessageListener(this)改为Message message = consumer.receive(),手动去调用MessageConsumer的receive方法即可。

参考文章:

http://blog.csdn.net/courage89/article/details/8809551

 

本文出自 “点滴积累” 博客,请务必保留此出处http://tianxingzhe.blog.51cto.com/3390077/1685651

时间: 2024-11-03 07:25:43

ActiveMQ消息中间件Producer和Consumer的相关文章

activemq-利用ajax客户端就行activeMQ消息中间件的消息收发,如何设置topic的持久订阅啊?

问题描述 利用ajax客户端就行activeMQ消息中间件的消息收发,如何设置topic的持久订阅啊? 利用ajax客户端进行activeMQ消息中间件的消息收发,如何设置topic的持久订阅啊?具体语法怎么写啊? 解决方案 var amq = org.activemq.Amq; amq.init({ uri: 'amq.servlet', logging: true, timeout: 20, clientId:'topicClientA' }); var myHandler = { rcvM

Apache Kafka - KIP-42: Add Producer and Consumer Interceptors

kafka 0.10.0.0 released   Interceptors的概念应该来自flume 参考,http://blog.csdn.net/xiao_jun_0820/article/details/38111305 比如,flume提供的 Timestamp Interceptor Host Interceptor Static Interceptor Regex Filtering Interceptor Regex Extractor Interceptor 可以对于流过的mes

开源 VS 商业,消息中间件你不知道的那些事

11月23日,新炬网络中间件技术专家刘拓老师在DBA+社群中间件用户组进行了一次主题为"开源 VS 商业,消息中间件你不知道的那些事"的线上分享.小编特别整理出其中精华内容,供大家学习交流.  嘉宾简介   新炬网络中间件技术专家 曾任职于IBM华南GTS 4年,IBM WebSphere.MQ.CICS产品线技术专家 5年移动运营商(广东移动.浙江移动)运维经验,3年JAVA开发及售后经验 演讲实录   随着云计算的兴起,Docker.微服务的流行,分布式消息队列技术成为云计算平台中

JMS ActiveMQ研究文档

  1. 背景 当前,CORBA.DCOM.RMI等RPC中间件技术已广泛应用于各个领域.但是面对规模和复杂度都越来越高的分布式系统,这些技术也显示出其局限性:(1)同步通信:客户发出调用后,必须等待服务对象完成处理并返回结果后才能继续执行:(2)客户和服务对象的生命周期紧密耦合:客户进程和服务对象进程 都必须正常运行:如果由于服务对象崩溃或者网络故障导致客户的请求不可达,客户会接收到异常:(3)点对点通信:客户的一次调用只发送给某个单独的目标对象. 面向消息的中间件(Message Orien

关于ActiveMQ 消息吞吐量的如何优化?

问题描述 最近公司想对公司应用结构进行优化,对现在架构中很多耗费资源严重且耗时比较长的应用间大数据量同步通信的接口进行优化,讨论决定引入消息中间件,网上浏览了一圈最后锁定ActiveMQ,在网上看到很多关于ActiveMQ的测试分析文档,大部分测试结果 AMQ的每秒吞吐量大概都在2000以上甚至更多,但是我在公司服务器上面测试的数据并不是很理想,有点让人头疼,不能够达到峰值,甚至在多线程下速度极速下降,效率方面有点不能让人接受,想看看有没有更好的优化配置. 下面是我的Amq的配置: [size=

京东消息中间件演进之路:三次更迭,八大突破

京东商城基础平台团队:包括大规模容器集群调度.数据库与存储技术.消息系统与服务框架.架构与运维.机器学习与人工智能等技术方向.由京东商城首席架构师刘海锋担任部门负责人.基础平台运营多个数据中心数万台服务器,支撑京东无数在线业务(团队公众号ID:ipdchat).   导读:本文将简单介绍京东消息中间件的演进历程,以及作为消息中间件,在每一代产品中我们是如何解决MQ面临的一些通用问题,如:如何处理IO,消息如何存储,消息如何路由等等.   我们在开始之前要明确一些基本概念:   Broker:消息

消息队列入门(四)ActiveMQ的应用实例

部署和启动ActiveMQ 去官网下载:http://activemq.apache.org/ 我下载的是apache-activemq-5.12.0-bin.tar.gz, 解压到本地目录,进入到bin路径下, 运行activemq启动ActiveMQ. 运行方式:启动 ./activemq start ActiveMQ默认使用的TCP连接端口是61616, 5.0以上版本默认启动时,开启了内置的Jetty服务器,可以进入控制台查看管理. 启动ActiveMQ以后,登陆:http://loca

activeMQ到底是怎么个接受信息?

问题描述 小弟初学,网上看了好几个例子,都千遍一律,如消息发送出去,消息如果被接受,通过什么中转站什么的?要如何配置?求详细一个步骤,感激不尽 问题补充:消息消费者:public void consuMessage() throws Exception{ throws JMSException{ActiveMQConnectionFactory connFactory=new ActiveMQConnectionFactory("tcp://localhost:61616");conn

ActiveMQ消息队列

什么是MQ? 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们.消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术(如:WebService).排队指的是应用程序通过队列来通信.队列的使用除去了接收和发送应用程序同时执行的要求. 为什么要用MQ? 1.调用异步化,提高服务器性能 在不使用消息队列的情况下,用户的请求数据直接写入数据库,