【并发编程篇】从源码层面深入理解AQS
1. 引言
前面并发专题介绍了,使用基于CAS自旋实现的轻量级锁有两个大的问题:
- CAS恶性空自旋会浪费大量的CPU资源。
- 在SMP架构的CPU上会导致“总线风暴”。
解决CAS恶性空自旋的有效方式之一是以空间换时间,较为常见的方案有两种:分散操作热点和使用队列削峰。JUC并发包使用的是队列削峰的方案解决CAS的性能问题,并提供了一个基于双向队列的削峰基类——抽象基础类AbstractQueuedSynchronizer(抽象同步器类,简称为AQS)。
1.1 锁与队列的关系
无论是单体服务应用内部的锁,还是分布式环境下多体服务应用所使用的分布式锁,为了减少由于无效争夺导致的资源浪费和性能恶化,一般都基于队列进行排队与削峰。
1.1.1 CLH锁的内部队列
CLH自旋锁使用的CLH(Craig,Landin,and Hagersten Lock Queue)是一个单向队列,也是一个FIFO队列。在独占锁中,竞争资源在一个时间点只能被一个线程锁访问,队列头部的节点表示占有锁的节点,新加入的抢锁线程则需要等待,会插入队列的尾部。

1.1.2 分布式锁的内部队列
在分布式锁的实现中,比较常见的是基于队列的方式进行不同节点中“等锁线程”的统一调度和管理。

1.1.3 AQS的内部队列
AQS是JUC提供的一个用于构建锁和同步容器的基础类。JUC包内许多类都是基于AQS构建的,例如ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock、FutureTask等。AQS解决了在实现同步容器时设计的大量细节问题。
AQS是CLH队列的一个变种,主要原理和CLH队列差不多,这也是前面对CLH队列进行长篇大论介绍的原因。AQS队列内部维护的是一个FIFO的双向链表,这种结构的特点是每个数据结构都有两个指针,分别指向直接的前驱节点和直接的后继节点。所以双向链表可以从任意一个节点开始很方便地访问前驱节点和后继节点。每个节点其实是由线程封装的,当线程争抢锁失败后会封装成节点加入AQS队列中;当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞的节点(线程)

2. AQS基本成员
2.1 什么是AQS?
java.util.concurrent包中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这些行为的抽象就是基于AbstractQueuedSynchronizer(简称AQS)实现的,AQS是一个抽象同步框架,可以用来实现一个依赖状态的同步器。JDK中提供的大多数的同步器如Lock, Latch, Barrier等,都是基于AQS框架来实现的
2.2 AQS具备的特性
- 阻塞等待队列
- 可重入
- 允许中断
- 公平/非公平
- 共享/独占
2.3 AQS的核心成员
AQS出于“分离变与不变”的原则,基于模板模式实现。AQS为锁获取、锁释放的排队和出队过程提供了一系列的模板方法。由于JUC的显式锁种类丰富,因此AQS将不同锁的具体操作抽取为钩子方法,供各种锁的子类(或者其内部类)去实现。
2.3.1 状态标志位
AQS中维持了一个单一的volatile修饰的状态信息state,AQS使用int类型的state标示锁的状态,可以理解为锁的同步状态。
//同步状态,使用 volatile保证线程可见
private volatile int state;
state因为使用volatile保证了操作的可见性,所以任何线程通过getState()获得状态都可以得到最新值。AQS提供了getState()、setState()来获取和设置同步状态,具体如下:
// 获取同步的状态
protected final int getState() {
return state;
}
// 设置同步的状态
protected final void setState(int newState) {
state = newState;
}
// 通过CAS设置同步的状态
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
由于setState()无法保证原子性,因此AQS给我们提供了compareAndSetState()方法利用底层UnSafe的CAS机制来实现原子性。compareAndSetState()方法实际上调用的是unsafe成员的compareAndSwapInt()方法。
2.3.2 队列节点类
AQS是一个虚拟队列,不存在队列实例,仅存在节点之间的前后关系。节点类型通过内部类Node定义,其核心的成员如下:
static final class Node {
/**节点等待状态值1:取消状态*/
static final int CANCELLED = 1;
/**节点等待状态值-1:标识后继线程处于等待状态*/
static final int SIGNAL = -1;
/**节点等待状态值-2:标识当前线程正在进行条件等待*/
static final int CONDITION = -2;
/**节点等待状态值-3:标识下一次共享锁的acquireShared操作需要无条件传播*/
static final int PROPAGATE = -3;
//节点状态:值为SIGNAL、CANCELLED、CONDITION、PROPAGATE、0
//普通的同步节点的初始值为0,条件等待节点的初始值为CONDITION(-2)
volatile int waitStatus;
//节点所对应的线程,为抢锁线程或者条件等待线程
volatile Thread thread;
//前驱节点,当前节点会在前驱节点上自旋,循环检查前驱节点的waitStatus状态
volatile Node prev;
//后继节点
volatile Node next;
//若当前Node不是普通节点而是条件等待节点,则节点处于某个条件的等待队列上
//此属性指向下一个条件等待节点,即其条件队列上的后继节点
Node nextWaiter;
...
}
state 状态
- 值为0,初始化状态,表示当前节点在sync队列中,等待着获取锁。
- CANCELLED,值为1,表示当前的线程被取消;
- SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
- CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列 中;
- PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;
waitStatus属性
每个节点与等待线程关联,每个节点维护一个状态waitStatus,waitStatus的各种值以常量的形式进行定义。waitStatus的各常量值具体如下:
(1)static final int CANCELLED=1
waitStatus值为1时表示该线程节点已释放(超时、中断),已取消的节点不会再阻塞,表示线程因为中断或者等待超时,需要从等待队列中取消等待。
由于该节点线程等待超时或者被中断,需要从同步队列中取消等待,因此该线程被置1。节点进入了取消状态,该类型节点不会参与竞争,且会一直保持取消状态。
(2)static final int SIGNAL=?1
waitStatus为SIGNAL(-1)时表示其后继的节点处于等待状态,当前节点对应的线程如果释放了同步状态或者被取消,就会通知后继节点,使后继节点的线程得以运行。
(3)static final int CONDITION=?2
waitStatus为-2时,表示该线程在条件队列中阻塞(Condition有使用),表示节点在等待队列中(这里指的是等待在某个锁的CONDITION上,关于CONDITION的原理后面会讲到),当持有锁的线程调用了CONDITION的signal()方法之后,节点会从该CONDITION的等待队列转移到该锁的同步队列上,去竞争锁(注意:这里的同步队列就是我们讲的AQS维护的FIFO队列,等待队列则是每个CONDITION关联的队列)。
节点处于等待队列中,节点线程等待在CONDITION上,当其他线程对CONDITION调用了signal()方法后,该节点从等待队列中转移到同步队列中,加入对同步状态的获取中。
(4)static final int PROPAGATE=?3
waitStatus为-3时,表示下一个线程获取共享锁后,自己的共享状态会被无条件地传播下去,因为共享锁可能出现同时有N个锁可以用,这时直接让后面的N个节点都来工作。这种状态在CountDownLatch中使用到了。
为什么当一个节点的线程获取共享锁后,要唤醒后继共享节点?共享锁是可以多个线程共有的,当一个节点的线程获取共享锁后,必然要通知后继共享节点的线程也可以获取锁了,这样就不会让其他等待的线程等很久,这种向后通知(传播)的目的也是尽快通知其他等待的线程尽快获取锁。
(5)waitStatus为0
waitStatus为0时,表示当前节点处于初始状态。
Node节点的waitStatus状态为以上5种状态的一种。
thread成员
Node的thread成员用来存放进入AQS队列中的线程引用;Node的nextWaiter成员用来指向自己的后继等待节点,此成员只有线程处于条件等待队列中的时候使用
抢占类型常量标识
Node节点还定义了两个抢占类型常量标识:SHARED和EXCLUSIVE,具体如下:
static final class Node {
//标识节点在抢占共享锁
static final Node SHARED = new Node();
//标识节点在抢占独占锁
static final Node EXCLUSIVE = null;
...
}
SHARED表示线程是因为获取共享资源时阻塞而被添加到队列中的;
EXCLUSIVE表示线程是因为获取独占资源时阻塞而被添加到队列中的。
FIFO双向同步队列
AQS的内部队列是CLH队列的变种,每当线程通过AQS获取锁失败时,线程将被封装成一个Node节点,通过CAS原子操作插入队列尾部。当有线程释放锁时,AQS会尝试让队头的后继节点占用锁。
AQS通过内置的FIFO双向队列来完成线程的排队工作,内部通过节点head和tail记录队首和队尾元素,元素的节点类型为Node类型,具体如下:
/*首节点的引用*/
private transient volatile Node head;
/*尾节点的引用*/
private transient volatile Node tail;
AQS的首节点和尾节点都是懒加载的。懒加载的意思是在需要的时候才真正创建。只有在线程竞争失败的情况下,有新线程加入同步队列时,AQS才创建一个head节点。head节点只能被setHead()方法修改,并且节点的waitStatus不能为CANCELLED。尾节点只在有新线程阻塞时才被创建

同步等待队列
AQS当中的同步等待队列也称CLH队列,CLH队列是Craig、Landin、Hagersten三人发明 的一种基于双向链表数据结构的队列,是FIFO先进先出线程等待队列,Java中的CLH队列是原 CLH队列的一个变种,线程由原自旋机制改为阻塞机制。
AQS 依赖CLH同步队列来完成同步状态的管理:
当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造 成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程
当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。
通过signal或signalAll将条件队列中的节点转移到同步队列。(由条件队列转化为同 步队列)

条件等待队列
AQS中条件队列是使用单向列表保存的,用nextWaiter来连接:
调用await方法阻塞线程;
当前线程存在于同步队列的头结点,调用await方法进行阻塞(从同步队列转化到条 件队列)
Condition接口详解
- 调用Condition#await方法会释放当前持有的锁,然后阻塞当前线程,同时向
Condition队列尾部添加一个节点,所以调用Condition#await方法的时候必须持有锁。
- 调用Condition#signal方法会将Condition队列的首节点移动到阻塞队列尾部,然后唤 醒因调用Condition#await方法而阻塞的线程(唤醒之后这个线程就可以去竞争锁了),所 以调用Condition#signal方法的时候必须持有锁,持有锁的线程唤醒被因调用 Condition#await方法而阻塞的线程。
@Slf4j
public class ConditionTest {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
new Thread(() -> {
lock.lock();
try {
log.debug(Thread.currentThread().getName() + " 开始处理任务");
condition.await();
log.debug(Thread.currentThread().getName() + " 结束处理任务");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}).start();
new Thread(() -> {
lock.lock();
try {
log.debug(Thread.currentThread().getName() + " 开始处理任务");
Thread.sleep(2000);
condition.signal();
log.debug(Thread.currentThread().getName() + " 结束处理任务");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}).start();
}
}
3、AQS中的模板模式
AQS同步器是基于模板模式设计的,并且是模板模式很经典的一个运用。下面简单地给大家介绍一下模板模式,模板模式是很容易理解的设计模式之一。如果需要自定义同步器,一般的方法是继承AQS,并重写指定方法(钩子方法),按照自己定义的规则对state(锁的状态信息)进行获取与释放,将AQS组合在自定义同步组件的实现中,自定义同步器去调用AQS的模板方法,而这些模板方法会调用重写的钩子方法。
3.1 AQS的模板流程
AQS定义了两种资源共享方式:
Exclusive(独享锁):只有一个线程能占有锁资源,如ReentrantLock。独享锁又可分为公平锁和非公平锁。
Share(共享锁):多个线程可同时占有锁资源,如Semaphore、CountDownLatch、CyclicBarrier、ReadWriteLock的Read锁。
AQS为不同的资源共享方式提供了不同的模板流程,包括共享锁、独享锁模板流程。这些模板流程完成了具体线程进出等待队列的(如获取资源失败入队/唤醒出队等)基础、通用逻辑。基于基础、通用逻辑,AQS提供了一种实现阻塞锁和依赖FIFO等待队列的同步器的框架,AQS模板为ReentedLocK、CountDownLatch、Semaphore提供了优秀的解决方案。
3.2 AQS中的钩子方法
自定义同步器时,AQS中需要重写的钩子方法大致如下:
·tryAcquire(int):独占锁钩子,尝试获取资源,若成功则返回true,若失败则返回false。
·tryRelease(int):独占锁钩子,尝试释放资源,若成功则返回true,若失败则返回false。
·tryAcquireShared(int):共享锁钩子,尝试获取资源,负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
·tryReleaseShared(int):共享锁钩子,尝试释放资源,若成功则返回true,若失败则返回false。
·isHeldExclusively():独占锁钩子,判断该线程是否正在独占资源。只有用到condition条件队列时才需要去实现它。
以上钩子方法的默认实现会抛出UnsupportedOperationException异常。除了这些钩子方法外,AQS类中的其他方法都是final类型的方法,所以无法被其他类继承,只有这几个方法可以被其他类继承。
3.2.1 tryAcquire独占式获取锁
AQS在这里没有对tryAcquire()进行功能的实现,只有一个抛出异常的语句,我们需要自己对其进行实现,可以对其重写实现公平锁、不公平锁、可重入锁、不可重入锁。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
3.2.2 tryRelease独占式释放锁
tryRelease尝试释放独占锁,需要子类来实现。
protected boolean tryRelease(long arg) {
throw new UnsupportedOperationException();
}
3.2.3 tryAcquireShared共享式获取
tryAcquireShared尝试进行共享锁的获得,需要子类来实现。
protected long tryAcquireShared(long arg) {
throw new UnsupportedOperationException();
}
3.2.4 tryReleaseShared共享式释放
tryReleaseShared尝试进行共享锁的释放,需要子类来实现。
protected boolean tryReleaseShared(long arg) {
throw new UnsupportedOperationException();
}
3.2.5 查询是否处于独占模式
isHeldExclusively()的功能是查询线程是否正在独占资源。在独占锁的条件队列中用到
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
4. AQS锁抢占的原理
4.1 AQS模板方法:acquire(arg)
acquire是AQS封装好的获取资源的公共入口,它是AQS提供的利用独占的方式获取资源的方法,源码实现如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
4.2 钩子实现:tryAcquire(arg)
public class DreamerLock extends AbstractQueuedSynchronizer{
@Override
protected boolean tryAcquire(int unused) {
//cas 加锁 state=0
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int unused) {
//释放锁
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() {
acquire(1);
}
public boolean tryLock() {
return tryAcquire(1);
}
public void unlock() {
release(1);
}
public boolean isLocked() {
return isHeldExclusively();
}
}
DreamerLock的tryAcquire()流程是:CAS操作state字段,将其值从0改为1,若成功,则表示锁未被占用,可成功占用,并且返回true;若失败,则获取锁失败,返回false。
DreamerLock的实现非常简单,是不可以重入的,仅仅为了学习AQS而编写。如果是可以重入的锁,在重复抢锁时会累计state字段值,表示重入锁的次数.
4.3 直接入队:addWaiter
在acquire模板方法中,如果钩子方法tryAcquire尝试获取同步状态失败的话,就构造同步节点(独占式节点模式为Node.EXCLUSIVE),通过addWaiter(Node node,int args)方法将该节点加入同步队列的队尾。
private Node addWaiter(Node mode) {
//创建新节点
Node node = new Node(Thread.currentThread(), mode);
// 加入队列尾部,将目前的队列tail作为自己的前驱节点pred
Node pred = tail;
// 队列不为空的时候
if (pred != null) {
node.prev = pred;
// 先尝试通过AQS方式修改尾节点为最新的节点
// 如果修改成功,将节点加入队列的尾部
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//第一次尝试添加尾部失败,意味着有并发抢锁发生,需要进行自旋
enq(node);
return node;
}
在addWaiter()方法中,首先需要构造一个Node对象,具体代码如下:
Node node = new Node(Thread.currentThread(), mode);
构造Node对象所用到的两个参数如下:
(1)当前线程
构造Node对象时,将通过Thread.currentThread()获取当前线程作为第一个参数,该线程会被赋值给Node对象的thread成员属性,相当于将线程与Node节点进行绑定。在后续轮到此Node节点去占用锁时,就需要其thread属性获得需要唤醒的线程。
(2)Node共享类型
mode是一个表示Node类型的参数,用于标识新节点是独占地还是共享地去抢占锁。mode虽然为Node类型,但是仅仅起到类型标识的作用。mode可能的值有两个,以常量的形式定义在Node类中,具体如下:
static final class Node { /** 常量标识:标识当前的队列节点类型为共享型抢占 */
static final Node SHARED = new Node(); /** 常量标识:标识当前的队列节点类型为独占型抢占 */
static final Node EXCLUSIVE = null; // 省略其他代码
}
如果抢占独占锁,那么mode值为EXCLUSIVE;如果抢占共享锁,那么mode值为SHARED。
4.4 自旋入队:enq
addWaiter()第一次尝试在尾部添加节点失败,意味着有并发抢锁发生,需要进行自旋。enq()方法通过CAS自旋将节点添加到队列尾部。
/**
* 这里进行了循环,如果此时存在tail,就执行添加新队尾的操作
* 如果依然不存在,就把当前线程作为head节点
* 插入节点后,调用acquireQueued()进行阻塞
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
//队列为空,初始化尾节点和头节点为新节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 队列不为空,将新节点插入队列尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
* CAS 操作head指针,仅仅被enq()调用
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/**
* CAS 操作tail指针,仅仅被enq()调用
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
4.5 自旋抢占:acquireQueued()
在节点入队之后,启动自旋抢锁的流程。acquireQueued()方法的主要逻辑:当前Node节点线程在死循环中不断获取同步状态,并且不断在前驱节点上自旋,只有当前驱节点是头节点时才能尝试获取锁,原因是:
(1)头节点是成功获取同步状态(锁)的节点,而头节点的线程释放了同步状态以后,将会唤醒其后继节点,后继节点的线程被唤醒后要检查自己的前驱节点是否为头节点。
(2)维护同步队列的FIFO原则,节点进入同步队列之后,就进入了自旋的过程,每个节点都在不断地执行for死循环。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋检查当前节点的前驱节点是否为头节点,才能获取锁
for (;;) {
// 获取节点的前驱节点
final Node p = node.predecessor();
// 节点中的线程循环地检查自己的前驱节点是否为 head 节点
// 前驱节点是head时,进一步调用子类的tryAcquire(…)实现
if (p == head && tryAcquire(arg)) {
// tryAcquire()成功后,将当前节点设置为头节点,移除之前的头节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 检查前一个节点的状态,预判当前获取锁失败的线程是否要挂起
// 如果需要挂起
// 调用parkAndCheckInterrupt()方法挂起当前线程,直到被唤醒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true; // 若两个操作都是true,则置true
}
} finally {
//如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了)
//那么取消节点在队列中的等待
if (failed)
//取消请求,将当前节点从队列中移除
cancelAcquire(node);
}
}
acquireQueued()自旋过程中会阻塞线程,等待被前驱节点唤醒后才启动循环。如果成功就返回,否则执行shouldParkAfterFailedAcquire()、parkAndCheckInterrupt()来达到阻塞的效果。
调用acquireQueued()方法的线程一定是node所绑定的线程(由它的thread属性所引用),该线程也是最开始调用lock()方法抢锁的那个线程,在acquireQueued()的死循环中,该线程可能重复进行阻塞和被唤醒。
AQS队列上每一个节点所绑定的线程在抢锁的过程中都会自旋执行acquireQueued()方法的死循环,也就是说,AQS队列上每个节点的线程都不断自旋.
如果头节点获取了锁,那么该节点绑定的线程会终止acquireQueued()自旋,线程会去执行临界区代码。此时,其余的节点处于自旋状态,处于自旋状态的线程当然也不会执行无效的空循环而导致CPU资源浪费,而是被挂起(Park)进入阻塞状态。AQS队列的节点自旋不像CLH节点那样在空自旋而耗费资源。
4.6 挂起预判:shouldParkAfterFailedAcquire()
acquireQueued()自旋在阻塞自己的线程之前会进行挂起预判。shouldParkAfterFailedAcquire()方法的主要功能是:将当前节点的有效前驱节点(是指有效节点不是CANCELLED类型的节点)找到,并且将有效前驱节点的状态设置为SIGNAL,之后返回true代表当前线程可以马上被阻塞了。具体可以分为三种情况:
- 如果前驱节点的状态为?1(SIGNAL),说明前驱的等待标志已设好,返回true表示设置完毕。
- 如果前驱节点的状态为1(CANCELLED),说明前驱节点本身不再等待了,需要跨越这些节点,然后找到一个有效节点,再把当前节点和这个有效节点的唤醒关系建立好:调整前驱节点的next指针为自己。
- 如果是其他情况:?3(PROPAGATE,共享锁等待)、?2(CONDITION,条件等待)、0(初始状态),那么通过CAS尝试设置前驱节点为SIGNAL,表示只要前驱节点释放锁,当前节点就可以抢占锁了。
其源码如下:
private static boolean shouldParkAfterFailedAcquire(
Node pred, Node node) {
int ws = pred.waitStatus; // 获得前驱节点的状态
if (ws == Node.SIGNAL) //如果前驱节点状态为SIGNAL(值为-1)就直接返回
return true;
if (ws > 0) { // 前驱节点以及取消CANCELLED(1)
do {
// 不断地循环,找到有效前驱节点,即非CANCELLED(值为1)类型节点
// 将pred记录前驱的前驱
pred = pred.prev;
// 调整当前节点的prev指针,保持为前驱的前驱
node.prev = pred;
} while (pred.waitStatus > 0);
// 调整前驱节点的next指针
pred.next = node;
} else {
// 如果前驱状态不是CANCELLED,也不是SIGNAL,就设置为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
// 设置前驱状态之后,此方法返回值还是为false,表示线程不可用,被阻塞
}
return false;
}
在第一次进入此方法时,首先会进入后一个if判断的else分支,通过CAS设置pred前驱的waitStatus为SIGNAL,然后返回false。
此方法返回false之后,获取独占锁的acquireQueued()方法会继续进行for循环去抢锁:
(1)假设node的前驱节点是头节点,tryAcquire()抢锁成功,则获取到锁。
(2)假设node的前驱节点不是头节点,或者tryAcquire()抢锁失败,仍然会再次调用此方法。
第二次进入此方法时,由于上一次进入时已经将pred.waitStatus设置为?1(SIGNAL)了,因此这次会进入第一个判断条件,直接返回true,表示应该调用parkAndCheckInterrupt()阻塞当前线程,等待前一个节点执行完成之后唤醒。
1.waitStatus等于-3
什么时候遇到前驱节点状态waitStatus等于-3(PROPAGATE)的场景呢?PROPAGATE只能在使用共享锁的时候出现,并且只可能设置在head上。所以,对于非队尾节点,如果它的状态为0或PROPAGATE,那么它肯定是head。当等待队列中有多个节点时,如果head的状态为0或PROPAGATE,说明head处于一种中间状态,且此时有线程刚才释放锁了。而对于抢锁线程来说,如果检测到这种状态,说明再次执行acquire()方法是极有可能获得锁的。
2.waitStatus大于0
什么时候会遇到前驱节点的状态waitStatus大于0的场景呢?当pred前驱节点的抢锁请求被取消,后期状态为CANCELLED(值为1)时,当前节点(如果被唤醒)就会循环移除所有被取消的前驱节点,直到找到未被取消的前驱。在移除所有被取消的前驱节点后,此方法将返回false,再次去执行acquireQueued()的自旋抢占。
3.waitStatus等于0
什么时候遇到前驱节点状态waitStatus等于0(初始状态)的场景呢?分为两种情况:
(1)node节点刚成为新队尾,但还没有将旧队尾的状态设置为SIGNAL。
(2)node节点的前驱节点为head。
前驱节点为waitStatus等于0的情况是最常见的。比如现在AQS的等待队列中有很多节点正在等待,当前线程刚执行完毕addWaiter()(节点刚成为新队尾),然后现在开始执行获取锁的死循环(独占锁对应的是acquireQueued()里的死循环,共享锁对应的是doAcquireShared()里的死循环),此时节点的前驱(也就是旧队尾的状态)肯定还是0(也就是默认初始化的值),然后死循环执行两次,第一次执行shouldParkAfterFailedAcquire()自然会检测到前驱状态为0,然后将0设置为SIGNAL,第二次执行shouldParkAfterFailedAcquire(),由于前驱节点为SIGNAL,当前线程直接返回true,去执行自我阻塞。
4.7 线程挂起:parkAndCheckInterrupt()
parkAndCheckInterrupt()的主要任务是暂停当前线程,具体如下:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 调用park()使线程进入waiting状态
return Thread.interrupted(); // 如果被唤醒,查看自己是否已经被中断
}
AbstractQueuedSynchronizer会把所有的等待线程构成一个阻塞等待队列,当一个线程执行完lock.unlock()时,会激活其后继节点,通过调用LockSupport.unpark(postThread)完成后继线程的唤醒。
5. AQS的两个关键点:节点的入队和出队
由于AQS的实现非常精妙,因此理解AQS的原理还是比较困难的。理解AQS原理的一个比较重要的关键点在于掌握节点的入队和出队。
5.1 节点的自旋入队
节点在第一次入队失败后,就会开始自旋入队,分为以下两种情况:
- 如果AQS的队列非空,新节点入队的插入位置在队列的尾部,并且通过CAS方式插入,插入之后AQS的tail将指向新的尾节点。
- 如果AQS的队列为空,新节点入队时,AQS通过CAS方法将新节点设置为头节点head,并且将tail指针指向新节点。
节点的入队代码在enq()方法中,因为enq()非常重要,所以将其代码重复如下:
private Node enq(final Node node) {
for (;;) { //自旋入队
Node t = tail;
if (t == null) {
//队列为空,初始化尾节点和头节点为新节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
//如果队列不为空,将新节点插入队列尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
5.2 节点的出队
节点出队的算法在acquireQueued()方法中,这是一个非常重要的模板方法。acquireQueued()方法不断在前驱节点上自旋(for死循环),如果前驱节点是头节点并且当前线程使用钩子方法tryAcquire(arg)获得了锁,就移除头节点,将当前节点设置为头节点。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 在前驱节点上自旋
for (;;) {
// 获取节点的前驱节点
final Node p = node.predecessor();
// (1)前驱节点是头节点
// (2)通过子类的tryAcquire()钩子实现抢占成功
if (p == head && tryAcquire(arg)) {
// 将当前节点设置为头节点,之前的头节点出队
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 省略park(无限期阻塞)线程的代码
}
} finally {
// 省略其他
}
}
AQS释放锁时是如何唤醒后继线程的呢?AQS释放锁的核心代码如下:
public final boolean release(long arg) {
if (tryRelease(arg)) { //释放锁的钩子方法的实现
Node h = head; //队列头节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); //唤醒后继线程
return true;
}
return false;
}
//unparkSuccessor的核心代码如下:
private void unparkSuccessor(Node node) {
// 省略不相关代码
Node s = node.next; //后继节点
// 省略不相关代码
if (s != null)
LockSupport.unpark(s.thread); //唤醒后继节点的线程
}
6. 参考文档
- 感谢你赐予我前进的力量

