ActiveMQ笔记(6):消息延时投递

在开发业务系统时,某些业务场景需要消息定时发送或延时发送(类似:飞信的短信定时发送需求),这时候就需要用到activemq的消息延时投递,详细的文档可参考官网说明,本文只介绍二种常用的用法:

注:本文采用spring的JmsTemplate来发送消息

步骤1、首先要修改activemq.xml配置文件,启用延时投递

1 <broker xmlns="http://activemq.apache.org/schema/core" ... schedulerSupport="true" >
2     ...
3   </broker>

即:在broker节点加上schedulerSupport="true",然后重启activemq即可

 

步骤2、定义一个MessagePostProcessor的实现类

import javax.jms.JMSException;
import javax.jms.Message;

import lombok.Data;
import org.apache.activemq.ScheduledMessage;
import org.apache.commons.lang3.StringUtils;
import org.springframework.jms.core.MessagePostProcessor;

/**
 * MQ延时投递处理器(注:ActiveMQ的配置文件中,要配置schedulerSupport="true",否则不起作用)
 * by: 杨俊明 2016-06-16
 */
@Data
public class ScheduleMessagePostProcessor implements MessagePostProcessor {

    private long delay = 0l;

    private String corn = null;

    public ScheduleMessagePostProcessor(long delay) {
        this.delay = delay;
    }

    public ScheduleMessagePostProcessor(String cron) {
        this.corn = cron;
    }

    public Message postProcessMessage(Message message) throws JMSException {
        if (delay > 0) {
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
        }
        if (!StringUtils.isEmpty(corn)) {
            message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, corn);
        }
        return message;
    }

}

 

步骤3、jmsTemplate发送示例

        Object message1 = "corn消息内容:" + DateUtil.formatDate(new Date());
        //分 时 天 月 星期几
        jmsTemplate.convertAndSend(message1, new ScheduleMessagePostProcessor("40 22 * * *"));
        logger.info("消息1:[" + message1 + "] 延时发送成功!");

        jmsTemplate.convertAndSend(message1, new ScheduleMessagePostProcessor("50 22 * * *"));
        logger.info("消息1:[" + message1 + "] 延时发送成功!");

        Object message2 = "message:" + DateUtil.formatDate(new Date());
        jmsTemplate.convertAndSend(message2, new ScheduleMessagePostProcessor(30 * 1000));//延时30秒
        jmsTemplate.convertAndSend(message2, new ScheduleMessagePostProcessor(3600 * 24 * 1000));//延时24小时
        logger.info("消息2:[" + message2 + "] 延时发送成功!");

上面的代码演示了二种延时的用法:延时N毫秒、按corn表达式延时(注:此corn表达式并非Quartz框架中的corn表达式,而是linux中corntab中的表达 式,基本顺序是"分(0-59) 时(0-23) 日(1-31) 月(1-12) 星期几(1-7) ")

 

发送成功后,可以登录activemq的webconsole查看消息的属性:

在scheduled面板中,可以看到延时的消息

注:在开启消息持久化存储的前提下,就算把相应的queue在webconsole面板中删除(即删除队列),只要投递的时间尚未到,该消息也不会删除,仍然能正常延时投递。

此外,在queues面板中,如何查看某条具体的消息,也可以通过属性发现这条消息是延时消息,参考下图:

 

参考文章:
1、Delay and Schedule Message Delivery

2、喂鸡百科上的Corn表达式解释 (中文)

3、喂鸡百科上的Corn表达式解释 (英文)

4、kahaDB官方文档

 

时间: 2024-10-27 05:15:39

ActiveMQ笔记(6):消息延时投递的相关文章

ActiveMQ笔记(7):如何清理无效的延时消息?

ActiveMQ的延时消息是一个让人又爱又恨的功能,具体使用可参考上篇ActiveMQ笔记(6):消息延时投递,在很多需要消息延时投递的业务场景十分有用,但是也有一个缺陷,在一些大访问量的场景,如果瞬间向MQ发送海量的延时消息,超过MQ的调度能力,就会造成很多消息到了该投递的时刻,却没有投递出去,形成积压,一直停留在ActiveMQ web控制台的Scheduled面板中. 下面的代码演示了,如何清理activemq中的延时消息(包括:全部清空及清空指定时间段的延时消息),这也是目前唯一可行的办

activemq-利用ajax客户端就行activeMQ消息中间件的消息收发,如何设置topic的持久订阅啊?

问题描述 利用ajax客户端就行activeMQ消息中间件的消息收发,如何设置topic的持久订阅啊? 利用ajax客户端进行activeMQ消息中间件的消息收发,如何设置topic的持久订阅啊?具体语法怎么写啊? 解决方案 var amq = org.activemq.Amq; amq.init({ uri: 'amq.servlet', logging: true, timeout: 20, clientId:'topicClientA' }); var myHandler = { rcvM

测试-关于activemq 队列持久化消息传送速度的问题?

问题描述 关于activemq 队列持久化消息传送速度的问题? 我使用两种方法测试activemq在队列模式下使用kahadb持久化消息传送速度,结果上来看两种方法测出来的速度相差很大,一个一万多条每秒,一个五千左右每秒. 两种测试方法的测试机器是一样的,测试用例相同,activemq的配置也是一样的,谁能给分析下为什么速度差很多??? 做服务器机器叫server,方法一:先脚本启动10个消费者,当20秒没有收到消息就退出,再脚本启动10个生产者,几下开始时间T1秒,每个生产者发送10万条消息,

ActiveMQ笔记(4):搭建Broker集群(cluster)

上一篇介绍了基于Networks of Borkers的2节点HA方案,这一篇继续来折腾Networks of Brokers,当应用规模日渐增长时,2节点的broker可能仍然抗不住访问压力,这时候就需要多加一些broker,弄一个更大规模的Broker集群,但是怎么合理设置broker之间的网络桥接,却是有讲究的,先来看一种不太好的设计:   这个架构看上去没瑕疵,没毛病,3个broker之间两两互通,整体可用性极高,但是从消息的路由角度来看,却不是一个好的设计,当producer向brok

服务器设计笔记(3)-----消息队列

    摘抄的一篇文章,故拿出来记录下,下篇博客把解决代码分享出来.感谢这篇文章的原作者,解决了棘手的问题.       我们所能想到的最简单的消息队列可能就是使用stl的list来实现了,即消息队列内部维护一个list和一个互斥锁,putMessage时将message加入到队列尾,getMessage时从队列头取一个message返回,同时在getMessage和putMessage之前都要求先获取锁资源. 实现虽然简单,但功能是绝对满足需求的,只是性能上可能稍稍有些不尽如人意.其最大的问题

ActiveMQ笔记(2):基于ZooKeeper的HA方案

activemq官网给出了3种master/slave的HA方案,详见:http://activemq.apache.org/masterslave.html,基于共享文件目录,db,zookeeper. 下面演示了如何在本机搭建基于zookeeper的activemq集群: 一.在目录activemq1下安装activemq(可参考上篇内容),然后修改conf/activemq.xml 1 <broker xmlns="http://activemq.apache.org/schema/

翻译:AKKA笔记 - Actor消息 -1(一)

从第一篇Akka笔记的介绍中,我们是从很高的高度去观察Akka工具箱中的Actors.在这篇笔记的第二篇,我们会看一下Actors中的消息部分.而且延续上一次的例子,我们还会使用同样的学生与老师的例子. 在Actor消息的第一部分,我们会建立一个Teacher Actor,而且会使用一个叫StudentSimulatorApp的主程序. 回顾学生-老师模式的细节 现在考虑下StudentSimulatorApp单独发消息给TeacherActor.当我说到StudentSimulatorApp,

Android开发笔记之:消息循环与Looper的详解_Android

Understanding LooperLooper是用于给一个线程添加一个消息队列(MessageQueue),并且循环等待,当有消息时会唤起线程来处理消息的一个工具,直到线程结束为止.通常情况下不会用到Looper,因为对于Activity,Service等系统组件,Frameworks已经为我们初始化好了线程(俗称的UI线程或主线程),在其内含有一个Looper,和由Looper创建的消息队列,所以主线程会一直运行,处理用户事件,直到某些事件(BACK)退出.如果,我们需要新建一个线程,并

服务器设计笔记(1)-----消息的封装

    消息的封装方式有多中,比如xml,json等,但是我依然觉得使用拼数据的方式最简单,也最节省带宽.比如我们处理一个逻辑就可以这样处理了:     int cast_net(MessageBlock &mb)     {         int  area_id,lvl;         mv >> area >> lvl;         //逻辑处理         //....         MessageBlock re_mb;         re_mb