队列同步器之AbstractQueuedSynchronizer
AbstractQueuedSynchronizer是一款抽象的队列同步器,简称AQS。在Java并发包下许多工具类(如:ReentrantLock,CountDownLatch,ThreadPoolExecutor,Semaphore,ReentrantReadWriteLock)都间接使用了该同步器来实现各自的功能。可以说,了解了这个同步器,就了解了并发包下的这些工具类的核心思想。本文通过一些展示AbstractQueuedSynchronizer的一些核心方法,以此来揭示它的设计思想。我相信了解了队列同步器,你便可以很轻松了解上述的一些工具类实现原理,并且可以基于这个同步器来实现定制自己的业务需求。
核心变量
首先我们看下它的一些核心的变量定义:
// 队列头
private transient volatile Node head;
// 队列尾
private transient volatile Node tail;
// 同步状态
private volatile int state;
// Node节点的定义
static final class Node {
// 共享模式。
static final Node SHARED = new Node();
// 独占模式。就是一个NULL节点
static final Node EXCLUSIVE = null;
// 各种等待状态定义,暂时了解一下,后面再具体分析
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 当前Node的等待状态,上面定义的某一个值之一或者是0,默认初始值即为0。
volatile int waitStatus;
// 前序节点以及后序节点,熟悉双向链表应该就不难理解了。
volatile Node prev;
volatile Node next;
// Node代表的线程,这个是Node包装的真实内容。
volatile Thread thread;
// 下一个等待者,后面再详细分析
Node nextWaiter;
}
既然是同步队列,因此有队列头,队列尾的值,很好理解。值得重点关注的是这里的state,它是个非常关键的变量,当前同步器的状态,以及各个线程之间的竞争关系都是通过对该值CAS式的修改来实现的。再看下对这些值的修改:
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
// CAS设置头节点
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
// CAS设置尾节点
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
// CAS设置下一个节点
private static final boolean compareAndSetNext(Node node,
Node expect,
Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
// CAS设置状态值
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
首先在static静态代码块里面,通过Unsafe类拿到这些关键变量的在内存中的位移地址。Unsafe顾名思义是不安全的,它主要用来操作底层的内存资源等。这里不展开说Unsafe类,暂时只需要做个简单理解,有兴趣同学可以网上查阅相关资料。总之,拿到了内存的位移地址,就可以知道这些变量在内存中的地址,为后续CAS操作这些变量提供了基础。
然后就是campareAndSetXXX的方法定义了,封装了Unsafe的CAS方法,来原子性的修改这些关键变量。
核心方法
有了对上述核心变量的知识,我们看下它的一些核心方法。
Acquire获取
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 节点以独占式状态,加入队列。
selfInterrupt();
}
// 尝试获取
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
核心方法acquire,单词简单翻译过来就是获取、取得。结合同步器的语境,我把它理解为获取通行证或者取得凭证。方法的int参数可以理解为凭证的个数。
方法进来会调用尝试获取(tryAcquire),这个方法并没有任何实现,它交由子类去实现。该方法返回布尔值,表示尝试获取成功还是失败。如果尝试获取成功,则acquire直接返回;获取失败返回false,则进入后续的acquireQueued方法。
private Node addWaiter(Node mode) {
// 初始化,包装了当前的线程。
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) { // 如果尾节点不为空,则尝试快速添加一次节点。失败了,再调用后面的enq入队列方法。
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// Node加入到队列
enq(node);
return node;
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
tryAcquire方法失败了,则将节点加入到队列中。其中,addWaiter方法就是入队列操作,如果tail节点不为空,说明队列中已经有了数据,则快速尝试CAS加一次队列,将该节点加入到队尾,原有的tail的next指向当前节点。如果竞争不激烈情况下,这里始终可以成功,因此提高了性能;竞争激烈的情况下,这里还是回退到调用enq方法,以死循环方式,不停尝试向队尾添加node对象。
添加队列成功以后,再调用acquireQueued方法,字面意思是,让已经排队的节点获取凭证。具体看下它的实现:
1、判断当前Node的前序节点是否是head节点。由于head节点一般是个伪节点(DummyHead),它并不实际存储值,因此如果判断成立,则当前节点就是第一个实际存储值的节点,然后再次尝试获取(tryAcquire)下,获取成功则直接返回,然后头节点再次配置成伪节点。
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
2、之后的判断便是shouldParkAfterFailedAcquire,字面理解就是它的实际含义:获取失败之后,线程是否需要挂起呢?
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
代码中看出,这里就需要通过前序节点的状态(waitStatus)来判断。上述的Node状态终于在这里露出了点面目。如果前序节点是在SIGNAL状态,则返回true,否则其他状态返回false,即不需要挂起。为了方便理解,我们想象一种场景:最开始线程A进入时,acquire成功,由于这个线程执行任务比较耗时,导致后续的线程进入时,全都需要进入队列排队;因此当第一个排队线程进入时,这里的前序节点其实就是head节点,头节点的waitStatus初始是0,根据判断流程来到:compareAndSetWaitStatus(pred, ws, Node.SIGNAL),将头节点等待状态设置为SIGNAL,最后返回false。然而由于我们的A线程还在执行任务,acquireQueued来到下一次循环,很显然这次循环下的shouldParkAfterFailedAcquire返回了true,表示需要park。由此可见,它是通过修改前序节点的waitStatus,来引导自身进入park。
3、既然需要park,则就实际调用parkAndCheckInterrupt方法来挂起线程:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 这是挂起线程的实现。
return Thread.interrupted();
}
它利用LockSupport.park的方法,来挂起当前的线程。其内部其实是Unsafe.park方法来实现。
4、另外一点需要说明,Thread.interrupted会清除中断状态,因此acquireQueued是不响应中断的。当线程被中断时,虽然会跳出挂起状态,但在下一次loop循环时,只要没获取到凭证,依旧会被park,即被挂起。如果需要响应中断式的acquire,AQS提供了acquireInterruptibly方法,实现大同小异,此处不再展开。
到这里,线程从进入,然后没获取到凭证进入队列,最后挂起;这整个流程我们都梳理了一下。我们可以看到这个队列同步器的大致样貌:线程进来首先尝试获取凭证,没有获取到则进入队列挂起;同样的,根据waitStatus状态,即便多个线程同时进入情况下也是依次会挂起。这就像路上堵车一般,前面的车停止了,后面来的车一样也要停止等待。但是队列并不能保证最先等待的线程一定最先获取到凭证(有可能刚进入的线程先抢占到了凭证);如果想要完全的FIFO,则你需要自己实现tryAcquire。
Release释放
有获取,便有释放。只有释放了凭证,才能让等待队列中的其他线程得到调度。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
同样的,释放时先调用尝试释放(tryRelease),它也是空实现,需要子类去实现。尝试释放成功之后,则unpark(唤醒)头节点的后序节点:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
先将节点(这里是Head节点)的waitStatus状态更改为0。后面就是判断如果当前的next节点是空,或者waitStatus>0(即为CANCELLED)时,从队尾开始遍历,找到最后一个不为CANCELLED状态的节点。最后,对该节点调用unpark方法来唤醒线程。
唤醒后的线程继续在acquireQueued中的循环执行,尝试获取凭证操作。因此unpark唤醒是按队列先后顺序来的,但是在tryAcquire步骤能否获取到,则”听天由命”了。
稍微总结一下:acquire默认是独占式的,尝试获取不成功则进入队列排队挂起,之后就要等待顺序地唤醒操作。但是队列并不能保证严格顺序性,因为刚进入的线程有可能先被执行(类似于窗口被插队)。AQS的子类默认需要实现tryAcquire和tryRelease操作,真正的获取和释放由子类去控制实现,AQS提供了对获取失败的线程排队,线程挂起和唤醒等底层功能。如何获取凭证、获取等等这些问题是子类需要关心的问题,因此职责交还给子类去负责。
AcquireShared共享获取
上面都是介绍了独占式获取,下面就是共享式的获取,它们有什么区别呢?
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
共享式获取,也是先尝试获取下,tryAcquireShared同样需要子类自己实现。tryAcquireShared会返回一个整数:
- 返回负数。则表示获取失败。
- 返回0。表示共享式地获取成功;但是后续的共享式地获取不能成功。
- 返回正数。表示共享式获取成功,后续的共享式获取也可能成功。
这里我们需要仔细理解为什么tryAcquireShared返回整数,而tryAcquire返回布尔值?只有这样才能方便我们往下面理解。
独占式获取是只允许一个线程获取到凭证而执行,其余都需要等待;而共享式获取则允许一个或多个线程同时获取并执行,因此它返回一个整数来区分多种情况;这也是这两种不同获取方式一个重要的区别。后续的理解都要抓住这个本质。
因此,获取失败之后,进入到真正的共享式获取逻辑:doAcquireShared。
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
有了之前分析acquire的逻辑,这里我们就非常容易理解了。节点添加到队列,然后进入循环,判断当前是否是首个实际节点。如果是的话,再次尝试共享式获取,这里需要注意的是,当获取成功(即返回的r>=0)时,同样要重新设置head,同时添加了一个Propagate(传播)功能,那这个传播作用是什么呢?
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
从这段逻辑我们可以看到,当tryAcquireShared返回为正数时(即propagate参数),表明其他线程还可以共享式获取,并且后序节点如果也是共享状态,则可以得到唤醒。这里的doReleaseShared后面将会分析。我们只需要知道传播Propagate就是用来判断后续节点是否也是共享状态,是的话,同样将它唤醒,这也是共享式的含义。
后面的shouldParkAfterFailedAcquire以及parkAndCheckInterrupt就不再多赘述,上面已经做过分析,这里表示:如果共享式资源已经被其他线程获取完了,那其它的线程只能挂起。
并且这个方法对head做了两次空判断,以及对它的waitStatus做了<0的判断,以此来保证传播性。可能这一层的判断,有点不太好理解。这里判断的含义,我们等下文的共享释放理解之后,一起来尝试说明。
ReleaseShared共享释放
共享式获取同样对应有一个共享式释放:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
tryReleaseShared也是一个由子类实现的方法。尝试共享释放(tryReleaseShared)成功后,也会进入doReleaseShared方法。
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) { // head如果等于tail,说明队列为空
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
这里判断头节点的waitStatus值,如果为SIGNAL,则尝试唤醒Head节点的后续节点。因为SIGNAL状态代表着下一个节点正在Park挂起状态。然后就是CAS修改头节点状态,如果修改失败,说明当前有另外的线程并发修改Head状态值,于是loop循环需要重新来一遍。
如果当前的状态值为0,则CAS修改状态为PROPAGATE状态。这里的设置和setHeadAndPropagate方法中的h.waitStatus < 0判断遥相呼应。
我们要清楚一点就是,共享式的方式要比独占式情况复杂一些。因为共享式涉及到了多个线程可能同时acquire,然后可能同时release。因此等待队列中的Head是一个竞争关键资源。所以doReleaseShared中就要利用死循环来重复执行,因为head竞争有可能失败,所以不得不重试,性能上就会有些损耗。
我们再来假设上述的场景,以便理解:线程A和线程B,它们两个获取完了所有的共享凭证并开始执行各自任务,其余进来的线程不得不排队挂起等待。由于线程A和B都是耗时操作,所有的线程挂起之后,队列中的每个节点(tail除外)的waitStatus此时都是SIGNAL。一段时间之后,线程A和线程B同时释放了凭证。在doReleaseShared方法中,它们俩都会竞争Head节点,表现为CAS修改状态。我们再假设,B线程竞争胜利,它成功将waitStatus从SIGNAL状态修改为0,然后唤醒后序节点线程C。与此同时A线程开始loop循环重试。C线程被唤醒之后又尝试获取共享凭证,成功之后进入setHeadAndPropagate方法。好巧不巧,A线程重试之后,将head的waitStatus修改为了PROPAGATE,因此不管此时的C线程有没有执行到setHead(node)(改变头节点)这一步,传播状态都将延续,因为doReleaseShared依旧可以执行,因为C线程的h.waitStatus < 0是成立的,如果setHead已经被执行,无非A线程将再次loop循环,这可能会导致有多余的唤醒,这里的解释我们看下作者在源代码中的注释就明白:
private void setHeadAndPropagate(Node node, int propagate) {
...
/*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
共享式获取和释放的逻辑确实是有些难懂理解的,不防多读几遍,结合代码在纸上多演练一下。这也体现了并发编程的难度以及代码阅读性差。同样也告诫我们,对待多并发编程一定要对各种情况进行充分考量,并发情况下很多问题有时候仅仅通过测试是很难复线的。