消息总线重构之EventBus

最近花了不少时间对消息总线进行了重构。重构的重点是在消息总线中加入了Guava的EventBus,并应用于以下两个场景:

(1)改进广播通知

(2)业务逻辑串联,用事件驱动替代责任链模式

EventBus简介

EventBus是Google的开源项目Guava里的一个组件,有兴趣的人可以看我前不久的一篇博文解读。总得来说,EventBus是观察者模型的实现,利用它你既可以实现观察者模型的业务场景,还可以基于它的事件驱动机制来实现应用程序内组件之间的解耦与通信。

改进广播通知

广播通知是消息总线提供的功能之一。在重构之前,客户端接收广播通知是通过消息总线客户端SDK的一个API来实现的:

public void setNotificationListener(IMessageReceiveListener notificationListener);

但之前的广播通知设计并不合理。它受限于之前的基于RabbitMQ的树形路由拓扑模型:

这个拓扑结构中有些只发送不接受的“虚拟队列”并不是真实存在的队列。这些消息生产者无法接收消息,这是非常大的一个缺陷。我一直在想办法重新设计它,之前的关注点都集中在RabbitMQ上,想在MQ上找到一种解决方案,但这很难,除非摈弃“虚拟队列”的设计。于是,我将关注点转移到消息总线中另一个可以提供pub/sub的组件上(后称之为pubsuber),该组件目前可以是redis也可以是zookeeper。因为每个client(更准确得说是每个创建client的pool)都会以长连接的方式挂在pubsuber上。所以,它本身就是一个很不错的广播渠道,并且因为它脱离RabbitMQ单独实现,跟虚拟队列的设计不相冲突。

上面的思路没有问题,但语义与实现上并不对等。通知的收发从语义上来说应该是Client
API级别的。而PubSuber接收到的广播事件却是Pool级别的,并不依赖client(Pool创建PubSuber以及Client)。我们不应该在Pool层面上接收广播事件。因此这里存在一个事件的截获与二次转发的过程。这是我们针对EventBus的第一个应用场景:用它转发PubSuber接收到的广播通知给client。

PubSuber接收到广播消息之后通过EventBus 作二次转发:

    public class NotifyHandler implements IPubSubListener {

        @Override
        public void onChange(String channel, byte[] data, Map<String, Object> params) {
            NotifyEvent notifyEvent = new NotifyEvent();
            Message broadcastMsg = pubsuberManager.deserialize(data, Message.class);
            if (broadcastMsg != null && broadcastMsg.getMessageType().equals(MessageType.BroadcastMessage)) {
                notifyEvent.setMsg(broadcastMsg);
                getComponentEventBus().post(notifyEvent);
            }
        }
    }

事件发布完了之后,EventBus会将其分发到该事件的订阅者处理,这里需要注意的是创建的EventBus是一个异步EventBus的实例,它在一个独立的线程上执行事件处理器方法。而所有的事件处理器都需要通过Client进行注册:

    public void registerEventProcessor(Object eventProcessor) {
        componentEventBus.register(eventProcessor);
    }

以上这一步,就将消息通知跟Client关联起来。而且对多个client注册不同的事件处理器,还可以起到多播的作用(原来在Pool级别是一个事件,现在在Client级别,多个Client可以应对若干个处理器)。

EventBus通过注解来解析事件处理器与事件之间的关联关系,更多的实现细节,请参考之前的文章。下面就是订阅广播通知的方式:

    public static class NotificationEventProcessor {

        @Subscribe
        public void onNotification(NotifyEvent event) {
            logger.info("onNotification");
            Message message = event.getMsg();
            assertNotNull(message);
            assertEquals("test", new String(message.getContent(), Constants.CHARSET_OF_UTF8));
        }

    }

仅仅需要一个注解即可。当然最后别忘记移除注册,如果你不再希望接收通知的话,整个过程如下:

    public void testBroadcast() throws Exception {
        String secret = "kljasdoifqoikjhhhqwhebasdfasdf";

        Message msg = MessageFactory.createMessage(MessageType.BroadcastMessage);
        msg.setContentType("text/plain");
        msg.setContentEncoding("utf-8");

        msg.setContent("test".getBytes(Constants.CHARSET_OF_UTF8));

        NotificationEventProcessor eventProcessor = new NotificationEventProcessor();
        client.registerEventProcessor(eventProcessor);

        client.broadcast(secret, new Message[]{msg});

        TimeUnit.SECONDS.sleep(10);

        client.unregisterEventProcessor(eventProcessor);
    }

这样,原先的拓扑结构就不再包含广播通知的实现了:

事件驱动替代责任链模式

客户端跟消息总线的一次通信,需要经历多个业务逻辑环节。这些业务逻辑有些有顺序关系,有些没有。我们希望将逻辑进行拆分、自由组合搭配并且能够互不干扰得扩展。在此之前的实现基于责任链模式,有一点问题:当长连接消费时,因为真正的消费通常是chain的最后一个调用(方式是:阻塞,一直等到超过设定的时间),所以整个递归链都阻在最后一个调用。而递归调用的实现是基于栈,因此如果最后一个调用不返回(很多时候这种长连接的生命周期跟应用的生命周期相同),整个调用链以及调用中的局部变量一直都不被释放,某种程度上这有点像内存泄露了。这个问题,我曾在之前的文章中探讨过,但一直没找到太好的解决方法,除非我们放弃使用责任链模式。

但基于同步事件驱动的方式似乎能起到跟责任链模式一样的效果。它通过事件分发来驱动业务逻辑调用。将chain的每一个调用都看做是一个事件处理方法,一个单向通信逻辑(比如produce)对应一个事件处理器(produceEventProcessor)。因为此处的EventBus是同步的(事件处理逻辑在调用线程上执行,执行顺序跟事件发生的顺序相同),所以只要编排好事件顺序,一一触发事件,事件处理器也就会一一按照事件触发的顺序执行。

我们以消息生产者来看一下通过EventBus改造后的业务逻辑是什么样子。

首先我们定义一个生产消息的事件处理器:

public class ProduceEventProcessor extends CommonEventProcessor {

}

为了使得逻辑关系紧凑,我们将事件以内部类的方式定义在生产消息的事件处理器内部:

//region events definition
    public static class ValidateEvent extends CarryEvent {
    }

    public static class PermissionCheckEvent extends CarryEvent {
    }

    public static class ProduceEvent extends CarryEvent {
    }
    //endregion

定义每个事件的事件处理方法:

@Subscribe
public void onValidate(ValidateEvent event) {
}

@Subscribe
public void onPermissionCheckEvent(PermissionCheckEvent event) {
}

@Subscribe
public void onProduce(ProduceEvent event) {

}

在client被调用以生产消息时,首先创建该事件处理器的实例,然后向EventBus注册事件处理器:

        EventBus carryEventBus = this.getContext().getCarryEventBus();

        //register event processor
        ProduceEventProcessor eventProcessor = new ProduceEventProcessor();
        carryEventBus.register(eventProcessor);

只有注册了该实例,在发布事件时,才会触发该实例的事件处理方法。注册完成该实例之后,需要初始化事件对象,这里事件之间以及事件处理器之间没有必然联系,我们以一个消息上下文对象的引用来让它们以共享“内存”的方式进行数据交换:

        //init events
        ProduceEventProcessor.ValidateEvent validateEvent = new ProduceEventProcessor.ValidateEvent();
        ProduceEventProcessor.MsgBodySizeCheckEvent msgBodySizeCheckEvent = new ProduceEventProcessor.MsgBodySizeCheckEvent();
        ProduceEventProcessor.PermissionCheckEvent permissionCheckEvent = new ProduceEventProcessor.PermissionCheckEvent();
        ProduceEventProcessor.MsgIdGenerateEvent msgIdGenerateEvent = new ProduceEventProcessor.MsgIdGenerateEvent();
        ProduceEventProcessor.MsgBodyCompressEvent msgBodyCompressEvent = new ProduceEventProcessor.MsgBodyCompressEvent();
        ProduceEventProcessor.ProduceEvent produceEvent = new ProduceEventProcessor.ProduceEvent();

        validateEvent.setMessageContext(ctx);
        msgBodySizeCheckEvent.setMessageContext(ctx);
        permissionCheckEvent.setMessageContext(ctx);
        msgIdGenerateEvent.setMessageContext(ctx);
        msgBodyCompressEvent.setMessageContext(ctx);
        produceEvent.setMessageContext(ctx);

准备工作就绪,现在开始发布事件。这里事件的发布顺序跟执行顺序是一致的,所以我们需要根据业务逻辑来编排事件,以形成原先的串联调用的效果:

//arrange event order and emit!
carryEventBus.post(validateEvent);
carryEventBus.post(msgBodySizeCheckEvent);
carryEventBus.post(permissionCheckEvent);
carryEventBus.post(msgIdGenerateEvent);
carryEventBus.post(msgBodyCompressEvent);
carryEventBus.post(produceEvent);

这就是重构的整个过程。我们发现这里不再存在链式(递归)调用了,各个事件处理器方法之间也没有耦合性,它们通过MessageContext来共享上下文。如果我们要增加新的业务逻辑,如何扩展?四步走:

(1)定义一个新事件对象

(2)定义一个新的事件处理器方法

(3)实例化该事件对象

(4)根据需要插入原先的编排过的事件中去并发布该事件

跟原先的事件没有任何关系。

更多实现,可以查看项目源码:banyan

原文发布时间为:2015-06-30

本文作者:vinoYang

本文来自合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

时间: 2024-08-06 23:35:50

消息总线重构之EventBus的相关文章

消息总线重构之简化客户端

这段时间对消息总线进行了再次重构.本次重构主要针对消息总线的pubsub组件以及对client的简化,同时谈谈对消息总线的一些想法. 简化client的复杂度 之前的client需要同时连接两个分布式组件.消息总线的访问需要用户提供pubsuberHost,pubsuberPort参数,因此它首先连接的就是pubsuber.而消息总线是基于RabbitMQ构建的,因此它必然还需要连接RabbitMQ.而之所以没有需要用户程序提供RabbitMQ Server的地址信息,是因为它是通过pubsub

消息总线之模型重构

前段时间重新对消息总线的通信模型进行设计&重构,这篇文章谈谈其中的一些想法. RabbitMQ简介 消息总线对RabbitMQ的官方java client进行了定制.简化.这里首先谈谈RabbitMQ作为一个Message Broker的两个核心概念: Exchange Exchange(交换器),其实你可以简单得将它看成是Router(路由器),路由就是它的主要职责.它支持非常灵活的 Exchange-Exchange.Exchange-Queue 之间的绑定,并提供了4种Exchange类型

消息总线优化之PubSub

近段时间,暂缓了消息总线feature的开发,花了部分时间对原先的pubsub机制进行了针对性的优化与重构.这里记录一下优化的过程以及相比原来的设计有哪些改观. PubSub在消息总线内部的作用 PubSub在消息总线内部主要用于对所有在线客户端进行实时管控的作用.每个客户端在使用消息总线时,都"被迫"注册到PubSub上,并"被迫"订阅了一些Channel,以便消息总线管控台实时下发一些管控指令及时生效. 之前的设计回顾 这里有必要回顾一下之前的设计.消息总线内部

消息总线扩展之主动转发

问题简述 消息总线目前为Java编程语言提供了SDK,同时针对其他语言提供了一个称之为httpBridge的http代理.这基本可以满足大部分主流编程语言对消息总线的使用需求,但这也仅仅是对技术层面上的需求的满足.在业务层面上,尤其是面对老的业务系统的适配一直都是个难题,这篇文章谈谈面对一个在线上运行的业务系统,如何使得引入消息总线的总体成本尽可能得低. 就消息总线的两种使用方式而言,无论是SDK的方式还是httpBridge的方式,都需要往第三方系统引入对消息总线的依赖,这些依赖包括但不仅限于

消息总线扩展之面向消息的数据集成

最近一段时间,我在琢磨消息总线除了能进行受管控的消息通信之外,还有哪些可以扩展的方向.这篇文章我们来探讨一下面向消息的数据集成是否可以作为一种尝试方向. 相关技术简介 XML 谈到XML我们的第一映像就是用它来做各种配置,当然如果你是Javaer,那么可能你印象最深的就是Spring的bena配置了.其实,XML的用途远不止充当配置文件这一方面.它还被广泛应用于异构系统集成.数据集成.语义/协议转换等等方面,甚至成为构建平台非常重要的基石.虽然XML一直以来被人诟病其解析效率低下以及数据量太冗余

消息总线扩展之集成Thrift-RPC

本文主要探讨了消息总线支持Thrift RPC的实现过程.鉴于RabbitMQ官方的Java Client提供了基于RabbitMQ的JSON-RPC,消息总线也顺道提供了JSON-RPC的API.然后也尝试了为消息总线增加对Thrift-RPC的扩展支持,希望此举能让消息总线同时为SOA提供基础设施. Thrift简介 Thrift是一个跨语言的服务部署框架,最初由Facebook于2007年开发,2008年进入Apache开源项目.Thrift通过一个中间语言(IDL, 接口定义语言)来定义

再谈消息总线客户端的多线程实现

上次我谈了最近在写的一个基于RabbitMQ的消息总线的客户端在面对并发问题时的一些思考以及最终的实现方案.那是一种简单并且不容易产生并发问题的方案,如果你看过那篇文章,我曾在最终的实现方案之后给出了其利弊分析. 核心的问题是Client建立的跟RabbitMQ Server的connection是共享还是独占.对于这个问题可以举一个通俗一点的例子:如果你想要租间房子,每个人会有不同的想法.比如有人喜欢简单.安静的生活并且在意个人隐私,那么这个时候你最好的选择就是去租个单室套:里面什么都有,并且

谈消息总线客户端的多线程实现

最近在实现一个基于RabbitMQ的消息总线.因为它提供了Client(客户端),这里就牵扯到凡是技术组件的client都无法回避的并发问题.本文借实现消息总线的client谈谈在实现过程中的想法以及最终的处理方式,当然这些都不仅仅适用于消息总线的client,其他通用组件的client也同样适用. 并发问题的分类 其实上面所提到的并发问题,从大的层面上可以划分为两类问题: 自身固有的并发问题:这个存在的前提条件是client自身内部使用了多线程技术,并且本身就存在线程安全的缺陷. 被动调用的并

谈消息总线的路由模型

最近在写一个基于RabbitMQ的消息总线.虽然RabbitMQ提供了plugin的机制可以实现对其进行扩展,但是由于对erlang语言不熟,考虑到上手成本高,因此放弃实现plugin,转而基于Smart client + 树形拓扑路由的模型.当然这也大大降低了我们实现功能的灵活性,后面我会找个时间开篇新文章,谈谈Smart Client的限制. 预备知识 RabbitMQ对于消息的通信只提供了几个非常简单的API:Channel#basicPublish:Channel#basicConsum