消息总线优化之PubSub

近段时间,暂缓了消息总线feature的开发,花了部分时间对原先的pubsub机制进行了针对性的优化与重构。这里记录一下优化的过程以及相比原来的设计有哪些改观。

PubSub在消息总线内部的作用

PubSub在消息总线内部主要用于对所有在线客户端进行实时管控的作用。每个客户端在使用消息总线时,都“被迫”注册到PubSub上,并“被迫”订阅了一些Channel,以便消息总线管控台实时下发一些管控指令及时生效。

之前的设计回顾

这里有必要回顾一下之前的设计。消息总线内部的Pub/Sub的机制是通过第三方技术组件的实现(目前支持Zookeeper跟Redis),关于Pub/Sub这里首先普及几个概念,首先组件根据自身业务定义Channel,某个组件如果需要关注某Channel的变更就注册对某Channel的关注(subscribe),当有组件因为业务需要向Channel发送变更(publish),凡是subscribe该Channel的所有组件都会获取到变更。这里因为Zookeeper跟Redis都支持数据存储,所以这里的publish的内容其实既可以被Push给subscribe该Channel的所有组件,也可以使得其他组件根据Channel
pull下来。

其实之前的做法的关注点在“自动化”以及“扩展性”。为了所谓的扩展性,我们利用Java注解扫描的方式来使得整个Channel的定义“自动化”,这样就无需硬编码了。并且当后续业务扩展,新增一个Channel的时候,之前Channel的定义无需作任何改变。另外为了客户端首次获取(目前的推送机制zookeeper以及redis都支持KV数据存储)以及后续更新推送数据的对客户端的一致性,我们让一个Channel对应数据库的一张表,同时每个Channel都对应自己的数据自动获取方式。

当然Pub/Sub从服务端角度来看是数据的上行(从数据库提取数据,push到subscribe的客户端),从客户端角度来看是数据的下行。因此这里我们定义了一个IDataExchange接口,用来与Pub/Sub组件进行数据交换:

然后定义了一个@Exchanger注解,它包含两个属性:

  • table:表示对应的表;
  • path:也即channel,对应频道名称;

然后涉及到变更的表都会实现为一个独立的XXXExchanger。

为了让每个Channel的数据源是以一致的接口对外提供,这里统一定义了一个获取数据源的接口:IDataFetcher:

public interface IDataFetcher {

    public byte[] fetchData(IDataConverter converter);

}

该接口接收一个数据序列化器,然后将获取到的数据进行序列化并以byte[]作为统一的返回值,因为需要将数据存储到Pub/Sub组件里去(它们大都支持字节数组的API接口)。

整体的设计如下所示:

这样的设计对最初的关注点(自动化、扩展性、客户端首次获取数据以及后续获取变更数据导致代码处理上的一致性)而言,确实够了。但就性能而言,却非常低效。因为是一张表对应一个Channel,所以其实是全表推送,既然是全表推送,那么就无法鉴别客户端,无法鉴别客户端,就可能代码无效推送(跟某个客户端无关的关系数据,也会被推送过来),从而产生频繁推送,无效解析等一系列恶性循环。另外全表数据,相对来说是原始数据,还需要各个客户端做相应的解析,计算出合适的视图,用于内部控制以及权限校验等,并且所有的客户端在这一步执行的逻辑几乎是一样的。需要解析生成的视图如下:

    private Map<String, Node>   proconNodeMap;
    private Map<String, Node>   reqrespNodeMap;
    private Map<String, Node>   rpcReqRespNodeMap;
    private Map<String, Node>   pubsubNodeMap;
    private Map<String, Node>   idNodeMap;
    private Map<String, Node>   secretNodeMap;
    private Map<String, Config> clientConfigMap;
    private ExchangerManager    exchangeManager;
    private Map<String, Sink>   tokenSinkMap;
    private Map<String, String> pubsubChannelMap;
    private Node                notificationExchangeNode;

优化之后的设计

对于Pub/Sub重新设计之后采用——推拉结合的模式。不再推送数据,只推送变更通知以及变更的KEY(secret)。然后客户端按需拉取。

优化后的设计,带来如下一些优点:

减少客户端内存占用

之前Pub/Sub的设计是“首次拉取,变更全推”的做法。而且拉取的是全表数据,这对于客户端内存的占用是个极大的损耗。而优化之后,将只存储跟当前secret相关的数据视图。

服务端准备“数据视图”,减少客户端计算时间

优化之后针对客户端使用的数据专门定制了数据结构,在服务端按照键值对的形式计算出某个secret对应的客户端需要使用的视图数据并缓存在pub/sub组件的内存中。这个数据视图的数据结构如下:

这样,客户端在验证通信权限的时候,将会非常快。

减少远程访问通信开销

通信次数

减少通信次数的主要手段是本地缓存(local
cache),客户端获取数据的方式是:如果本地有,则从本地取,如果本地没有,则从远端获取获取完之后缓存在本地内存里。部分代码如下所示:

    public synchronized NodeView getNodeView(String secret) {
        if (Strings.isNullOrEmpty(secret)) {
            throw new NullPointerException("the secret can not be null or empty");
        }

        if (this.secretNodeViewMap.containsKey(secret)) {   //local cache
            return this.secretNodeViewMap.get(secret);
        } else {                                            //remote data then local cache
            NodeView nodeViewObj = this.pubsuberManager.get(secret, NodeView.class);
            this.secretNodeViewMap.put(secret, nodeViewObj);
            return nodeViewObj;
        }
    }

当然通信次数的减少,还得益于特地为客户端定制的“数据视图”,并且是按照每个队列的secret拆分成key/value的。管控台导致的数据变更将过渡为变更通知事件,然后再按序更新本地缓存。而不会像原来那样,推送数据变更,从而导致太多无效网络交互以及数据计算。

通信数据量

减少通信数据量的主要手段是只获取有效数据,比如当调用消息总线API的时候,每个API都要求传入一个secret来指示当前对应的队列节点,因此我们只需要从远程获取客户端需要的跟当前secret相关的“数据视图”。当然这里我们作了一个假设:大部分场景下,一个客户端在某个JVM进程内通常只使用一个secret。因为API被设计为某个使用者只需要知道自己队列对应的secret即可使用,因此这样的假设是合理的。当然也不排除某个应用涉及到多个队列的操作,这种情况最多多获取几个secret的数据视图。但基本的原则是:不取多余数据,按需取用。并且,推送也从原来的数据变成了现在的变更通知,该通知虽然是广播式的,但却是“自认领”的机制:

    public void onChannelDataChanged(String channel, Object obj) {
        logger.debug("=-=-=-=-=-=- received change from channel : " + channel + " =-=-=-=-=-=-");
        if (channel.equals(Constants.PUBSUB_NODEVIEW_CHANNEL)) {
            String secret = obj.toString();
            this.updateNodeView(secret);
        } else if (channel.equals(Constants.PUBSUB_SERVER_STATE_CHANNEL)) {
            String serverState = obj.toString();
            this.setServerState(serverState);
        } else if (channel.equals(Constants.PUBSUB_CONFIG_CHANNEL)) {
            this.updateConfig(obj.toString());
        } else if (channel.equals(Constants.PUBSUB_NOTIFICATION_EXCHANGE_CHANNEL)) {
            this.updateNotificationNode();
        }
    }

拉取更新:

    public synchronized void updateNodeView(String secret) {
        if (this.secretNodeViewMap.containsKey(secret)) {
            this.secretNodeViewMap.remove(secret);
            this.getNodeView(secret);
        }
    }

可以看到,只有在推送的secret在本地有缓存时,才会去远端拉取更新。否则,将直接丢弃该变更通知。

取舍

当然,这种完全定制化的机制,也彻底废弃了之前关注的自动化以及扩展性的特性。这是必要的,因为我们队消息总线的定位还是希望它具有更好的性能。

原文发布时间为:2015-05-04

本文作者:vinoYang

本文来自合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

时间: 2024-09-20 01:13:47

消息总线优化之PubSub的相关文章

再谈消息总线客户端的多线程实现

上次我谈了最近在写的一个基于RabbitMQ的消息总线的客户端在面对并发问题时的一些思考以及最终的实现方案.那是一种简单并且不容易产生并发问题的方案,如果你看过那篇文章,我曾在最终的实现方案之后给出了其利弊分析. 核心的问题是Client建立的跟RabbitMQ Server的connection是共享还是独占.对于这个问题可以举一个通俗一点的例子:如果你想要租间房子,每个人会有不同的想法.比如有人喜欢简单.安静的生活并且在意个人隐私,那么这个时候你最好的选择就是去租个单室套:里面什么都有,并且

谈消息总线的路由模型

最近在写一个基于RabbitMQ的消息总线.虽然RabbitMQ提供了plugin的机制可以实现对其进行扩展,但是由于对erlang语言不熟,考虑到上手成本高,因此放弃实现plugin,转而基于Smart client + 树形拓扑路由的模型.当然这也大大降低了我们实现功能的灵活性,后面我会找个时间开篇新文章,谈谈Smart Client的限制. 预备知识 RabbitMQ对于消息的通信只提供了几个非常简单的API:Channel#basicPublish:Channel#basicConsum

消息总线重构之简化客户端

这段时间对消息总线进行了再次重构.本次重构主要针对消息总线的pubsub组件以及对client的简化,同时谈谈对消息总线的一些想法. 简化client的复杂度 之前的client需要同时连接两个分布式组件.消息总线的访问需要用户提供pubsuberHost,pubsuberPort参数,因此它首先连接的就是pubsuber.而消息总线是基于RabbitMQ构建的,因此它必然还需要连接RabbitMQ.而之所以没有需要用户程序提供RabbitMQ Server的地址信息,是因为它是通过pubsub

WSE3.0构建Web服务安全(4) MTOM消息传输优化和文件上传、下载

MTOM消息优化传输机制主要应用于大量数据的传输,很多文章中也直接得出结论:使用MTOM文件传输效率高.为什么MTOM的数据传输效率会比别的方式要高?MTOM真的如此完美吗,它有什么不足?什么情况下使用MTOM?这些疑问,本文WSE3.0构建Web服务安全系列文章的第4节:MTOM消息优化传输机制和文件上传.下载--将为您一一解答.本节结构为1.MTOM基础概念2.WSE3.0工具配置MTOM3.代码实现与分析4.总结.最后附上实现代码供大家参考. WSE3.0中引入MTOM机制,给我们借助WS

消息总线扩展之主动转发

问题简述 消息总线目前为Java编程语言提供了SDK,同时针对其他语言提供了一个称之为httpBridge的http代理.这基本可以满足大部分主流编程语言对消息总线的使用需求,但这也仅仅是对技术层面上的需求的满足.在业务层面上,尤其是面对老的业务系统的适配一直都是个难题,这篇文章谈谈面对一个在线上运行的业务系统,如何使得引入消息总线的总体成本尽可能得低. 就消息总线的两种使用方式而言,无论是SDK的方式还是httpBridge的方式,都需要往第三方系统引入对消息总线的依赖,这些依赖包括但不仅限于

消息总线重构之EventBus

最近花了不少时间对消息总线进行了重构.重构的重点是在消息总线中加入了Guava的EventBus,并应用于以下两个场景: (1)改进广播通知 (2)业务逻辑串联,用事件驱动替代责任链模式 EventBus简介 EventBus是Google的开源项目Guava里的一个组件,有兴趣的人可以看我前不久的一篇博文解读.总得来说,EventBus是观察者模型的实现,利用它你既可以实现观察者模型的业务场景,还可以基于它的事件驱动机制来实现应用程序内组件之间的解耦与通信. 改进广播通知 广播通知是消息总线提

消息总线扩展之面向消息的数据集成

最近一段时间,我在琢磨消息总线除了能进行受管控的消息通信之外,还有哪些可以扩展的方向.这篇文章我们来探讨一下面向消息的数据集成是否可以作为一种尝试方向. 相关技术简介 XML 谈到XML我们的第一映像就是用它来做各种配置,当然如果你是Javaer,那么可能你印象最深的就是Spring的bena配置了.其实,XML的用途远不止充当配置文件这一方面.它还被广泛应用于异构系统集成.数据集成.语义/协议转换等等方面,甚至成为构建平台非常重要的基石.虽然XML一直以来被人诟病其解析效率低下以及数据量太冗余

消息总线扩展之集成Thrift-RPC

本文主要探讨了消息总线支持Thrift RPC的实现过程.鉴于RabbitMQ官方的Java Client提供了基于RabbitMQ的JSON-RPC,消息总线也顺道提供了JSON-RPC的API.然后也尝试了为消息总线增加对Thrift-RPC的扩展支持,希望此举能让消息总线同时为SOA提供基础设施. Thrift简介 Thrift是一个跨语言的服务部署框架,最初由Facebook于2007年开发,2008年进入Apache开源项目.Thrift通过一个中间语言(IDL, 接口定义语言)来定义

谈消息总线客户端的多线程实现

最近在实现一个基于RabbitMQ的消息总线.因为它提供了Client(客户端),这里就牵扯到凡是技术组件的client都无法回避的并发问题.本文借实现消息总线的client谈谈在实现过程中的想法以及最终的处理方式,当然这些都不仅仅适用于消息总线的client,其他通用组件的client也同样适用. 并发问题的分类 其实上面所提到的并发问题,从大的层面上可以划分为两类问题: 自身固有的并发问题:这个存在的前提条件是client自身内部使用了多线程技术,并且本身就存在线程安全的缺陷. 被动调用的并