Disruptor(无锁并发框架)-发布

原文:http://blog.codeaholics.org/2011/the-disruptor-lock-free-publishing/

译者:罗立树

假如你生活在另外一个星球,我们最近开源了一套高性能的基于消息传递的开源框架。

下面我给大家介绍一下如何将消息通过Ring buffer在无锁的情况下进行处理。

在深入介绍之前,可以先快速阅读一下Trish发表的文章,该文章介绍了ring buffer和其工作原理。

这篇文章的要点如下:

1.ring buffer是由一个大数组组成的。

2.所有ring buffer的“指针”(也称为序列或游标)是java long类型的(64位有符号数),指针采用往上计数自增的方式。(不用担心越界,即使每秒1,000,000条消息,也要消耗300,000年才可以用完)。

3.对ring buffer中的指针进行按ring buffer的size取模找出数组的下标来定位入口(类似于HashMap的entry)。为了提高性能,我们通常将ring buffer的size大小设置成实际使用的2倍。

这样我们可以通过位运算(bit-mask )的方式计算出数组的下标。

Ring buffer的基础结构

注意:和代码中的实际实现,我这里描述的内容是进行了简化和抽象的。从概念上讲,我认为更加方面理解。

ring buffer维护两个指针,“next”和“cursor”。

在上面的图示里,是一个size为7的ring buffer(你应该知道这个手工绘制的图示的原理),从0-2的坐标位置是填充了数据的。

“next”指针指向第一个未填充数据的区块。“cursor”指针指向最后一个填充了数据的区块。在一个空闲的ring bufer中,它们是彼此紧邻的,如上图所示。

填充数据(Claiming a slot,获取区块)

Disruptor API 提供了事务操作的支持。当从ring buffer获取到区块,先是往区块中写入数据,然后再进行提交的操作。

假设有一个线程负责将字母“D”写进ring buffer中。将会从ring buffer中获取一个区块(slot),这个操作是一个基于CAS的“get-and-increment”操作,将“next”指针进行自增。这样,当前线程(我们可以叫做线程D)进行了get-and-increment操作后,

指向了位置4,然后返回3。这样,线程D就获得了位置3的操作权限。

接着,另一个线程E做类似以上的操作。

提交写入

以上,线程D和线程E都可以同时线程安全的往各自负责的区块(或位置,slots)写入数据。但是,我们可以讨论一下线程E先完成任务的场景…

线程E尝试提交写入数据。在一个繁忙的循环中有若干的CAS提交操作。线程E持有位置4,它将会做一个CAS的waiting操作,直到  “cursor”变成3,然后将“cursor”变成4。

再次强调,这是一个原子性的操作。因此,现在的ring buffer中,“cursor”现在是2,线程E将会进入长期等待并重试操作,直到 “cursor”变成3。

然后,线程D开始提交。线程E用CAS操作将“cursor”设置为3(线程E持有的区块位置)当且仅当“cursor”位置是2.“cursor”当前是2,所以CAS操作成功和提交也成功了。

这时候,“cursor”已经更新成3,然后所有和3相关的数据变成可读。

这是一个关键点。知道ring buffer填充了多少 – 即写了多少数据,那一个序列数写入最高等等,是游标的一些简单的功能。“next”指针是为了保证写入的事务特性。

最后的疑惑是线程E的写入可见,线程E一直重试,尝试将“cursor”从3更新成4,经过线程D操作后已经更新成3,那么下一次重试就可以成功了。

总结

写入数据可见的先后顺序是由线程所抢占的位置的先后顺序决定的,而不是由它的提交先后决定的。但你可以想象这些线程从网络层中获取消息,这是和消息按照时间到达的先后顺序是没什么不同的,而两个线程竞争获取一个不同循序的位置。

因此,这是一个简单而优雅的算法,写操作是原子的,事务性和无锁,即使有多个写入线程。 

时间: 2024-12-22 02:53:38

Disruptor(无锁并发框架)-发布的相关文章

无锁并发框架Disruptor

概述 在逛并发编程网的时候,看到了并发框架Disruptor译文这个系列文章. Martin Fowler在自己网站上写了一篇LMAX架构(译文)的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易.这个系统是建立在JVM平台上,其核心是一个业务逻辑处理器,它能够在一个线程里每秒处理6百万订单.业务逻辑处理器完全是运行在内存中,使用事件源驱动方式.业务逻辑处理器的核心是Disruptor. Disruptor它是一个开源的并发框架,能够在无锁的情况下实现网络

无锁并发和无等待并发的对比分析

原文地址:作者:rethinkdb  译者:sooerr 校对:方腾飞 有两种非阻塞线程同步算法,即无锁和无等待,这两种算法经常会产生混淆. 在无锁系统中,当任何特定的运算被阻塞的时候,所有CPU可以继续处理其他的运算.换种方式说,在无锁系统中,当给定线程被其他线程阻塞的时候,所有CPU可以不停的继续处理其他工作.无锁算法大大增加系统整体的吞吐量,因为它只偶尔会增加一定的交易延迟.大部分高端数据库系统是基于无锁算法而构造的,以满足不同级别. 相反,无等待算法保证了所有CPU在连续处理有效工作的时

并发框架Disruptor译文

Martin Fowler在自己网站上写了一篇LMAX架构的 文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易.这个系统是建立在JVM平台上,其核心是一个业务逻辑处理 器,它能够在一个线程里每秒处理6百万订单.业务逻辑处理器完全是运行在内存中,使用事件源驱动方式.业务逻辑处理器的核心是Disruptor. Disruptor它是一个开源的并发框架,并获得2011 Duke's 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作.本文是Disru

使用CAS实现无锁的SkipList

感谢同事[付哲]发布此文. 无锁 并发环境下最常用的同步手段是互斥锁和读写锁,例如pthread_mutex和pthread_readwrite_lock,常用的范式为: void ConcurrencyOperation() { mutex.lock(); // do something mutex.unlock(); } 这种方法的优点是: 编程模型简单,如果小心控制上锁顺序,一般来说不会有死锁的问题: 可以通过调节锁的粒度来调节性能. 缺点是: 所有基于锁的算法都有死锁的可能: 上锁和解锁

使用JAVA实现高并发无锁数据库操作步骤分享_java

1. 并发中如何无锁.一个很简单的思路,把并发转化成为单线程.Java的Disruptor就是一个很好的例子.如果用java的concurrentCollection类去做,原理就是启动一个线程,跑一个Queue,并发的时候,任务压入Queue,线程轮训读取这个Queue,然后一个个顺序执行. 在这个设计模式下,任何并发都会变成了单线程操作,而且速度非常快.现在的node.js, 或者比较普通的ARPG服务端都是这个设计,"大循环"架构.这样,我们原来的系统就有了2个环境:并发环境 +

C#算法之基于无锁的C#并发队列实现

最近开始学习无锁编程,和传统的基于Lock的算法相比,无锁编程具有其独特的优点,Angel Lucifer 的关于无锁编程一文对此有详细的描述. 无锁编程的目标是在不使用Lock的前提下保证并发过程中共享数据的一致性,其主要的实现基础是CAS 操作,也就是compare_and_swap,通过处理器提供的指令,可以原子地更新共享数据,并同时监测其他线 程的干扰,.Net中的对应实现是InterLocked.CompareExchange函数. 既然不使用Lock,那在无锁编程中要时刻注意的是,代

并发无锁队列学习之一【开篇】

1.前言 队列在计算机中非常重要的一种数据结构,尤其在操作系统中.队列典型的特征是先进先出(FIFO),符合流水线业务流程.在进程间通信.网络通信之间经常采用队列做缓存,缓解数据处理压力.结合自己在工作中遇到的队列问题,总结一下对不同场景下的队列实现.根据操作队列的场景分为:单生产者--单消费者.多生产者--单消费者.单生产者--多消费者.多生产者--多消费者四大模型.其实后面三种的队列,可以归纳为一种多对多.根据队列中数据分为:队列中的数据是定长的.队列中的数据是变长的. 2.队列操作模型 (

基于锁的并发算法 vs 无锁的并发算法

原文链接 作者:Martin Thompson  译者:曹姚君 校对: 方腾飞 上周在由Heinz Kabutz通过JCrete 组织的开放空间会议(unconference)上,我参加一个新的java规范 JSR166 StampedLock的审查会议.StampedLock 是为了解决多个readers 并发访问共享状态时,系统出现的内存地址竞争问题.在设计上通过使用乐观的读操作,StampedLock 比ReentrantReadWriteLock 更加高效: 在会议期间,我突然意思到两点

如何实现超高并发的无锁缓存?

一.需求缘起 [业务场景] 有一类写多读少的业务场景:大部分请求是对数据进行修改,少部分请求对数据进行读取. 例子1:滴滴打车,某个司机地理位置信息的变化(可能每几秒钟有一个修改),以及司机地理位置的读取(用户打车的时候查看某个司机的地理位置). void SetDriverInfo(long driver_id, DriverInfoi); // 大量请求调用修改司机信息,可能主要是GPS位置的修改 DriverInfo GetDriverInfo(long driver_id);  // 少