谈消息总线客户端的多线程实现

最近在实现一个基于RabbitMQ的消息总线。因为它提供了Client(客户端),这里就牵扯到凡是技术组件的client都无法回避的并发问题。本文借实现消息总线的client谈谈在实现过程中的想法以及最终的处理方式,当然这些都不仅仅适用于消息总线的client,其他通用组件的client也同样适用。

并发问题的分类

其实上面所提到的并发问题,从大的层面上可以划分为两类问题:

  • 自身固有的并发问题:这个存在的前提条件是client自身内部使用了多线程技术,并且本身就存在线程安全的缺陷。
  • 被动调用的并发问题:指的是该client处于多线程调用环境下产生的线程安全问题。

如果你想单纯得看待怎样的并发问题算是自身固有的并发问题,那么你可以假设一个前提:如果你client处于单线程的被调用环境中,那么你client内部使用了多线程,并且存在线程安全问题,就可以看成是纯粹意义上的自身固有的并发问题。说得再直白一点,如果你内部没有采用多线程,那么这个client你可以认为它不存在自身固有的并发问题。但有时候外部调用的多线程环境也能触发你client内部产生并发问题,这种情况下的并发问题我们将其归类为被动调用的并发问题。

听起来可能有些抽象,我们通过一个实例让它更直观一点。我们看redis的客户端jedis。jedis内部对redis的数据结构命令的调用实现并没有采用多线程技术,因此我们可以认为它没有自身固有的并发问题,但一旦你在多线程的环境下共享其Jedis对象(主对象),那么各种各样莫名其妙的错误就出来了。我们可以认为这是被动调用的并发问题。所以要实现一个绝对线程安全的类是非常不容易的,可以说代价也非常大——因为你必须同时考虑这两种不同的并发问题。换句话说,当你的client内部外部都可能存在多线程环境,那么你必须同时考虑调用以及被调用的线程安全。

为了不让讨论的问题变得大而空,这篇文章我们将关注点放在被动调用的并发问题。

RabbitMQ 通信简介

简单介绍一下RabbitMQ跟通信相关的两个关键对象:Connection、Channel。要通信,肯定要先建立TCP连接,这个过程主要由Connection
负责。RabbitMQ支持从一个Connection创建多个Channel,Channel定义并实现了通信的各种API。因此Connection
主要负责链路的建立,而Channel主要负责通信逻辑。RabbitMQ这么设计是为了避免创建太多Connection(每个Connection都是跟RabbitMQ
Server的TCP连接)。而对Channel的设计就是网络中常用的“多路复用”技术。

共享实例

这是我最初的想法:构建完全线程安全的类,无论是怎样的被调用环境,都只使用一个单独的Messagebus
对象。这是对被调环境最友好的方式,但这种情况下你就必须以被调环境是多线程环境为基准来设计client的实现模型。

因此我之前的想法是在client内部建立一个Channel
的对象池。所有的单工(单向)通信,都直接从对象池中获取一个Channel对象,通信结束后再归还Channel到对象池。因为这里的Pool构建在Apache
Common Pool的基础上,所以对Channel对象的获取与归还是线程安全的,而Channel对象在RabbitMQ 官方Java
client中是被明确标注为线程安全的,因此在整个通信逻辑上没有线程安全问题。但完全共享实例意味着内部关键对象也是共享的,这里涉及到client内部的两个比较关键的对象:

  • Pubsuber:用于从一个pubsuberCenter获取实时配置变更数据
  • ConfigManager:用于解析pubsuberManager push过来的数据并及时调整客户端的控制逻辑

在共享实例的实现方式下,这两个对象提供的API必须是线程安全的。与此同时,所有client主对象的API都必须是同步的(最简单也是性能最差的API同步实现方式是将整个方法直接用synchronized标记,如果整个方法不实现为同步的,就必须在方法内部小心得处理同步问题)。

由共享实例向线程独占过渡

因为多线程问题最主要的根源就是数据共享的问题,因此共享实例的实现方式算是正中下怀,而且这是非常考验实现者并发能力的,本人作为并发新手,如果有办法敬而远之,那么是再好不过的了。因此我的思路逐渐从共享变量向独占过渡。

之前谈到一个Connection可以创建多个Channel。RabbitMQ
官方client在其doc上已经直接说明了:Channel是线程安全的,它直接支持在多线程环境下通信。

因此,我的思路很快就变成了像下面这样:

这种模式下,Connection被实现为单例模式,在一个JVM进程中(不管被调环境是否处于多线程状态)只会存在一个Connection对象。一个Client主对象(就是上图中的Messagebus对象)只关联一个独立的Channel并且约定一个Client主对象只能适用于一个独立的线程,不得跨线程使用。这样,整个client的通信逻辑中就完全不必关注并发问题。但这种模式下,Pubsuber以及ConfigManager仍然是单例的(也就是说是共享实例的模式),因此他们的API还必须实现为线程安全的。

这看起来是一种不错的方法,但当我已经开始动手实现之后,才发现一个问题:如果Messagebus对象在各个线程中是独占的,谁行驶Connection的关闭动作?如果它被某个线程上的Messagebus对象关闭了,对其他线程上正在工作的Messagebus将是一个灾难——它们将以一种极其不优雅的方式抛出异常。共享实例模式下,client主对象倒不存在这个问题,因为其client主对象的控制权完全归主线程所有。

完全线程独占的实现

client主对象以及所有的关联对象由线程独占(Connection对象在不同的线程中也是不同的实例)。这也是jedis的实现方式,其实在思考上面两种方式之前,我就了解了这种实现方式。但我一直在尝试是否还有其他的方式,因为这么做毕竟比较浪费资源——客户端创建多少个通信线程,就至少有多少套对象簇(包括这么多Connection对象)。但这也是最简单、通信性能最好的方式——因为它完全不需要处理被调线程安全问题,并且也不像上面一种思路中需要为谁掌管Connection的控制权而纠结。单个线程的内存模型如下:

也就是说,当你创建一个Messagebus
client对象,就会有上面虚线框内的关联对象簇被创建(它们跟client主对象有相同的生命周期)。

这种实现方式是如何解决被调多线程并发问题的?两步:

  • 基于约定:规定不得在多线程之间共享client主对象,否则后果自负
  • Client主对象以及依赖对象完全独占,并创建Client主对象的对象池

现在,在多线程环境下就不存在任何对象共享了:

你可以看一下它的使用示例

这种模式,client的所有实现代码都无需再为线程安全问题而做任何伤害性能的加锁操作而且对象的归属上也非常清晰。但毫无疑问,它也存在一些缺点:

  1. 它浪费了不少资源:不只是客户端的JVM的内存空间,还有RabbitMQ Server的连接资源
  2. 安全隐患:这种模式,如果服务端不做任何处理,客户端甚至可以通过不断创建Messagebus 对象,而发起DDos攻击

其实,归根到底一切都是权衡——看你关注什么又愿意舍弃什么。

总结

其实很多API都无法完全做到十全十美,本文的并发问题只是一个普遍现象,其他的问题还有乱用、错用的问题。比如上面说到的第二种模型,如果我们不谈消息总线,只采用RabbitMQ原生的java
client的话,多线程通信时你可以这样:在主线程上创建Connection对象,然后为每个线程分配独立的Channel对象,最终在主线程上关闭Connection对象。但Channel对象开放了获取Connection对象的API,因此也就给了每个线程对Connection的控制权。你只能说技术组件只能顾好自身,它不揣测任何使用场景以及使用意图。

原文发布时间为:2015-03-08

本文作者:vinoYang

本文来自合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

时间: 2024-11-03 05:46:35

谈消息总线客户端的多线程实现的相关文章

再谈消息总线客户端的多线程实现

上次我谈了最近在写的一个基于RabbitMQ的消息总线的客户端在面对并发问题时的一些思考以及最终的实现方案.那是一种简单并且不容易产生并发问题的方案,如果你看过那篇文章,我曾在最终的实现方案之后给出了其利弊分析. 核心的问题是Client建立的跟RabbitMQ Server的connection是共享还是独占.对于这个问题可以举一个通俗一点的例子:如果你想要租间房子,每个人会有不同的想法.比如有人喜欢简单.安静的生活并且在意个人隐私,那么这个时候你最好的选择就是去租个单室套:里面什么都有,并且

谈消息总线的路由模型

最近在写一个基于RabbitMQ的消息总线.虽然RabbitMQ提供了plugin的机制可以实现对其进行扩展,但是由于对erlang语言不熟,考虑到上手成本高,因此放弃实现plugin,转而基于Smart client + 树形拓扑路由的模型.当然这也大大降低了我们实现功能的灵活性,后面我会找个时间开篇新文章,谈谈Smart Client的限制. 预备知识 RabbitMQ对于消息的通信只提供了几个非常简单的API:Channel#basicPublish:Channel#basicConsum

消息总线重构之简化客户端

这段时间对消息总线进行了再次重构.本次重构主要针对消息总线的pubsub组件以及对client的简化,同时谈谈对消息总线的一些想法. 简化client的复杂度 之前的client需要同时连接两个分布式组件.消息总线的访问需要用户提供pubsuberHost,pubsuberPort参数,因此它首先连接的就是pubsuber.而消息总线是基于RabbitMQ构建的,因此它必然还需要连接RabbitMQ.而之所以没有需要用户程序提供RabbitMQ Server的地址信息,是因为它是通过pubsub

消息总线扩展之面向消息的数据集成

最近一段时间,我在琢磨消息总线除了能进行受管控的消息通信之外,还有哪些可以扩展的方向.这篇文章我们来探讨一下面向消息的数据集成是否可以作为一种尝试方向. 相关技术简介 XML 谈到XML我们的第一映像就是用它来做各种配置,当然如果你是Javaer,那么可能你印象最深的就是Spring的bena配置了.其实,XML的用途远不止充当配置文件这一方面.它还被广泛应用于异构系统集成.数据集成.语义/协议转换等等方面,甚至成为构建平台非常重要的基石.虽然XML一直以来被人诟病其解析效率低下以及数据量太冗余

消息总线重构之EventBus

最近花了不少时间对消息总线进行了重构.重构的重点是在消息总线中加入了Guava的EventBus,并应用于以下两个场景: (1)改进广播通知 (2)业务逻辑串联,用事件驱动替代责任链模式 EventBus简介 EventBus是Google的开源项目Guava里的一个组件,有兴趣的人可以看我前不久的一篇博文解读.总得来说,EventBus是观察者模型的实现,利用它你既可以实现观察者模型的业务场景,还可以基于它的事件驱动机制来实现应用程序内组件之间的解耦与通信. 改进广播通知 广播通知是消息总线提

消息总线VS消息队列

前段时间实现了一个基于RabbitMQ的消息总线,实现的过程中自己也在不断得思考.总结以及修正.需要考虑各个维度:效率.性能.网络.吞吐量.甚至需要自己去设想API可能的使用场景.模式.不过能有一件事情,自己愿意去做,在走路.吃饭.坐公交的时候都在思考如何去改进它,然后在实践的过程中,促使去思考并挖掘自己知识面的空白,也是一件让人开心的事情. 借此记录下自己在实现的过程中,以及平时的一些想法. 这是第一篇,先谈谈消息总线跟消息队列的区别,以及对于企业级应用需要将消息队列封装成消息总线的必要性.

消息总线扩展之主动转发

问题简述 消息总线目前为Java编程语言提供了SDK,同时针对其他语言提供了一个称之为httpBridge的http代理.这基本可以满足大部分主流编程语言对消息总线的使用需求,但这也仅仅是对技术层面上的需求的满足.在业务层面上,尤其是面对老的业务系统的适配一直都是个难题,这篇文章谈谈面对一个在线上运行的业务系统,如何使得引入消息总线的总体成本尽可能得低. 就消息总线的两种使用方式而言,无论是SDK的方式还是httpBridge的方式,都需要往第三方系统引入对消息总线的依赖,这些依赖包括但不仅限于

消息总线优化之PubSub

近段时间,暂缓了消息总线feature的开发,花了部分时间对原先的pubsub机制进行了针对性的优化与重构.这里记录一下优化的过程以及相比原来的设计有哪些改观. PubSub在消息总线内部的作用 PubSub在消息总线内部主要用于对所有在线客户端进行实时管控的作用.每个客户端在使用消息总线时,都"被迫"注册到PubSub上,并"被迫"订阅了一些Channel,以便消息总线管控台实时下发一些管控指令及时生效. 之前的设计回顾 这里有必要回顾一下之前的设计.消息总线内部

消息总线扩展之集成Thrift-RPC

本文主要探讨了消息总线支持Thrift RPC的实现过程.鉴于RabbitMQ官方的Java Client提供了基于RabbitMQ的JSON-RPC,消息总线也顺道提供了JSON-RPC的API.然后也尝试了为消息总线增加对Thrift-RPC的扩展支持,希望此举能让消息总线同时为SOA提供基础设施. Thrift简介 Thrift是一个跨语言的服务部署框架,最初由Facebook于2007年开发,2008年进入Apache开源项目.Thrift通过一个中间语言(IDL, 接口定义语言)来定义