ReferenceCountSet无锁实现

记得很久以前有一次面试被问到如何编写无锁程序,我当时觉得那个面试官脑子进水了,我们确实可以在某些情况下减少锁的使用,但是怎么可能避免呢?当然我现在还是持这种观点,在Java中,你可以有很多方法减少锁的使用(至少在你自己的代码中看起来):

1.     比如常见的可以使用volatile关键字来保证某个字段在一个线程中的更新对其他线程的可见性;

2.     可以使用concurrent.atomic包中的各种Atomic类来实现某些基本类型操作的,它主要采用忙等机制(CAS,compare and swap方法)以及内部的volatile变量来实现不加锁情况下对某个字段特定更新的线程安全;

3.     使用ThreadLocal,为每个线程保存保存自己的对象,保证对它的操作永远只有一个线程;

4.     使用concurrent包下已经实现的线程安全集合,如ConcurrentHashMap、ConcurrentSkipListMap、ConcurrentSkipListSet、CopyOnWriteArrayList、CopyOnWriteArraySet等,在这些集合的实现中也会看到volatile和CAS的身影,但是有些时候它们自身也不得不使用锁;

5.     使用BlockingQueue实现生产者消费者模式,这个BlockingQueue有点类似EventBus,很多时候可以简化多线程编程环境下的复杂性,在特定情况下还可以采用单个生产者或单个消费者的方式来避免多线程环境,但是在对一些临界资源和类操作时,还是会需要使用锁,而且在很多BlockingQueue的内部实现中也使用了锁;

6.     使用Guava提供的StripLock方式来将一些特定的操作、数据映射到固定的线程中,从而保证对它们的操作的线程安全。

然而很多时候Java提供的那些所谓的线程安全类只是一些小范围的操作,比如AtomicInteger中的incrementAndGet()/decrementAndGet(),ConcurrentHashMap中的get和put,当需要将一些操作组合在一起的时候,我们还是不得不使用锁,比如更具一个key取出Map中的值,对值进行一定操作,然后将该值写入Map中。以前我一直觉得这种操作不用锁是不太可能解决的,但是最近做的一个项目需要从REUTERS中source数据,REUTERS中的很多数据tick都非常快,为了提高项目的处理能力,我们必须要减少锁的使用,因而我开始尝试着在不用锁的情况下实现某些操作的组合而依然能保持线程安全,然后发现在组合使用CAS和ConcurrentMap接口中提供的putIfAbsent、replace、remove等根据传入的值再来更新状态的方式还真的能实现不用锁组合某些操作而依然保持线程安全(至少在自己的代码中无锁)。对于这种尝试出厂了一个ReferenceCountSet。

ReferenceCountSet实现

从这个Set的名字中应该已经能够知道它的用途了,在我的这个项目中,我们需要自己维护对某些实例的引用计数,所以最基本的,必须有一个集合存储一个实例到它的引用计数的映射关系,而一个实例在这个集合中必须是唯一存在,因而我自然想到了使用Map来保存;另外在对引用计数增加时,需要分以下几个步骤实现:

1.     判断一个新添加的实例是否已经在这个Map中存在

2.     如果不存在,则将该实例添加到这个Map中,并设置其引用计数为1

3.     如果已经存在,则需要取出已经存在的引用计数,对其加1,然后将新值写入这个Map中。

对remove方法来说也是类似的,即需要取出Map中已经存在的计数值以后,对其引用减1,然后判断是否为0来决定是否需要将当前实例真正从这个Map中移除。

既然需要那么多的组合操作,显然没有一个已经存在的Map可以实现不加锁情况下的线程安全。因而我起初的选择是使用HashMap<E, Integer>加锁实现(以add为例):

synchronized(map) {
    Integer refCount = map.get(element);
    if (refCount == null) {
        refCount = 1;
    } else {
        refCount += 1;
    }
    map.put(element, refCount);
}

但是如何不用锁实现这些组合操作呢?秘诀就在于灵活的使用AtomicInteger和ConcurrentMap接口。首先需要将这个Map改成ConcurrentMap,如ConcurrentHashMap,然后将这个Map的值改成AtomicInteger,然后采用如下实现即可:    

public boolean add(E element) {
    AtomicInteger refCount = data.get(element);
    if (refCount == null) {
        // Avoid to put AtomicInteger(0) as during remove we need this value to compare
        AtomicInteger newRefCount = new AtomicInteger(1);
        refCount = data.putIfAbsent(element, newRefCount);
        if (refCount == null) {
            return true;
        }
    }
    refCount.incrementAndGet();
    return true;
}

在这个add方法实现中,我们首先直接使用传入的element获取内部存在AtomicInteger值,如果该值为null,表示当前还没有对它有引用计数,因而我们初始化一个AtomicInteger(1)对象,但是这时我们不能直接将这个1作为该对象的引用计数,因为另一个线程可能在这中间已经添加相同对象的引用计数了,这个时候如果我们还直接写入会覆盖在这个中间添加的引用计数。所以我们需要使用ConcurrentMap中的putIfAbsent方法,即如果其他线程在这个还是没有对这个对象有引用计数更新的情况下,我们才会真正写入现在的引用计数值,从而不会覆盖在这中间写入的值,该方法返回原来已经在这个Map中的值,因而如果返回null,表示在这个过程中没有其他线程对这个对象的计数值有改变,所以直接返回;如果返回不为null,表示在这个中间其他线程有对这个对象有做引用计数的改变,并且返回更新的AtomicInteger值,此时只需要像已经存在引用计数实例的情况下对这个返回的AtomicInteger只自增即可,由于AtomicInteger是线程安全的,因而这个操作也是安全的。并且由于对每个线程都使用同一个AtomicInteger引用实例,因而每个线程的自增都会正确的反映在Map的值中,因而这个操作也是正确的。

这里,我其实是将这个ConcurrentMap封装在一个Set中,因而我们还需要实现一些其他方法,如size、contains、iterator、remove等,由于其他方法的在ConcurrentHashMap中已经是线程安全的了,因而我们只需要实现remove方法即可。这里的remove方法也包含了多个操作的组合:先取出以存在的计数,对其减1,如果发现它的计数已经减到0,将它从这个Map中移除。这里需要使用ConcurrentMap中提供的条件remove方法来实现:    

public boolean remove(Object obj) {
    AtomicInteger refCount = data.get(obj);
    if (refCount == null) {
        return false;
    }
    if (refCount.decrementAndGet() <= 0) {
        return data.remove(obj, refCount);
    }
    return false;
}

这里需要解释的就是这个条件remove方法,即当前在对这个Object对象的引用计数已经减到0的情况下,我们不能直接将其移除,因为在这个中间可能有另一个线程又增加了对它的引用计数,所以我们需要使用条件remove,即只有当前Map中对这个Object对象的值和传入的值相等时才将它移除,这样就保证了当前线程的操作不会覆盖中间其他线程的结果。

在这个remove的实现中,有人可能会说在data.get(data)这句话执行完成后,加入它返回null,此时其他线程可能已经会添加了这个Object对象的引用,此时这个方法的执行结果就不对了,但是在这种情况下,即使加锁,也无法解决这个问题,而且很多情况下的线程安全只能保证happen-before原则。关于这个类的实现也有一些其他细节的东西,具体可以查看这里:

https://github.com/dinglevin/levin-tools/blob/master/src/main/java/org/levin/tools/corejava/sets/ReferenceCountSet.java

时间: 2024-10-30 09:43:47

ReferenceCountSet无锁实现的相关文章

无锁并发框架Disruptor

概述 在逛并发编程网的时候,看到了并发框架Disruptor译文这个系列文章. Martin Fowler在自己网站上写了一篇LMAX架构(译文)的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易.这个系统是建立在JVM平台上,其核心是一个业务逻辑处理器,它能够在一个线程里每秒处理6百万订单.业务逻辑处理器完全是运行在内存中,使用事件源驱动方式.业务逻辑处理器的核心是Disruptor. Disruptor它是一个开源的并发框架,能够在无锁的情况下实现网络

C#算法之基于无锁的C#并发队列实现

最近开始学习无锁编程,和传统的基于Lock的算法相比,无锁编程具有其独特的优点,Angel Lucifer 的关于无锁编程一文对此有详细的描述. 无锁编程的目标是在不使用Lock的前提下保证并发过程中共享数据的一致性,其主要的实现基础是CAS 操作,也就是compare_and_swap,通过处理器提供的指令,可以原子地更新共享数据,并同时监测其他线 程的干扰,.Net中的对应实现是InterLocked.CompareExchange函数. 既然不使用Lock,那在无锁编程中要时刻注意的是,代

《OpenACC并行程序设计:性能优化实践指南》一 1.5 无锁编程

1.5 无锁编程 互斥锁是用于同步进程或线程的常用机制,这些进程或线程需要访问并行程序中的一些共享资源.互斥锁就像它们名字所说的:如果一个线程锁住了资源,另一个线程希望访问它需要等待第一个线程解锁这个资源.一旦资源被解锁,第二个线程在处理这个资源时会一直锁住它.程序的线程必须遵守:一旦使用完共享资源尽快解锁,以保持程序执行流程. 由于OpenACC中没有锁,编程人员需要熟悉无锁编程和数据结构的概念.无锁方法保证至少一个执行该方法的线程的进展.可能存在某些线程可以被延迟的情况,但是保证至少一个线程

无锁和无等待的定义和例子

原文链接,译文连接,译者:周可人,校对:梁海舰 在查阅google之后,我发现没有一处对并发算法或是数据结构规定的演进条件(progress condition,注:参考[1],译者认为翻译为演进状态更为合适)做合理的解释.甚至在"The Art of Multiprocessor Programming"中也只有围绕书本的一小段定义,大部分定义是单行的句子,因而造成了我们普通人含义模糊的理解,所以在这里我把我对这些概念的理解整理在一起,并且在每一种概念后面给出相应的例子. 我们先将演

线程-一个完全无锁无原子的疑问,以及猜想?

问题描述 一个完全无锁无原子的疑问,以及猜想? 先基于一个单生产单消费的情况,我写了如下一个class:templateclass SingleLockFree{public:SingleLockFree(){m_tail = new Node();m_Head = m_tail;}~SingleLockFree(){ //做最后未处理的内存的释放}void Push(T t)//生产线程{Node* p = new Node();//内存分配待优化m_tail->_data = t;m_tai

无锁并发和无等待并发的对比分析

原文地址:作者:rethinkdb  译者:sooerr 校对:方腾飞 有两种非阻塞线程同步算法,即无锁和无等待,这两种算法经常会产生混淆. 在无锁系统中,当任何特定的运算被阻塞的时候,所有CPU可以继续处理其他的运算.换种方式说,在无锁系统中,当给定线程被其他线程阻塞的时候,所有CPU可以不停的继续处理其他工作.无锁算法大大增加系统整体的吞吐量,因为它只偶尔会增加一定的交易延迟.大部分高端数据库系统是基于无锁算法而构造的,以满足不同级别. 相反,无等待算法保证了所有CPU在连续处理有效工作的时

从volatile解读ConcurrentHashMap(jdk1.6.0)无锁读

作者:绫萱 volatile常常用于修饰多线程共享变量,用来保证该变量的可见性.volatile的语意:某个写线程对volatile变量的写入马上可以被后续的某个读线程"看"到. volatile保证可见性的原理:volatile是通过在编译器生成字节码时,在对volatile变量进行读写指令序列的前后加入内存屏障,来禁止一些处理器重排序保证写入一定发生在读之前的这种happen-before关系.   简单理解:在本次线程内,当读取一个变量时,为提高存取速度,编译器优化时有时会先把变

使用JAVA实现高并发无锁数据库操作步骤分享_java

1. 并发中如何无锁.一个很简单的思路,把并发转化成为单线程.Java的Disruptor就是一个很好的例子.如果用java的concurrentCollection类去做,原理就是启动一个线程,跑一个Queue,并发的时候,任务压入Queue,线程轮训读取这个Queue,然后一个个顺序执行. 在这个设计模式下,任何并发都会变成了单线程操作,而且速度非常快.现在的node.js, 或者比较普通的ARPG服务端都是这个设计,"大循环"架构.这样,我们原来的系统就有了2个环境:并发环境 +

并发无锁队列学习之一【开篇】

1.前言 队列在计算机中非常重要的一种数据结构,尤其在操作系统中.队列典型的特征是先进先出(FIFO),符合流水线业务流程.在进程间通信.网络通信之间经常采用队列做缓存,缓解数据处理压力.结合自己在工作中遇到的队列问题,总结一下对不同场景下的队列实现.根据操作队列的场景分为:单生产者--单消费者.多生产者--单消费者.单生产者--多消费者.多生产者--多消费者四大模型.其实后面三种的队列,可以归纳为一种多对多.根据队列中数据分为:队列中的数据是定长的.队列中的数据是变长的. 2.队列操作模型 (