尽管JVM在并发上已经做了很多优化工作,如偏向锁、轻量级锁、自旋锁等等。但是基于Synchronized
wait
notify
实现的同步机制还是无法满足日常开发中。原生同步机制在时间和空间上的开销也一直备受诟病。为了提升Java程序在并发场景下的性能、扩展性和健壮性,java.util.concurrent的使用必不可少。java.util.concurrent 包含许多线程安全、测试良好、高性能的并发构建块。通过使用java.util.concurrent,开发人员可以提高并发类的线程安全、可伸缩性、性能、可读性和可靠性。
java.util.concurrent
的功能很强大,要想完整了解其全部细节也是很不容易的,需要多年的学习和实践经验。不过,通过深入其核心部分,可以快速了解其骨架和底层实现机制。那么谁才是java.util.concurrent
的核心组件呢?稍微看过一点java.util.concurrent
源码的同学知道,concurrent包下很多组件如:ReentrantLock
Semaphore
CountDownLatch
在其内部都有一个sync类,而这个sync
有继承自java.util.concurrent.locks.AbstractQueuedSynchronizer
,而这个AbstractQueuedSynchronizer
就是concurrent包的核心。尽管AbstractQueuedSynchronizer
只是一个类,但其实质上却提供了一个框架,通过提供基于FIFO的队列管理机制、线程阻塞机制和状态同步机制,用户可以快速基于AbstractQueuedSynchonizer
完成一系列复杂的进程同步操作。如果第一次接触到AbstractQueuedSynchronizer
,建议读一下其作者的论文:The java.util.concurrent Synchronizer Framework。
1 概述
AbstractQueuedSynchronizer
(以下简称AQS
)从字面理解是一个抽象的基于队列的同步器,所以AQS
至少要完成以下几部分工作:
- 同步状态的原子性管理
- 等待线程队列的维护
- 线程的阻塞和唤醒
- 仅定义核心操作,留出足够的扩展性给子类
AQS
定义了两个核心操作:acquire
release
及其变种。前者用于进入同步块前获取同步块执行权,后者用于释放对于同步块的占有权。
acquire
核心逻辑如下:
// 循环里不断尝试,典型的失败后重试
while (synchronization state does not allow acquire) {
// 同步状态不允许获取,进入循环体,也就是失败后的处理
enqueue current thread if not already queued; // 如果当前线程不在等待队列里,则加入等待队列
possibly block current thread; // 可能的话,阻塞当前线程
}
// 执行到这里,说明已经成功获取,如果之前有加入队列,则出队列。
dequeue current thread if it was queued;
release
核心逻辑如下:
update synchronization state; // 更新同步状态
if (state may permit a blocked thread to acquire) // 检查状态是否允许一个阻塞线程获取
unblock one or more queued threads; // 允许,则唤醒后继的一个或多个阻塞线程。
而要实现上述两个核心接口,就必须实现前文提到的AQS
主要工作的前三项:
- 同步状态的原子性管理
- 阻塞线程队列的维护
- 线程的阻塞和唤醒
实际使用中,AQS
提供了以下5个模板方法:
tryAcquire(int) // 试图在独占模式下获取对象状态。此方法应该查询是否允许它在独占模式下获取对象状态,如果允许,则获取它。
tryRelease(int) // 试图设置状态来反映独占模式下的一个释放。
tryAcquireShared(int) // 试图在共享模式下获取对象状态。此方法应该查询是否允许它在共享模式下获取对象状态,如果允许,则获取它。
tryReleaseShared(int) // 试图设置状态来反映共享模式下的一个释放。
isHeldExclusively() // 如果对于当前(正调用的)线程,同步是以独占方式进行的,则返回 true。此方法是在每次调用非等待 AbstractQueuedSynchronizer.ConditionObject 方法时调用的。(等待方法则调用 release(int)。)
2 实现
2.1 同步状态的原子性管理
AQS
内部维护一个32bit字段state
用于描述当前状态,state
字段有volatile
修饰,保证了其可见性。同时AQS
还提供了getState
,setState
, compareAndSetState
等方法用于状态的读取和更新:
- getState:提供一个基于内存语义(memory semantics)的volatile变量(state)读取
- setState:提供一个基于内存予以(memory semantics)的volatile变量(state)更新
- compareAndSetState:提供一个基于CAS(compare and swap)的原子性状态更新操作
通过简单的原子读写就可以达到内存可视性,减少了同步的需求。子类可以获取和设置状态的值,通过定义状态的值来表示 AQS 对象是否被获取或被释放。
2.2 线程的阻塞与唤醒
AQS
基于java.util.concurrent.locks.LockSupport
支持创建锁和其他同步类需要的基本线程阻塞、解除阻塞原语。
这个类最主要的功能有两个:
- park:把线程阻塞
- unpark:让线程恢复执行
其实除了LockSupport
,Java之初就有Object
对象的wait和notify方法可以实现线程的阻塞和唤醒。那么它们的区别是什么呢?
主要的区别应该说是它们面向的对象不同。阻塞和唤醒是对于线程来说的,LockSupport的park/unpark更符合这个语义,以“线程”作为方法的参数, 语义更清晰,使用起来也更方便。而wait/notify的实现使得“线程”的阻塞/唤醒对线程本身来说是被动的,要准确的控制哪个线程、什么时候阻塞/唤醒很困难, 要不随机唤醒一个线程(notify)要不唤醒所有的(notifyAll)。
LockSupport
并不需要获取对象的监视器。LockSupport机制是每次unpark
给线程1个“许可”——最多只能是1,而park
则相反,如果当前 线程有许可,那么park方法会消耗1个并返回,否则会阻塞线程直到线程重新获得许可,在线程启动之前调用park/unpark
方法没有任何效果。
// 1次unpark给线程1个许可
LockSupport.unpark(Thread.currentThread());
// 如果线程非阻塞重复调用没有任何效果
LockSupport.unpark(Thread.currentThread());
// 消耗1个许可
LockSupport.park(Thread.currentThread());
// 阻塞
LockSupport.park(Thread.currentThread());
因为它们本身的实现机制不一样,所以它们之间没有交集,也就是说LockSupport阻塞的线程,notify/notifyAll没法唤醒。
2.3 队列维护
队列管理是AQS
的核心部分,作者采用了基于CLH锁队列
来实现内部队列。CLH锁(可参考:CLH锁)通常用于自旋锁,我们反而用于阻塞同步器,但使用相同的基本策略:在(线程)它自己结点持有关于线程的一些控制信息。每个结点的 “status” 字段跟踪一个线程是否应该阻塞。一个结点在它的前驱释放时被通知。队列的每个结点作为一个特定通知风格(specific-notification-style)的监视器服务,持有单一等待线程。”status” 字段不控制线程是否授予。一个线程可能尝试去获取如果它是第一个进入队列,但成为第一个不保证就成功;它只是获得权利去竞争,所以当前释放的竞争者线程可能需要再次等待(注:这是公平性的问题,子类的实现可以进行控制)。
为了进入CLH锁队列,你只需要原子地把它作为一个新的尾结点拼接;为了出队列,你只需要设置 “head” 字段。
+------+ prev +-----+ +-----+
head | | <---- | | <---- | | tail
+------+ +-----+ +-----+
队列部分比较复杂,详细的介绍请参考下一篇博客。
3 总结
本文只在于提纲挈领式地指出AQS
的大致框架以及主要作用,读者需要了解作为一个维护内部竞争队列的同步器,AQS
需要完成三部分工作:
- 共享状态的原子性维护
- 线程的阻塞与唤醒
- 竞争队列的维护