基于Netty与RabbitMQ的消息服务

Netty作为一个高性能的异步网络开发框架,可以作为各种服务的开发框架。

前段时间的一个项目涉及到硬件设备实时数据的采集,采用Netty作为采集服务的实现框架,同时使用RabbitMQ作为采集服务和各个其他模块的通信消息队列,整个服务框架图如下:

将业务代码和实际协议解析部分的代码抽离,得到以上一个简单的设计图,代码开源在GitHub上,简单介绍下NettyMQServer采集服务涉及到的几个关键技术点:

1、设备TCP消息解析:

NettyMQServer和采集设备Device之间采用TCP通信,TCP消息的解析可以使用LengthFieldBasedFrameDecoder(消息头和消息体),可以有效的解决TCP消息“粘包”问题。

消息包解析图如下:bytes length field at offset 0, do not strip header, the length field represents the length of the whole message

 lengthFieldOffset   =  0
 lengthFieldLength   =  2
 lengthAdjustment    = -2 (= the length of the Length field)
 initialBytesToStrip =  0

 BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
 +--------+----------------+      +--------+----------------+
 | Length | Actual Content |----->| Length | Actual Content |
 | 0x000E | "HELLO, WORLD" |      | 0x000E | "HELLO, WORLD" |
 +--------+----------------+      +--------+----------------+

代码中消息长度的存储采用了4个字节,采用LengthFieldBasedFrameDecoder(65535,0,4,-4,0)解码,Netty会从接收的数据中头4个字节中得到消息的长度,进而得到一个TCP消息包。

2、给设备发消息:

首先在连接创建时,要保留TCP的连接:

static final ChannelGroup channels = new DefaultChannelGroup(
            GlobalEventExecutor.INSTANCE);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // A closed channel will be removed from ChannelGroup automatically
        channels.add(ctx.channel());
    }

在每次一个Channel Active(连接创建)的时候用ChannelGroup保存这个Channel连接,当需要给某个设备发消息的时候,可以遍历该ChannelGroup,找到对应的Channel,给该Channel发送消息:

for (io.netty.channel.Channel c : EchoServerHandler.channels) {
    ByteBuf msg = Unpooled.copiedBuffer(message.getBytes());
    c.writeAndFlush(msg);
}

这里是给所有的连接的设备都发。当连接断开的时候,ChannelGroup会自动remove掉这个连接,不需要我们手动管理。

3、心跳检测

当某个设备Device由于断电或是其他原因导致设备不正常无法采集数据,Netty服务端需要知道该设备是否在正常工作,可以使用Netty的IdleStateHandler,示例代码如下:

// 3 minutes for read idle
ch.pipeline().addLast(new IdleStateHandler(3*60,0,0));
ch.pipeline().addLast(new HeartBeatHandler());

/**
 * Handler implementation for heart beating.
 */
public class HeartBeatHandler extends ChannelInboundHandlerAdapter{

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
            throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                // Read timeout
                System.out.println("READER_IDLE: read timeout from "+ctx.channel().remoteAddress());
                //ctx.disconnect(); //Channel disconnect
            }
        }
    }
}

上面设置3分钟没有读到数据,则触发一个READER_IDLE事件。

4、RabbitMQ消息接收与发送

NettyMQServer消息发送采用了Spring AMQP,只需要在配置文件中简单配置一下,就可以方便使用。

NettyMQServer消息接收同样可以采用Spring AMQP,但由于对Spring相关的配置不是很熟悉,为了更灵活的使用MQ,这里使用了RabbitMQ Client Java API来实现:

                    Connection connection = connnectionFactory.newConnection();
                    Channel channel = connection.createChannel();
                    channel.exchangeDeclare(exchangeName, "direct", true, false, null);
                    channel.queueDeclare(queueName, true, false, false, null);
                    channel.queueBind(queueName, exchangeName, routeKey);

                    // process the message one by one
                    channel.basicQos(1);

                    QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
                    // auto-ack is false
                    channel.basicConsume(queueName, false, queueingConsumer);
                    while (true) {
                        QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                        String message = new String(delivery.getBody());

                        log.debug("Mq Receiver get message");
                        // Send the message to all connected clients
                        // If you want to send to a specified client, just add
                        // your own logic and ack manually
                        // Be aware that ChannelGroup is thread safe
                        log.info(String.format("Conneted client number: %d",EchoServerHandler.channels.size()));
                        for (io.netty.channel.Channel c : EchoServerHandler.channels) {
                            ByteBuf msg = Unpooled.copiedBuffer(message.getBytes());
                            c.writeAndFlush(msg);
                        }
                        // manually ack to MQ server the message is consumed.
                        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    }

以上代码从一个Queue中读取数据,为了有效处理数据,防止异常数据丢失,使用了手动Ack。

RabbitMQ的使用方式:http://www.cnblogs.com/luxiaoxun/p/3918054.html

 

代码托管在GitHub上:https://github.com/luxiaoxun/NettyMqServer

 

参考:

http://netty.io/

http://netty.io/4.0/api/io/netty/handler/codec/LengthFieldBasedFrameDecoder.html

http://netty.io/4.0/api/io/netty/handler/timeout/IdleStateHandler.html

 

作者:阿凡卢

出处:http://www.cnblogs.com/luxiaoxun/

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

时间: 2024-10-21 16:24:54

基于Netty与RabbitMQ的消息服务的相关文章

使命必达--阿里云商用消息服务MNS初探

在2015杭州云栖大会上,阿里云飞天事业部资深总监李津发布了一款海量消息,使命必达的消息服务产品(http://www.aliyun.com/product/mns).该产品能够提供高效,可靠,安全,便捷,弹性扩展的消息服务:能够帮助我们轻松的构建松耦合,高并发的分布式系统:能够方便我们做跨域数据安全传输.目前,消息服务也是阿里云唯一商用消息产品,其服务稳定性和可靠性都有SLA保障.下面让我一起来详细了解一下这款产品.   架构优势带来海量,高可靠,高可用特性 在了解消息服务前,不得不提的是阿里

阿里移动|《使命必达——钉钉企业级消息服务的机遇与挑战》

摘要:对于移动技术而言,2017年是继往开来之年.一方面是移动技术领域进入深水区,另一方面移动技术边界和内涵被不断重塑.作为专为中国企业打造的免费沟通和协同的多端平台,钉钉在发展和功能的不断完善过程中沉淀了很多实战经验,2017年杭州云栖大会上,钉钉技术专家格夫结合钉钉发展历程为大家分享了企业沟通场景的消息服务挑战以及系统推送链路的优化经验. 演讲嘉宾简介:格夫,钉钉技术专家.入职阿里巴巴之前在百度云服务平台部门,2014年加入钉钉创业团队,参与钉钉消息系统设计开发,主导了IM系统性能和稳定性提

最佳实践:如何基于消息服务MNS实现严格有序队列

问题背景:阿里云消息服务提供的队列(queue)主要特点是高可靠.高可用.高并发.每个队列的数据都会被持久化三份到阿里云的飞天分布式平台:其中每个队列至少有2台服务器向外提供服务:同时每台服务器都支持高并发访问.这些分布式特性,也导致了消息服务队列无法像传统单机队列那样保证严格的消息FIFO特点,只能做到基本有序. 我们的队列如果同时有多个消息发送者(sender),由于并发和网络延迟不一等问题,消息的严格顺序本身就是失去了意义,因为在这种情况下,我们根本无法获知消息在多个sender上的实际发

基于netty-socketio的web推送服务

实时消息的推送,PC端的推送技术可以使用socket建立一个长连接来实现.传统的web服务都是客户端发出请求,服务端给出响应.但是现在直观的要求是允许特定时间内在没有客户端发起请求的情况下服务端主动推送消息到客户端. 有哪些可以实现web消息推送的技术: 不断地轮询(俗称"拉",polling)是获取实时消息的一个手段:Ajax 隔一段时间(通常使用 JavaScript 的 setTimeout 函数)就去服务器查询是否有改变,从而进行增量式的更新.但是间隔多长时间去查询成了问题,因

Flink运行时之基于Netty的网络通信中

PartitionRequestClient 分区请求客户端(PartitionRequestClient)用于发起远程PartitionRequest请求,它也是RemoteChannel跟Netty通信层之间进行衔接的对象. 对单一的TaskManager而言只存在一个NettyClient实例.但处于同一TaskManager中不同的任务实例可能会跟不同的远程TaskManager上的任务之间交换数据,不同的TaskManager实例会有不同的ConnectionID(用于标识不同的IP地

RabbitMQ之消息持久化

消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢--消息持久化. 为了保证RabbitMQ在退出或者crash等异常情况下数据没有丢失,需要将queue,exchange和Message都持久化. queue的持久化 queue的持久化是通过durable=true来实现的. 一般程序中这么使用: Connection connection = connectionFactory.newConnection(); Channel channel = conn

Apache Artemis —— 非堵塞 Java 嵌入消息服务

Apache ActiveMQ Artemis 提供了一个非堵塞架构,实现了超高性能的 Java 对象消息服务器.其核心只依赖一个 netty.jar 文件.该项目的目的是为你的 Java 应用提供一个可嵌入的消息服务. 架构图: 特性: 支持 AMQP 协议 OpenWire 支持 5 个 ActiveMQ 客户端 STOMP 协议支持 HornetQ Core 协议支持 HornetQ 2.4,2.5 客户端 JMS 2.0 和 1.1 支持 通过共享存储和基于复制的非共享存储实现的高可用性

Flink运行时之基于Netty的网络通信上

概述 本文以及接下来的几篇文章将介绍Flink运行时TaskManager间进行数据交换的核心部分--基于Netty通信框架远程请求ResultSubpartition.作为系列文章的第一篇,先列出一些需要了解的基础对象. NettyConnectionManager Netty连接管理器(NettyConnectionManager)是连接管理器接口(ConnectionManager)针对基于Netty的远程连接管理的实现者.它是TaskManager中负责网络通信的网络环境对象(Netwo

干货--JMS(java消息服务)整合Spring项目案例

Sprng-jms消息服务小项目 所需的包: spring的基础包 spring-jms-xx包 spring-message–xx包 commons-collection-xx包 commons-pool2-xx包 aop切面的包: spring-aop,spring-aspect,aopalliance,aspectjrt.jar,aspectjweaver.jar 配置: 1.配置ConnectionFactory 2.配置jmsTemplate; 3.配置Destination 4.配置