记得很久以前有一次面试被问到如何编写无锁程序,我当时觉得那个面试官脑子进水了,我们确实可以在某些情况下减少锁的使用,但是怎么可能避免呢?当然我现在还是持这种观点,在Java中,你可以有很多方法减少锁的使用(至少在你自己的代码中看起来):
1. 比如常见的可以使用volatile关键字来保证某个字段在一个线程中的更新对其他线程的可见性;
2. 可以使用concurrent.atomic包中的各种Atomic类来实现某些基本类型操作的,它主要采用忙等机制(CAS,compare and swap方法)以及内部的volatile变量来实现不加锁情况下对某个字段特定更新的线程安全;
3. 使用ThreadLocal,为每个线程保存保存自己的对象,保证对它的操作永远只有一个线程;
4. 使用concurrent包下已经实现的线程安全集合,如ConcurrentHashMap、ConcurrentSkipListMap、ConcurrentSkipListSet、CopyOnWriteArrayList、CopyOnWriteArraySet等,在这些集合的实现中也会看到volatile和CAS的身影,但是有些时候它们自身也不得不使用锁;
5. 使用BlockingQueue实现生产者消费者模式,这个BlockingQueue有点类似EventBus,很多时候可以简化多线程编程环境下的复杂性,在特定情况下还可以采用单个生产者或单个消费者的方式来避免多线程环境,但是在对一些临界资源和类操作时,还是会需要使用锁,而且在很多BlockingQueue的内部实现中也使用了锁;
6. 使用Guava提供的StripLock方式来将一些特定的操作、数据映射到固定的线程中,从而保证对它们的操作的线程安全。
然而很多时候Java提供的那些所谓的线程安全类只是一些小范围的操作,比如AtomicInteger中的incrementAndGet()/decrementAndGet(),ConcurrentHashMap中的get和put,当需要将一些操作组合在一起的时候,我们还是不得不使用锁,比如更具一个key取出Map中的值,对值进行一定操作,然后将该值写入Map中。以前我一直觉得这种操作不用锁是不太可能解决的,但是最近做的一个项目需要从REUTERS中source数据,REUTERS中的很多数据tick都非常快,为了提高项目的处理能力,我们必须要减少锁的使用,因而我开始尝试着在不用锁的情况下实现某些操作的组合而依然能保持线程安全,然后发现在组合使用CAS和ConcurrentMap接口中提供的putIfAbsent、replace、remove等根据传入的值再来更新状态的方式还真的能实现不用锁组合某些操作而依然保持线程安全(至少在自己的代码中无锁)。对于这种尝试出厂了一个ReferenceCountSet。
ReferenceCountSet实现
从这个Set的名字中应该已经能够知道它的用途了,在我的这个项目中,我们需要自己维护对某些实例的引用计数,所以最基本的,必须有一个集合存储一个实例到它的引用计数的映射关系,而一个实例在这个集合中必须是唯一存在,因而我自然想到了使用Map来保存;另外在对引用计数增加时,需要分以下几个步骤实现:
1. 判断一个新添加的实例是否已经在这个Map中存在
2. 如果不存在,则将该实例添加到这个Map中,并设置其引用计数为1
3. 如果已经存在,则需要取出已经存在的引用计数,对其加1,然后将新值写入这个Map中。
对remove方法来说也是类似的,即需要取出Map中已经存在的计数值以后,对其引用减1,然后判断是否为0来决定是否需要将当前实例真正从这个Map中移除。
既然需要那么多的组合操作,显然没有一个已经存在的Map可以实现不加锁情况下的线程安全。因而我起初的选择是使用HashMap<E, Integer>加锁实现(以add为例):
synchronized(map) {
Integer refCount = map.get(element);
if (refCount == null) {
refCount = 1;
} else {
refCount += 1;
}
map.put(element, refCount);
}
但是如何不用锁实现这些组合操作呢?秘诀就在于灵活的使用AtomicInteger和ConcurrentMap接口。首先需要将这个Map改成ConcurrentMap,如ConcurrentHashMap,然后将这个Map的值改成AtomicInteger,然后采用如下实现即可:
public boolean add(E element) {
AtomicInteger refCount = data.get(element);
if (refCount == null) {
// Avoid to put AtomicInteger(0) as during remove we need this value to compare
AtomicInteger newRefCount = new AtomicInteger(1);
refCount = data.putIfAbsent(element, newRefCount);
if (refCount == null) {
return true;
}
}
refCount.incrementAndGet();
return true;
}
在这个add方法实现中,我们首先直接使用传入的element获取内部存在AtomicInteger值,如果该值为null,表示当前还没有对它有引用计数,因而我们初始化一个AtomicInteger(1)对象,但是这时我们不能直接将这个1作为该对象的引用计数,因为另一个线程可能在这中间已经添加相同对象的引用计数了,这个时候如果我们还直接写入会覆盖在这个中间添加的引用计数。所以我们需要使用ConcurrentMap中的putIfAbsent方法,即如果其他线程在这个还是没有对这个对象有引用计数更新的情况下,我们才会真正写入现在的引用计数值,从而不会覆盖在这中间写入的值,该方法返回原来已经在这个Map中的值,因而如果返回null,表示在这个过程中没有其他线程对这个对象的计数值有改变,所以直接返回;如果返回不为null,表示在这个中间其他线程有对这个对象有做引用计数的改变,并且返回更新的AtomicInteger值,此时只需要像已经存在引用计数实例的情况下对这个返回的AtomicInteger只自增即可,由于AtomicInteger是线程安全的,因而这个操作也是安全的。并且由于对每个线程都使用同一个AtomicInteger引用实例,因而每个线程的自增都会正确的反映在Map的值中,因而这个操作也是正确的。
这里,我其实是将这个ConcurrentMap封装在一个Set中,因而我们还需要实现一些其他方法,如size、contains、iterator、remove等,由于其他方法的在ConcurrentHashMap中已经是线程安全的了,因而我们只需要实现remove方法即可。这里的remove方法也包含了多个操作的组合:先取出以存在的计数,对其减1,如果发现它的计数已经减到0,将它从这个Map中移除。这里需要使用ConcurrentMap中提供的条件remove方法来实现:
public boolean remove(Object obj) {
AtomicInteger refCount = data.get(obj);
if (refCount == null) {
return false;
}
if (refCount.decrementAndGet() <= 0) {
return data.remove(obj, refCount);
}
return false;
}
这里需要解释的就是这个条件remove方法,即当前在对这个Object对象的引用计数已经减到0的情况下,我们不能直接将其移除,因为在这个中间可能有另一个线程又增加了对它的引用计数,所以我们需要使用条件remove,即只有当前Map中对这个Object对象的值和传入的值相等时才将它移除,这样就保证了当前线程的操作不会覆盖中间其他线程的结果。
在这个remove的实现中,有人可能会说在data.get(data)这句话执行完成后,加入它返回null,此时其他线程可能已经会添加了这个Object对象的引用,此时这个方法的执行结果就不对了,但是在这种情况下,即使加锁,也无法解决这个问题,而且很多情况下的线程安全只能保证happen-before原则。关于这个类的实现也有一些其他细节的东西,具体可以查看这里:
https://github.com/dinglevin/levin-tools/blob/master/src/main/java/org/levin/tools/corejava/sets/ReferenceCountSet.java