消息队列入门(四)ActiveMQ的应用实例

部署和启动ActiveMQ

去官网下载:http://activemq.apache.org/

我下载的是apache-activemq-5.12.0-bin.tar.gz,

解压到本地目录,进入到bin路径下,
运行activemq启动ActiveMQ。

运行方式:
启动 ./activemq start

ActiveMQ默认使用的TCP连接端口是61616,
5.0以上版本默认启动时,开启了内置的Jetty服务器,可以进入控制台查看管理。

启动ActiveMQ以后,登陆:http://localhost:8161/admin/

默认用户名admin/admin

这里我在虚拟机里启动,访问地址:
http://192.168.106.128:8161/admin/

ActiveMQ的控制台功能十分强大,管理起来也很直观。

使用Java连接

创建POM文件

在Eclipse中新建Java工程,这里使用Maven管理依赖,
下面是pom.xml:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

<project xmlns="http://maven.apache.org/POM/4.0.0"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<groupId>activemq-sample</groupId>

<artifactId>activemq-sample</artifactId>

<version>0.0.1-SNAPSHOT</version>

<name>activemq-sample</name>

<description>an activemq practice</description>

<build>

<sourceDirectory>src</sourceDirectory>

<plugins>

 

<plugin>

<artifactId>maven-compiler-plugin</artifactId>

<version>3.1</version>

<configuration>

<source>1.7</source>

<target>1.7</target>

</configuration>

</plugin>

<!-- activemq-core 5.7.0 使用bunble打包,需要添加相关插件 -->

<plugin>

<groupId>org.apache.felix</groupId>

<artifactId>maven-bundle-plugin</artifactId>

<extensions>true</extensions>

</plugin>

 

</plugins>

</build>

<dependencies>

<!-- activemq的maven依赖 -->

<dependency>

<groupId>org.apache.activemq</groupId>

<artifactId>activemq-core</artifactId>

<version>5.7.0</version>

<type>bundle</type>

</dependency>

 

</dependencies>

 

</project>

  

在第一次添加activemq的maven依赖时报错,后来发现activemq-core 5.7.0采用了bundle的打包方式,

必须在pom中配置maven-bundle-plugin。

创建消息创建者 MsgProducer:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.MessageProducer;

import javax.jms.Session;

import javax.jms.TextMessage;

 

import org.apache.activemq.ActiveMQConnectionFactory;

 

/**

* @Description: Message Producer

* @author: Bing Yue

*/

public class MsgProducer {

//如果你在本地启动,可以直接使用空的ActiveMQConnectionFactory构造函数

private static final String BROKER_URL="failover://tcp://192.168.106.128:61616";

 

public static void main(String[] args) throws JMSException, InterruptedException{

//创建连接工厂

ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(BROKER_URL);

//获得连接

Connection conn = connectionFactory.createConnection();

//start

conn.start();

 

//创建Session,此方法第一个参数表示会话是否在事务中执行,第二个参数设定会话的应答模式

Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

 

//创建队列

Destination dest = session.createQueue("test-queue");

//创建消息生产者

MessageProducer producer = session.createProducer(dest);

 

for (int i=0;i<100;i++) {

//初始化一个mq消息

TextMessage message = session.createTextMessage("这是第 " + i+" 条消息!");

//发送消息

producer.send(message);

System.out.println("send message:消息"+i);

//暂停3秒

Thread.sleep(3000);

}

 

//关闭mq连接

conn.close();

}

 

}

  

创建消息接收者 MsgProducer:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageConsumer;

import javax.jms.MessageListener;

import javax.jms.Session;

import javax.jms.TextMessage;

 

import org.apache.activemq.ActiveMQConnectionFactory;

 

/**

*

* @Description: Message Consumer

* @author: Bing Yue

*/

public class MsgConsumer implements MessageListener {

 

private static final String BROKER_URL="failover://tcp://192.168.106.128:61616";

 

public static void main(String[] args) throws JMSException{

 

//创建连接工厂

ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(BROKER_URL);

//获得连接

Connection conn = connectionFactory.createConnection();

//start

conn.start();

 

//创建Session,此方法第一个参数表示会话是否在事务中执行,第二个参数设定会话的应答模式

Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

//创建队列

Destination dest = session.createQueue("test-queue");

//创建消息生产者

MessageConsumer consumer = session.createConsumer(dest);

 

//初始化MessageListener

MsgConsumer msgConsumer = new MsgConsumer();

 

//给消费者设定监听对象

consumer.setMessageListener(msgConsumer);

 

}

 

 

/**

* 消费者需要实现MessageListener接口

* 接口有一个onMessage(Message message)需要在此方法中做消息的处理

*/

@Override

public void onMessage(Message msg) {

TextMessage txtMessage = (TextMessage)msg;

try {

System.out.println("get message:" + txtMessage.getText());

catch (JMSException e) {

e.printStackTrace();

}

 

}

}

  

运行MsgProducer,

登录后台查看test-queue队列,可以看到发出的消息正在等待被处理:

 

运行MsgConsumer,接收消息并在控制台打印:

通过这个实例可以对ActiveMQ的应用有一个简单的了解。

代码地址:https://github.com/bingyue/activemq-sample

在实际开发中,通常还需要设置优先级处理,大部分情况下,消息的发送和接收方都会启用多线程,
通过线程池来提高处理效率,解耦的同时保持业务处理能力。


时间: 2024-12-22 11:06:49

消息队列入门(四)ActiveMQ的应用实例的相关文章

消息队列入门(二)消息队列的规范和开源实现

1.AMQP规范 AMQP 是 Advanced Message Queuing Protocol,即高级消息队列协议.AMQP不是一个具体的消息队列实现,而 是一个标准化的消息中间件协议.目标是让不同语言,不同系统的应用互相通信,并提供一个简单统一的模型和编程接口. 目前主流的ActiveMQ和RabbitMQ都支持AMQP协议. AMQP相关的角色和职责 Producer 消息生产者 一个给exchange发送消息的程序,发送方式大致是:它首先创建一个空消息,然后填上内容.路由KEY,最后发

RabbitMQ消息队列入门篇(环境配置+Java实例+基础概念)

版权声明:本文为博主原创文章,转载注明出处http://blog.csdn.net/u013142781 目录(?)[+] 一.消息队列使用场景或者其好处 消息队列一般是在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量. 在项目启动之初来预测将来项目会碰到什么需求,是极其困难的.消息队列在处理过程中间插入了一个隐含的.基于数据的接口层,两边的处理过程都要实现这一接口.这允许你独立的扩展或修改两边的处理过

消息队列入门(一)关于消息队列

1.什么是消息队列 消息是指在两个独立的系统间传递的数据,这两个系统可以是两台计算机,也可以是两个进程. 消息可以非常简单,可以是简单的字符串,也可以是保存了数据持久化的各种类型的文档集合. 队列是在消息的传输过程中的通道,是保存消息的容器,根据不同的情形,可以有先进先出,优先级队列等区别 . 2.为什么使用消息队列 个人觉得消息队列主要的意义是解耦和异步处理,以及在高并发场景下平滑短时间内大量的服务请求. 消息队列不仅被用于系统内部组件之间的通信,同时也被用于系统跟其它服务之间的交互. 消息队

RabbitMQ消息队列(四):分发到多Consumer(Publish/Subscribe)

 <=== RabbitMQ消息队列(三):任务分发机制            上篇文章中,我们把每个Message都是deliver到某个Consumer.在这篇文章中,我们将会将同一个Message deliver到多个Consumer中.这个模式也被成为 "publish / subscribe".     这篇文章中,我们将创建一个日志系统,它包含两个部分:第一个部分是发出log(Producer),第二个部分接收到并打印(Consumer). 我们将构建两个Consum

消息队列入门(三)JMS标准及实现

消息中间件 消息中间件即Message-oriented middleware(MOM),消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成. 通过提供消息传递和消息排队模型,消息中间件可以在分布式环境下扩展进程间的通信. 消息中间件可以即支持同步方式,又支持异步方式. 异步中间件比同步中间件具有更强的容错性,在系统故障时可以保证消息的正常传输.异步中间件技术又分为两类:广播方式和发布/订阅方式. 消息中间件应用主要有两个优点:异步和解耦. JMS规

C#:消息队列应用程序

程序 摘要:本文概述一种用于处理若干消息队列的 Windows 服务解决方案,重点介绍 .NET 框架和 C# 应用程序. 下载 CSharpMessageService.exe 示例文件 (86 KB) 目录简介.NET 框架应用程序 应用程序结构 服务类 检测设备 安装总结参考资料 简介Microsoft 近期推出了一种用于生成集成应用程序的新平台--Microsoft .NET 框架..NET 框架允许开发人员使用任何编程语言迅速生成和部署 Web 服务和应用程序.Microsoft In

UNIX环境高级编程:system V消息队列

unix早期通信机制中的信号能够传送的信息量有限,管道则只能传送无格式字节流,这远远是不够的. 消息队列(也叫报文队列)客服了这些缺点: 消息队列就是一个消息的链表. 可以把消息看作一个记录,具有特定的格式. 进程可以按照一定的规则向消息队列中添加新消息:另一些进程可以从消息队列中读走消息. 消息队列是随内核持续的,只有内核重启或人工删除时,该消息队列才会被删除. system V消息队列使用消息队列标识符标识.具有足够特权的任何进程都可以往一个给定队列放置一个消息,具有足够特权的任何进程都可以

C#消息队列应用程序

简介 Microsoft 近期推出了一种用于生成集成应用程序的新平台--Microsoft .NET 框架..NET 框架允许开发人员使用任何编程语言迅速生成和部署 Web 服务和应用程序.Microsoft Intermediate Language (MSIL) 和实时 (JIT) 编译器使这种不依赖语言的框架得以实现. 与 .NET 框架同时面世的还有一种新的编程语言 C#(读作"C sharp").C# 是一种简单.新颖.面向对象和类型安全的编程语言.利用 .NET 框架和 C

Posix消息队列

消息队列可以认为是一个消息链表,某个进程往一个消息队列中写入消息之前,不需要另外某个进程在该队列上等待消息的达到,这一点与管道和FIFO相反.Posix消息队列与System V消息队列的区别如下: 1. 对Posix消息队列的读总是返回最高优先级的最早消息,对System V消息队列的读则可以返回任意指定优先级的消息. 2. 当往一个空队列放置一个消息时,Posix消息队列允许产生一个信号或启动一个线程,System V消息队列则不提供类似的机制. Posix消息队列操作函数如下: #incl