最佳实践:如何基于MNS实现一对多拉取消息消费模型

如何实现一对多拉取消息消费模型

问题背景:

阿里云消息服务MNS 已经提供队列(queue)和主题(topic)两种模型。其中队列提供的是一对多的共享消息消费模型,采用客户端主动拉取(Pull模式;主题模型提供一对多的广播消息消费模型,并且采用服务端主动推送(Push模式。上面两种模型基本能满足我们大多数应用场景。

 

推送模式的好处是即时性能比较好,但是需要暴露客户端地址来接收服务端的消息推送。有些情况下,比如企业内网,我们无法暴露推送地址,希望改用拉取(Pull)的方式。虽然MNS不直接提供这种消费模型,但是我们可以结合主题和队列来实现一对多的拉取消息消费模型。具体方案如下:

 

解决方案:

让主题将消息先推送到队列,然后由消费者从队列拉取消息。这样既可以做到1对多的广播消息,又不需要暴露消费者的地址;如下图所示:

接口说明:

在MNS最新的Java SDK (1.1.5)中的CloudPullTopic
默认支持上述解决方案。其中

MNSClient 提供下面两个接口来快速创建CloudPullTopic

public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList, boolean needCreateQueue, QueueMeta queueMetaTemplate)

public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList)

其中,TopicMeta 是创建topic的meta 设置, queueNameList里指定topic消息推送的队列名列表;needCreateQueue表明queueNameList是否需要创建;queueMetaTemplate是创建queue需要的queue meta 参数设置;

Demo 代码

        CloudAccount account = new CloudAccount(accessKeyId, accessKeySecret, endpoint);
        MNSClient client = account.getMNSClient();

        // build consumer name list.
        Vector<String> consumerNameList = new Vector<String>();
        String consumerName1 = "consumer001";
        String consumerName2 = "consumer002";
        String consumerName3 = "consumer003";
        consumerNameList.add(consumerName1);
        consumerNameList.add(consumerName2);
        consumerNameList.add(consumerName3);
        QueueMeta queueMetaTemplate = new QueueMeta();
        queueMetaTemplate.setPollingWaitSeconds(30);

        try{
            //producer code:
            // create pull topic which will send message to 3 queues for consumer.
            String topicName = "demo-topic-for-pull";
            TopicMeta topicMeta = new TopicMeta();
            topicMeta.setTopicName(topicName);
            CloudPullTopic pullTopic = client.createPullTopic(topicMeta, consumerNameList, true, queueMetaTemplate);

            //publish message and consume message.
            String messageBody = "broadcast message to all the consumers:hello the world.";
            // if we sent raw message,then should use getMessageBodyAsRawString to parse the message body correctly.
            TopicMessage tMessage = new RawTopicMessage();
            tMessage.setBaseMessageBody(messageBody);
            pullTopic.publishMessage(tMessage);

            // consumer code:
            //3 consumers receive the message.
            CloudQueue queueForConsumer1 = client.getQueueRef(consumerName1);
            CloudQueue queueForConsumer2 = client.getQueueRef(consumerName2);
            CloudQueue queueForConsumer3 = client.getQueueRef(consumerName3);

            Message consumer1Msg = queueForConsumer1.popMessage(30);
            if(consumer1Msg != null)
            {
                System.out.println("consumer1 receive message:" + consumer1Msg.getMessageBodyAsRawString());
            }else{
                System.out.println("the queue is empty");
            }

            Message consumer2Msg = queueForConsumer2.popMessage(30);
            if(consumer2Msg != null)
            {
                System.out.println("consumer2 receive message:" + consumer2Msg.getMessageBodyAsRawString());
            }else{
                System.out.println("the queue is empty");
            }

            Message consumer3Msg = queueForConsumer3.popMessage(30);
            if(consumer3Msg != null)
            {
                System.out.println("consumer3 receive message:" + consumer3Msg.getMessageBodyAsRawString());
            }else{
                System.out.println("the queue is empty");
            }

            // delete the fullTopic.
            pullTopic.delete();
        }catch(ClientException ce)
        {
            System.out.println("Something wrong with the network connection between client and MNS service."
                    + "Please check your network and DNS availablity.");
            ce.printStackTrace();
        }
        catch(ServiceException se)
        {
            /*you can get more MNS service error code in following link.
              https://help.aliyun.com/document_detail/mns/api_reference/error_code/error_code.html?spm=5176.docmns/api_reference/error_code/error_response
            */
            se.printStackTrace();
        }

        client.close();
时间: 2024-10-03 06:32:14

最佳实践:如何基于MNS实现一对多拉取消息消费模型的相关文章

SQLServer · 最佳实践 · 开发基于.NET CORE的LINUX版本的数据库应用

title: SQLServer · 最佳实践 · 开发基于.NET CORE的LINUX版本的数据库应用 author: 石沫 背景 最近有客户在基于.NET CORE的LINUX版本连接数据库的应用程序,在开发中,会遇到一些问题,客户会错误地将原因定位到我们的SQL SERVER,陆续收到一些工单,因此,我们需要有计划增强这个方面的能力,同事正确引导用户使用SQL SERVER. 部署环境 1. 服务器版本:ubuntu 14.04 2. .NET CORE 版本:1.0 3. 安装过程 3

Docker在云平台上的最佳实践:基于容器技术的DevOps探索

12月9日,在云栖计算之旅线下沙龙上,阿里云容器服务团队的高级研发工程师秦妤嘉分享了<基于容器技术的DevOps探索>.首先介绍了DevOps和CD,接着分析了Docker如何打破传统CD壁垒,最后讲解了怎样从零开始搭建一个持续交付系统. 视频回顾 DevOps与Continuous Delivery DevOps 在一个较成熟的软件和服务交付的团队里,就技术层面来说主要分为三个组成部分:开发.测试和运维.开发测试团队比较关注的是代码能否运行,而运维比较关注的是系统能否在上线后稳定运行,于是隔

《微信公众平台开发最佳实践》——3.3 接收事件推送消息

3.3 接收事件推送消息 在基础接口中,事件消息只有关注和取消关注事件.用户关注和取消关注公众账号的时候将分别触发这两个消息. 3.3.1 关注/取消关注 用户关注微信公众账号时的界面如图3-14所示,单击"关注"按钮,微信公众账号将收到关注事件.用户关注微信公众账号时的XML数据格式如下所示: <xml> <ToUserName><![CDATA[gh_b629c48b653e]]></ToUserName> <FromUserN

JSP 最佳实践:组合 JavaBean 组件和 JSP 技术

js JSP 最佳实践:组合 JavaBean 组件和 JSP 技术 使用 JavaBean 和 JSP 参数在 Web 页面之间传递数据级别:入门Brett McLaughlin(brett@oreilly.com)作家,O'Reilly and Associates2003 年 7 月 Web 架构设计师 Brett McLaughlin 演示了 JavaBean 组件和 JSP 技术的结合如何使您能够在 Web 页面之间存储并传递数据,以及这样做如何能实现更为动态的站点设计.到目前为止,我

Servlets和JSP Pages最佳实践

js|servlet Java Servlet技术与JSP技术使Java服务器端技术,目前他们控制了整个服务器端Java技术市场,并且逐渐成为构建商业Web应用的标准.Java开发者喜欢这些技术是由于很多的原因,包括:这些技术很容易学习,一次编写,处处运行(Write Once, Run Anywhere).更重要的是,如果更高效地采用了下面的实践,Servlet与JSP能够帮助分开Web的表示与内容."最佳实践"是被证明为开发高质量.可重用与易维护的基于Servlet和JSP的Web

JSP 最佳实践:用 jsp:include 控制动态内容

include|js|动态|控制 本文是 Java"知情人士"Brett McLaughlin 继第一篇 JSP 最佳实践文章后的后续文章,在文中,作者向您演示了如何扩展 JSP 技术中用于动态内容的包含功能.了解静态 include 伪指令和动态 jsp:include 元素之间的差异,搞清楚如何混合搭配这二者以获取最优性能.在新的 JSP 最佳实践系列的前一篇文章中,您了解了如何使用 JSP include 伪指令将诸如页眉.页脚和导航组件之类的静态内容包含到 Web 页面中.和服

JSP最佳实践: 组合JavaBean组件和JSP技术

使用 JavaBean 和 JSP 参数在 Web 页面之间传递数据 简介:Web 架构设计师 Brett McLaughlin 演示了 JavaBean 组件和 JSP 技术的结合如何使您能够在 Web 页面之间存储并传递数据,以及这样做如何能实现更为动态的站点设计. 到目前为止,我们在 JSP 最佳实践系列文章 中着重讨论的都是较为基本的主题.在前两篇文章中, 您学习了如何使用 JSP include 机制来将外部内容引入到您的网站或 Web 应用程序.我们使用了两种不 同的 include

数据产品经理最佳实践-数据战略规划

一.前言 到目前为止,取得这样的成果,我总结了一条经验:就是预先要把事情想清楚,把战略目的.步骤,尤其是出了问题如何应对,一步步一层层都想清楚:要有系统地想,这不是一个人或者董事长来想,而是有一个组织来考虑.当然,尽管不可能都想得和实际中完全一样,那么意外发生时要很快知道问题所在,情况就很好处理了.                                                                                                    

提高数据中心效率、可用性和容量的五项最佳实践方案

数据中心可以说是任何企业中最具活力和最为关键的操作之一.近年来,随着数据中心的密度和容量呈现稳步增长,其复杂性和安全风险也在不断增加,资源日趋紧张,进一步为其相关设备的性能带来了不良的影响.根据一项针对数据中心行业的停机中断研究结果显示,数据中心的任何类型的停机中断的平均成本为389,879欧元,而部分的停机中断的平均成本为199,103欧元.完全停机中断的成本超过524,464欧元.鉴于停机中断的成本如此之高,使得IT容量的可用性通常是评估数据中心的最重要的指标.然而,在今天,数据中心还必须同