JDK 中 AQS 源码分析

kaisesai 2020-11-08 23:53:14
java CAS


0. 目录

0. 目录
1. AQS 的介绍
2. AQS 的基本知识点
3. AQS 如何使用?
4. Semaphore
4.1 Semaphore 例子
4.2 Semaphore 类结构
4.3 Semphore 获取同步资源
4.4 Semphore 释放同步资源
4.5 Semphore 小结
5. ReentrantLock
5.1 ReentrantLock 例子
5.2 ReentrantLock 获取锁
5.3 ReentrantLock 释放锁
5.4 ReentrantLock 小结
6. ConditionObject 条件对象
6.1 ConditionObject 例子
6.2 条件对象等待
6.3 条件对象唤醒
7. 小结
8. 参考资料

今天我们来学习下 AQS。

1. AQS 的介绍

什么是 AQS 呢?AQS 是 java.util.concurrent.locks.AbstractQueuedSynchronizer 类的简写。那 AbstractQueuedSynchronizer 这个类又是什么玩意呢?

它是一个抽象的队列同步器,主要用在 java 多线程编程模型中,用来构建锁或者其他同步组件,来保证多线程之间进行数据安全性的操作的一个并发编程的基础框架。

在 jdk 中的 java.util.concurrent(简称 juc)包中,很多并发类都用到了 AQS 类,比如我们非常熟悉的 ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch 这些类就用到了 AQS。

2. AQS 的基本知识点

在讲解 AQS 的工作方式之前,我们有必要先了解几个概念,它们分别是:

  • 同步状态:它是一个被 volatile 关键字修饰的 int 类型的变量,在 AQS 中用来表示资源;
  • CAS:compareAndSet 比较并交换,这是一个原子操作,在 AQS 中主要是对同步状态进行操作。在 JDK1.8 中通过 Unsafe 类来进行操作,会调用系统底层的函数库来实现;
  • 线程的阻塞和唤醒:获取资源的线程需要排队,在排队过程中需要阻塞和被唤醒,这里在 AQS 中通过 LockSupport 类的 park() 和 unpark() 方法对线程进行阻塞和唤醒,其实它内部是调用了 Unsafe 的 park、unpark 方法;
  • 同步队列:在 AQS 中获取资源的线程会进入到队列中等待,这里的队列就是同步队列,被称为 CLH 同步队列。CLH 是三个英文文字的首字母,它是由三个人发明的一种双向链表,队列中的有等待状态、线程、前驱节点、后继节点这些属性;
  • 条件队列:在 AQS 中还有一种队列叫作条件队列,它和上面的同步队列共同使用一个节点类,但是它的使用场景是用在一些需要满足条件的场景。它是一个单向链表,具有等待状态、线程、下一个等待者节点属性;
  • 独占模式与共享模式:获取资源的时候,可以声明其获取的模式是独占式的还是共享式的。所谓独占式就是说在同一时刻只允许有一个线程获取资源,而共享式就是同一时刻允许多个线程获取资源。在同步阻塞队列中我们可以看到它的用法。
  • 公平与非公平:公平的意思就是当线程需要获取资源的时候,不论同队队列中是否等待的线程,也会先尝试抢占下资源,如果没有获取到资源就再去排队。非公平就是当前线程需要获取资源时,如果同步队列中已经有排队的线程,那该线程就会乖乖的去排队。

同步队列的结构如图示:

同步队列

条件队列的结构如下图:

条件队列

知道了上面的这些概念,我们来看 AQS 的工作方式。

AQS 内部使用了一个 int 类型的 state 成员变量来表示同步状态,通过内置的 FIFO 队列来完成资源获取线程的排队工作。它实现了一套获取资源和线程排队的逻辑,有获取资源、释放资源等操作,把需要获取资源的方法进行抽象,通过子类调用内部的同步状态的方式来自定义实现获取资源和释放资源的相关操作。

3. AQS 如何使用?

AQS 是基于模板方法模式来设计的,使用者需要继承 AQS 来显示指定的方法,随后将 AQS 组合在自定义的同步组件中的实现中,并且调用同步器提供的模板方法,这些模板方法将会调用使用者重写的方法。

AQS 的使用方式主要是通过子类继承的方式,子类实现它的抽象方法来提供管理同步状态。

AQS 提供的可重写的方法,根据工作的模式可以分为两类:

  • 独占模式

    • tryAcquire(int arg):独占式获取同步状态,实现该方法需要查询当前状态并且判断同步状态是否符合预期,然后再进行 CAS 设置同步状态。
    • tryRelease(int arg):独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态。
    • isHeldExclusively():当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程锁独占。
  • 共享模式

    • tryAcquireShared(int arg):共享式获取同步状态,返回大于等于 0 的值,表示获取成功,反之,获取失败。
    • tryReleaseShared(int arg):共享式释放同步状态。

子类重写这些方法时,需要使用 AQS 提供的方法来访问和修改同步状态,这些方法有:

  • getState():获取当前同步状态;
  • setState(int newState):设置当前同步状态;
  • compareAndSetState(int expect, int update):使用 CAS 方法设置当前状态,该方法能保证状态设置的原子性。

子类实现 AQS 的方式的时候,可以提供公平与非平的两种获取资源的方式。

AQS 提供的模板方法同样也是根据工作模式来分为两类:

  • 独占模式
    • void acquire(int arg):独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用重写的 tryAcquire(int arg) 方法。
    • void acquireInterruptibly(int arg):与 acquire(int arg) 相同,但是该方法响应中断,当前线程未获取到同步状态而进入同步队列中,如果当前线程被中断,则该方法会抛出中断异常并返回。
    • boolean tryAcquireNanos(int arg, int nanos):在 acquireInterruptibly(int arg) 基础上增加了超时限制,如果当前线程在超时时间内没有获取到同步状态,那么将会返回false,如果获取到了则返回 true。
    • boolean release(int arg):独占式的释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒。
  • 共享模式
    • void acquireShared(int arg):共享式的获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待。与独占式获取的主要区别是在同一时刻可以有多个线程获取同步状态。
    • void acquireSharedInterruptibly(int arg):与 acquireShared(int arg) 相同,该方法响应中断。
    • boolean tryAcquireNanos(int arg, int nanos):在 acquireSharedInterruptibly(int arg) 基础上增加了超时限制。
    • boolean releaseShared(int arg):共享式的释放资源。
    • Collection<Thread> getQueuedThreads():获取在同步队列上等待的线程集合。

4. Semaphore

我们通过 Semaphore 类来看下 AQS 是怎么玩的。

Semaphore 信号量是一个了同步工具类,它的用法是声明一个具有一定资源的令牌,然后其他线程可以获取和释放这些令牌,当没有令牌的时候,线程需要阻塞等待,直到令牌被其他线程释放。

Semaphore 类可以用作线程的资源隔离与限流,比如 springcloud 中的 hystrix 组件中的资源隔离限流技术中一种方式就是使用到了 Semaphore 信号量实现的。

4.1 Semaphore 例子

我们先看下 Semaphore 类的用法:

public static void main(String[] args) throws InterruptedException {
// 创建一个信号量,令牌数为 1
Semaphore semaphore = new Semaphore(1);
// 启动一个线程
Thread t1 = new Thread(() -> {
try {
// 获取一个令牌
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " 获取到一个资源");
// t1 线程睡眠 2 秒
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + " 释放了一个资源");
// 释放令牌
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "线程 1");
// 启动线程
t1.start();
// 主线程睡眠 1 秒
Thread.sleep(1000);
// 主线程获取令牌
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " 线程获取到了资源");
// 主线程释放令牌
semaphore.release();
System.out.println(Thread.currentThread().getName() + " 线程释放到了资源");
}

可以看到上面的例子是创建一个令牌数为 1 的信号量,然后启动一个线程 t1,t1 先执行获取令牌,然后睡眠 2 秒,最后释放令牌;主线程在 t1 线程启动之后先睡眠 1 秒(为了让 t1 线程获取到令牌),然后再尝试获取令牌。

程序运行的结果是:

线程 1 获取到一个资源
线程 1 释放了一个资源
main 线程获取到了资源
main 线程释放到了资源

可以看到,Semaphore 信号量的获取令牌操作,在令牌用完的情况,有效的阻塞了需要获取令牌的线程,直到其他线程释放了令牌。

4.2 Semaphore 类结构

我们看下它的类图结构:

Semaphore

Semaphore 在内部实现了一个继承 AQS 的同步器 Sync,并且也提供了它的子类 NonfairSync 非公平同步器、FairSync 公平同步器。

public class Semaphore implements java.io.Serializable {
private static final long serialVersionUID = -3222578661600680210L;
private final Sync sync;
/**
* 重写了 AQS 类,实现了它的 tryAcquireShared、tryReleaseShared 类。
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);
}
/**
* @return 获取令牌
*/
final int getPermits() {
return getState();
}
/**
* 非公平的获取共享资源
*
* @param acquires
* @return
*/
final int nonfairTryAcquireShared(int acquires) {
// 无限循环,是为了保障高并发情况下的操作一定可以成功
for (;;) {
// 获取 volatile 资源
int available = getState();
// 执行资源减一
int remaining = available - acquires;
// 如果资源小于 0 则退出,如果资源大于 0 则进行 CAS 设置资源,设置失败的话则继续循环,重复上述操作
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
/**
* 释放共享资源
*
* @param releases
* @return
*/
protected final boolean tryReleaseShared(int releases) {
// 无限循环,是为了保障高并发情况下的操作一定可以成功
for (;;) {
// 获取资源
int current = getState();
// 加资源
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// CAS 设置资源
if (compareAndSetState(current, next))
return true;
}
}
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
/**
* 非公平
*
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
/**
* 以非公平的方式,尝试获取共享资源
*
* @param acquires
* @return
*/
protected int tryAcquireShared(int acquires) {
// 以非公平的方式来获取资源
return nonfairTryAcquireShared(acquires);
}
}
/**
* 公平
*
* Fair version
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
/**
* 以公平的方式获取共享资源
*
* @param acquires
* @return
*/
protected int tryAcquireShared(int acquires) {
// 无限循环
for (;;) {
// 判断等待队列中有前继者
if (hasQueuedPredecessors()) {
// 说明前面有排队的,则直接返回 -1
return -1;
}
// 操作资源
int available = getState();
// 资源减操作
int remaining = available - acquires;
// 结果资源小于 0,返回
// 结果资源大于等于 0,进行 CAS 设置,操作成功则返回结果资源;CAS 失败则继续重新执行上述操作
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
public Semaphore(int permits) {
// 默认创建非公平的同步器
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
// 创建公平或者非公平的同步器
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
public void acquire() throws InterruptedException {
// 获取共享模式资源
sync.acquireSharedInterruptibly(1);
}
public void acquireUninterruptibly() {
// 以不可中断的方式去获取资源
sync.acquireShared(1);
}
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void release() {
// 释放资源
sync.releaseShared(1);
}
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
public int availablePermits() {
return sync.getPermits();
}
public int drainPermits() {
return sync.drainPermits();
}
protected void reducePermits(int reduction) {
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
}
public boolean isFair() {
return sync instanceof FairSync;
}
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public final int getQueueLength() {
return sync.getQueueLength();
}
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
public String toString() {
return super.toString() + "[Permits = " + sync.getPermits() + "]";
}
}

Semaphore 类默认的单个参数的构造器创建的是非公平同步器,非公平同步器会在多线程并发获取锁时可以减小线程上下文切换。

4.3 Semphore 获取同步资源

我们先分析下 Semaphore 类获取令牌的方法调用链:

  1. 调用 Semaphore#acquire() 方法;
  2. 调用 AbstractQueuedSynchronizer#acquireSharedInterruptibly(1) 方法;

查看 java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly 方法:

 /**
* 在共享模式上获取资源,如果被中断则放弃。
*
* Acquires in shared mode, aborting if interrupted. Implemented
* by first checking interrupt status, then invoking at least once
* {@link #tryAcquireShared}, returning on success. Otherwise the
* thread is queued, possibly repeatedly blocking and unblocking,
* invoking {@link #tryAcquireShared} until success or the thread
* is interrupted.
* @param arg the acquire argument.
* This value is conveyed to {@link #tryAcquireShared} but is
* otherwise uninterpreted and can represent anything
* you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 当前线程已经被中断,则抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取共享资源,如果返回负数,则执行下面的以阻塞方式获取资源
if (tryAcquireShared(arg) < 0) {
// 执行以共享模式可中断方式获取资源
doAcquireSharedInterruptibly(arg);
}
}

这个 acquireSharedInterruptibly 方法上面讲到的就是 AQS 提供的模板方法,它是以共享模式来获取同步状态的方法,会先调用 tryAcquireShared() 方法尝试获取资源,在 Semaphore 类中的 Sync 子类 NonfairSync、FairSync 同步器实现这个方法。

我们主要分析下非公平的同步器的实现。首先 NonfairSync 是非公平同步器,它重写的了 AQS 的 tryAcquireShared 方法,里边调用了父类 Sync 的 nonfairTryAcquireShared() 方法,我们再来看下它的实现,java.util.concurrent.Semaphore.FairSync#tryAcquireShared:

 /**
* 以公平的方式获取共享资源
*
* @param acquires
* @return
*/
protected int tryAcquireShared(int acquires) {
// 无限循环
for (;;) {
// 判断等待队列中有前继者
if (hasQueuedPredecessors()) {
// 说明前面有排队的,则直接返回 -1
return -1;
}
// 操作资源
int available = getState();
// 资源减操作
int remaining = available - acquires;
// 结果资源小于 0,返回
// 结果资源大于等于 0,进行 CAS 设置,操作成功则返回结果资源;CAS 失败则继续重新执行上述操作
if (remaining < 0 ||
compareAndSetState(available, remaining)) {
return remaining;
}
}
}

这里边使用了无限循环操作,里边先获取了同步状态,然后对同步状态进行了减法操作,它的方法参数 acquires 值在我们的例子中是 1,随后判断新的资源状态值,如果小于 0 则说明没有资源可以获取,那就直接返回;如果大于等于 0,那么就执行 CAS 操作来更新同步状态值,这里由于是多线程高并发的访问,所以可能会操作失败,那么就重新执行上述操作,直到操作成功或者同步状态值小于 0。

接着该方法执行完毕后,接着执行 AQS 的 doAcquireSharedInterruptibly 方法,看下它的实现。java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireSharedInterruptibly:

/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 添加一个等待者节点,把节点放入队列尾部
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// 无限循环
for (;;) {
// 获取节点的前驱节点
final Node p = node.predecessor();
if (p == head) {
// 如果前驱节点等于头节点(头节点是空节点)(说明是队列是刚刚初始化,并且只有它一个节点),那么就再来一次尝试获取共享资源的操作
int r = tryAcquireShared(arg);
if (r >= 0) {
// 如果获取到了资源,那么就把当前节点设置为头节点和传播模式
setHeadAndPropagate(node, r);
// 设置节点下个节点为 null,帮助垃圾回收这个节点
p.next = null; // help GC
// 设置失败表示为 false
failed = false;
return;
}
}
// 在获取资源失败是否阻塞(设置节点状态),并且执行阻塞和检查中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) {
// 抛出异常
throw new InterruptedException();
}
}
} finally {
if (failed) {
// 出现了异常,则执行取消获取资源
cancelAcquire(node);
}
}
}
/**
* 在获取资源失败后,检查并且更新节点状态
*
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前驱节点的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
// 信号状态,该节点已经设置了状态,要求释放以发出信号,以便可以安全地阻塞
{
return true;
}
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
// 前驱节点状态是取消状态,则进行删除这些节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 让前驱结点的后继节点指向 node 节点
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 否则节点状态就是 0 或者 PROPAGATE(-3) 状态,设置前继节点状态为 SIGNAL(-1) 状态,即等待状态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* 对当前线程和给定的模式(独占、共享模式),来创建一个入队节点 node。
*
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
// 创建一个 CLH 等待队列
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 获取尾节点变量
Node pred = tail;
if (pred != null) {
// 尾部节点不为 null,则把节点通过 CAS 操作放入尾部
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 尾部节点为空,或者 CAS 操作失败时,执行下面操作
enq(node);
return node;
}
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
// 阻塞线程
LockSupport.park(this);
// 检查线程是否中断
return Thread.interrupted();
}

这里的操作是:

  1. 先通过调用 addWaiter(Node.SHARED) 方法,创建一个 CLH 的同步队列的节点,并且把节点放到同步队列的尾部并返回节点;
  2. 然后获取节点的前驱节点,判断前驱节点是否为头节点。如果前驱节点是头节点,说明前面没有排队的节点,当前的这个节点可以获取资源了,然后再调用 tryAcquireShared() 方法进行尝试获取同步状态资源;如果资源获取成功即方法返回值大于等于 0,那么就将当前节点设置为头节点,然后方法结束;
  3. 如果前驱节点不是头节点,说明它的前面还有等待的节点获取资源。则调用 shouldParkAfterFailedAcquire() 方法,进行节点等待状态的更新(更新为 Node.SIGNAL)以及清除已取消状态的节点,并且返回是否应该被阻塞(即等待状态是否为 Node.SIGNAL);
  4. 如果需要被阻塞,接着执行 parkAndCheckInterrupt() 方法,方法内部调用 LockSupport.park(this) 对当前线程进行阻塞;
  5. 阻塞成功的话,那么这个线程就会一直等待其他线程释放资源或者被其他线程中断来唤醒它。

OK,上面的一套操作就是一个线程获取 Semphore 的令牌(即 AQS 的同步状态)的过程。它会先实现尝试资源然后,如果资源获取成功则返回,如果获取失败则把当前线程构建成一个 CLH 同步队列节点,放入队列的尾部,然后执行线程阻塞,等待其他线程唤醒它或者中断它。

4.4 Semphore 释放同步资源

Semphore 的释放资源是调用 release() 完成的。我们看它下的调用过程。

 public void release() {
// 释放资源
sync.releaseShared(1);
}

通过调用 AQS 同步器的 releaseShared(int arg) 方法:

 /**
* 共享模式释放资源。
*
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
// 先尝试释放资源
if (tryReleaseShared(arg)) {
// 执行释放
doReleaseShared();
return true;
}
return false;
}

看到了它是先调用了 tryReleaseShared(arg) 方法和 doReleaseShared() 方法。

Semphore 类的同步器中重写了 tryReleaseShared(arg) 方法,java.util.concurrent.Semaphore.Sync#tryReleaseShared:

/**
* 释放共享资源
*
* @param releases
* @return
*/
protected final boolean tryReleaseShared(int releases) {
// 无限循环,是为了保障高并发情况下的操作一定可以成功
for (;;) {
// 获取资源
int current = getState();
// 加资源
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// CAS 设置资源
if (compareAndSetState(current, next))
return true;
}
}

这里的操作是获取同步状态,然后让状态值增加,通过 CAS 操作设置同步状态,直到状态设置成功为止。

看下 java.util.concurrent.locks.AbstractQueuedSynchronizer#doReleaseShared:

 /**
* 释放共享模式行为,设置后继者的状态为信号状态,并且确保传播。
*
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
// 无限循环
for (;;) {
// 获取头节点
Node h = head;
// 头节点不为空,并且头节点不等于尾节点
if (h != null && h != tail) {
// 头节点的状态
int ws = h.waitStatus;
// 如果是 SIGNAL 状态
if (ws == Node.SIGNAL) {
// CAS 操作设置节点状态为 0,表示不再需要阻塞了
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
continue; // loop to recheck cases
}
// 唤醒后继节点,让后继节点可以开始争抢资源操作
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
// 节点状态是 0,并且通过 CAS 方式设置节点状态为 PROPAGATE(-3) 传播方式
continue; // loop on failed CAS
}
}
// 头节点没有发生变化,则退出循环
if (h == head) // loop if head changed
{
break;
}
}
}
/**
* 唤醒后继节点
*
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
// 节点状态
int ws = node.waitStatus;
if (ws < 0) {
// 如果节点状态是小于 0,则设置状态为 0
compareAndSetWaitStatus(node, ws, 0);
}
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 获取下一个节点,后继节点
Node s = node.next;
// 下一个节点为空,或者下一个节点的状态大于 0,即 CANCELLED(1) 状态
if (s == null || s.waitStatus > 0) {
s = null;
// 从队列尾部开始往前遍历,直到遍历到当前这个节点为止,寻找节点状态不是取消状态的节点
for (Node t = tail; t != null && t != node; t = t.prev) {
// 寻找节点状态不是 CANCELLED(1) 状态的节点
if (t.waitStatus <= 0) {
// 这里是一直找,就算找到了也还会继续往前找到,直到前驱节点为空或者前驱节点等于当前节点
s = t;
}
}
}
// 找到了符合条件的后继节点
if (s != null) {
// 执行唤醒这个后继节点的线程
LockSupport.unpark(s.thread);
}
}

这个操作主要是设置同步队列头节点的节点状态为 0,然后调用 unparkSuccessor() 方法唤醒等待的节点线程。唤醒线程的操作是调用 LockSupport.unpark(Thread) 方法完成的。

当前节点释放了资源后,被换唤醒的线程将接着在 parkAndCheckInterrupt() 方法中返回,接着又开始新的一轮的获取同步资源的操作。

4.5 Semphore 小结

现在我们通过 Semphore 类的获取和释放资源来了解到了 AQS 在共享模式下获取和释放同步状态的的过程,包括线程获取同步资源失败后,进入AQS 同步队列排队阻塞、被唤醒的过程。

那么我们接着来看下 AQS 独占模式下的获取和释放同步状态的过程。

5. ReentrantLock

ReentrantLock 是一个独占式的锁,它的类图结构如下:

ReentrantLock

可以看到它的内部也有一个静态的 AQS 实现类 Sync,同时也有公平和非公平的同步器。

5.1 ReentrantLock 例子

它的使用例子如下:

public static void main(String[] args) {
// 默认创建非公平锁
ReentrantLock lock = new ReentrantLock();
new Thread(() -> {
doLock(lock);
}, "t0").start();
doLock(lock);
}
private static void doLock(ReentrantLock2 lock) {
try {
lock.lock();
System.out
.println(Thread.currentThread().getName() + "线程拿到了锁");
// 睡眠 1 秒
Thread.sleep(1000);
System.out
.println(Thread.currentThread().getName() + "线程释放了锁");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println();
lock.unlock();
}
}

它的输出结果如下:

main线程拿到了锁
main线程释放了锁
t0线程拿到了锁
t0线程释放了锁

这个例子中创建了一个 ReentrantLock 实例,默认它使用的非公平锁,启动了一个线程进行加锁和解锁操作,然后主线程也进行加锁和解锁,从输出的结果中可以看到,当一个线程获取了锁时,其他线程获取锁时会阻塞,直到持有锁的线程释放锁。

它保证了锁的独占性。

5.2 ReentrantLock 获取锁

ReentrantLock 的默认构造方法如下:

/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}

它默认会创建一个非公平的同步器。

ReentrantLock 的加锁的操作,它的代码为 java.util.concurrent.locks.ReentrantLock#lock:

public void lock() {
sync.lock();
}

它会调用同步器的 lock() 方法,我们以非公平同步器为准来看它的实现,java.util.concurrent.locks.ReentrantLock.NonfairSync#lock:

/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
// 先尝试获取资源,通过 CAS 把资源由 0 设置为 1
if (compareAndSetState(0, 1)) {
// 如果获取成功,则把当前线程设置到独占线程上
setExclusiveOwnerThread(Thread.currentThread());
} else {
// 获取资源
acquire(1);
}
}

这里的逻辑是:

  1. 先尝试通过 CAS 操作把同步状态由 0 设置为 1,如果设置成功,则执行 setExclusiveOwnerThread() 方法,设置当前线程为独占线程;
  2. 如果没有设置成功,说明有别的线程已经更改了同步状态,那就执行 acquire(int arg) 方法。

``setExclusiveOwnerThread(thread)` 方法是 java.util.concurrent.locks.AbstractOwnableSynchronizer#setExclusiveOwnerThread 类的方法:

/**
* The current owner of exclusive mode synchronization.
*/
private transient Thread exclusiveOwnerThread;
/**
* Sets the thread that currently owns exclusive access.
* A {@code null} argument indicates that no thread owns access.
* This method does not otherwise impose any synchronization or
* {@code volatile} field accesses.
* @param thread the owner thread
*/
protected final void setExclusiveOwnerThread(Thread thread) {
// 设置独占线程
exclusiveOwnerThread = thread;
}

acquire(int arg) 方法是 java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire 的方法:

/**
* 在独占模式下获取资源,忽略中断
*
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
/*
获取资源
1. 尝试获取资源 state
2. 添加等待节点 Node,创建的是等待队列
3. 从队列中进行获取资源,无限循环
4. 如果线程被中断则设置中断状态
*/
if (!tryAcquire(arg)) {
// 以独占方式来创建当前线程的节点,并且添加到队列尾部
Node node = addWaiter(Node.EXCLUSIVE);
// 获取资源
if (acquireQueued(node, arg)) {
// 如果获取资源失败,则中断线程
selfInterrupt();
}
}
}

逻辑和 acquireShared(int arg) 方法类似。

  1. 先执行 tryAcquire(arg) 方法尝试获取资源;
  2. 如果资源获取成功则返回,如果获取失败,那么就执行 addWaiter(Node.EXCLUSIVE) 方法,创建一个同步队列的节点,并放入到队列尾部,不过它和共享模式的同步队列节点不同,它是独占式的节点;
  3. 接着执行 acquireQueued(node, arg) 方法,获取资源。

其中 tryAcquire(arg) 方法,它在 ReentrantLock 的非同步器 NonfairSync 中的被重写了,java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire:

 protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
/**
* 以非公平的方式获取资源
*
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
// 当前线程
final Thread current = Thread.currentThread();
// 资源
int c = getState();
// 资源为 0,说明没有人获得资源
if (c == 0) {
// CAS 设置资源状态
if (compareAndSetState(0, acquires)) {
// 资源设置成功则,设置独占线程
setExclusiveOwnerThread(current);
return true;
}
}
// 资源被其获取
else if (current == getExclusiveOwnerThread()) {
// 如果是当前线程获取的资源,则让资源加一
int nextc = c + acquires;
if (nextc < 0) // overflow
{
throw new Error("Maximum lock count exceeded");
}
// 设置资源状态
setState(nextc);
return true;
}
// 其他情况就直接返回 false,表示资源获取失败
return false;
}

它的内部调用了 nonfairTryAcquire(int acquires) 方法,逻辑是:

  1. 获取同步状态,如果状态为 0,则进行 CAS 操作设置状态的值,如果设置成功,再执行 setExclusiveOwnerThread(current) 方法设置独占线程,方法结束返回 true;
  2. 如果同步状态不为 0,或者 CAS 操作失败,那就判断当前线程是否独占线程。如果线程是独占线程,说明该线程已经拿到了状态,则把状态值增加(重入的意思就是在这里体现的,),然后设置新的状态值(这里不需要进行 CAS 操作,因为它拿到了状态,不用担心被修改可直接修改,state 是 volatile 修饰的,一个它的修改可以立即被其他线程感知到),方法结束返回 true;
  3. 如果上述条件都不符合,则直接返回 false,方法结束。

再来看下 acquireQueued(Node node, int arg) 方法,看下它的实现 java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued:

/**
* 以独占的模式来获取资源,入队操作
*
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 无限循环获取
for (;;) {
// 获取前一个节点
final Node p = node.predecessor();
// 尝试获取资源
if (p == head && tryAcquire(arg)) {
// 设置头部节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 获取失败,设置节点状态
// 阻塞线程以及检查中断,阻塞之后,需要被前驱节点释放或者被中断,才能继续执行上述操作
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果失败则进行取消获取资源
if (failed)
cancelAcquire(node);
}
}

同样,这里方法和上面讲 Semphore 时看到的 AQS 的 doAcquireSharedInterruptibly(int arg) 方法逻辑很像,逻辑为:

  1. 获取节点的前驱节点,如果前驱节点等于头节点,则说明前面没有排队的节点,那就执行 tryAcquire(arg) 方法获取资源;
  2. tryAcquire(arg) 方法获取资源成功后,把节点设置为同步队列的头节点,然后方法结束,返回 true;
  3. 如果节点的前驱节点不是头节点,或者尝试获取资源失败,则执行 shouldParkAfterFailedAcquire(p, node) 方法更新节点的等待状态以及判断是否要被阻塞;
  4. 如果需要被阻塞,则执行 parkAndCheckInterrupt() 阻塞该线程。该线程就需要等待其他线程唤醒或者中断该线程。

这样一个 ReentrantLock 获取锁的流程就就结束了,回顾下,一个线程如果要获取锁资源,那就先尝试获取,如果获取成功则设置独占线程;如果获取失败,那就构建一个该线程的独占模式的同步队列节点,把节点加入到同步队列的尾部,然后尝试获取资源或者阻塞,直到被其他线程唤醒或者中断,接着重复上述的获取资源或者阻塞的操作。

5.3 ReentrantLock 释放锁

ReentrantLock 释放锁的操作是 unlock() 方法,java.util.concurrent.locks.ReentrantLock#unlock 实现如下:

public void unlock() {
sync.release(1);
}

它调用了同步器的 release(int arg) 方法,java.util.concurrent.locks.AbstractQueuedSynchronizer#release 实现如下:

/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
// 尝试释放资源
if (tryRelease(arg)) {
// 释放成功,获取 CLH 等待队列中的头节点
Node h = head;
// 判断它不为空,并且它的等待状态不是 0
if (h != null && h.waitStatus != 0) {
// 唤醒后继节点
unparkSuccessor(h);
}
return true;
}
return false;
}

这个释放资源的逻辑同样也和 releaseShared(int arg) 类似:

  1. 先执行 tryRelease(arg) 尝试释放资源;
  2. 释放失败的话直接返回 false,方法结束。
  3. 释放成功的话,获取同步队列头节点,判断是否不为空,并且它的等待状态不为 0,那么就执行 unparkSuccessor(h) 方法唤醒后续等待着的节点,然后返回 true。

tryRelease(arg) 方法在 ReentrantLock 的 Sync 同步器的实现为 java.util.concurrent.locks.ReentrantLock.Sync#tryRelease:

protected final boolean tryRelease(int releases) {
// 释放资源状态
// 当前资源减去要释放的资源
int c = getState() - releases;
// 检查当前线程是否独占线程
if (Thread.currentThread() != getExclusiveOwnerThread()) {
throw new IllegalMonitorStateException();
}
// 是否
boolean free = false;
// 释放的资源状态为 0
if (c == 0) {
// 表示资源空闲了
free = true;
// 将独占线程设置为 null
setExclusiveOwnerThread(null);
}
// 设置资源状态
setState(c);
return free;
}

逻辑:

  1. 获取同步资源,然后减操作得到新的值;
  2. 判断当前线程是否为独占线程,如果不是则抛出异常,说明其他线程非法的操作;
  3. 判断新的状态值是否为0,如果为 0 则说明资源空闲了,清空独占线程。这里主要是针对 ReentrantLock 的多次 lock 一定要对应相同次数 unlock 方法,否则资源是不会被释放的。
  4. 最后设置新的同步资源状态值,方法结束。

5.4 ReentrantLock 小结

ReentrantLock 的获取锁和释放的锁的流程我们已经了解了,它是一个种独占式的获取资源,当一个线程获取了锁时,其他线程是不能获取的,同时它支持多次加锁,当然也需要对应相同次数的解锁操作。一个线程要获取锁资源时(默认非公平锁的情况),会先尝试获取资源,如果获取失败,则创建当前线程的同步队列独占模式节点,并加入到同步队列尾部,然后尝试获取资源或者阻塞操作,直到其他线程释放了锁资源或者中断了它,该线程才会接着执行自己的操作获取锁资源操作。

6. ConditionObject 条件对象

到目前为止,我们已经了解到了 AQS 的特点:公平/非公平、独占/共享、同队列队,那么还有一个条件队列这个特点。

这个条件队列比较特殊,它是也是一种队列,和同步队列用的相同的 Node 节点类,不过它是一个单链表。它使用了 nextWaiter 字段来表示下一个等待者节点。

它的节点是由在 AQS 里的内部类 ConditionObject 来创建的,条件队列的节点共享创建它的 AQS 实例的属性(同步资源、线程)。

条件队列的工作机制是:条件队列的节点转化为同步队列的节点,同步队列的节点获取资源操作。

ConditionObject 的类图为:

ConditionObject

它主要通过 await()signal() 相关的方法进行线程的阻塞和唤醒操作。

6.1 ConditionObject 例子


public static void main(String[] args) throws InterruptedException {
// 创建一把锁
ReentrantLock lock = new ReentrantLock();
// 创建一个锁上的条件对象
Condition c1 = lock.newCondition();
// t1 线程调用条件等待
new Thread(() -> {
// 加锁
lock.lock();
System.out.println(Thread.currentThread().getName() + ":加锁!");
try {
System.out.println(Thread.currentThread().getName() + ":太累了,我先睡会,一会叫醒我哈!");
// 让条件等待
c1.await();
System.out.println(Thread.currentThread().getName() + ":我醒来了!");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ":解锁!");
// 解锁
lock.unlock();
}
}, "t1").start();
Thread.sleep(1000);
// t2 线程调用条件唤醒
new Thread(() -> {
// 加锁
lock.lock();
System.out.println(Thread.currentThread().getName() + ":加锁!");
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + ":开饭了,赶紧起来吧!");
// 条件唤醒
c1.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ":解锁!");
// 解锁
lock.unlock();
}
}, "t2").start();
}

上面的代码创建一个重入锁,由锁创建一个条件对象,一个线程 t1 进行加锁,调用条件对象的等待操作,最后释放锁;另一个线程调用加锁,然后调用条件对象的唤醒操作,最后释放锁。

输出的结果为:

t1:加锁!
t1:太累了,我先睡会,一会叫醒我哈!
t2:加锁!
t2:开饭了,赶紧起来吧!
t2:解锁!
t1:我醒来了!
t1:解锁!

这里的意思是,t1 线程先获取锁,然后调用了条件对象的 await() 方法,把该线程进入阻塞状态,随后线程 t2 获取到锁,然后调用条件对象的 signal() 唤醒方法,t2 释放锁,t1 线程被唤醒接着执行它剩下的操作。

6.2 条件对象等待

先看下条件对象的创建过程,lock.newCondition() 方法,java.util.concurrent.locks.ReentrantLock#newCondition:

public Condition newCondition() {
return sync.newCondition();
}

它又调用了同步器的创建条件方法,java.util.concurrent.locks.ReentrantLock.Sync#newCondition:

final ConditionObject newCondition() {
return new ConditionObject();
}

接着看下它的 await() 方法,java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await():

/**
* Implements interruptible condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled or interrupted.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final void await() throws InterruptedException {
// 判断线程是否被中断
if (Thread.interrupted()) {
throw new InterruptedException();
}
// 添加一个条件等待队列节点
Node node = addConditionWaiter();
// 完全释放节点资源,返回已经保存的资源状态
int savedState = fullyRelease(node);
// 中断模式
int interruptMode = 0;
// 无限的循环,判断条件等待节点是否存在与 CLH 同步队列中
while (!isOnSyncQueue(node)) {
// 如果不存在 CLH 同步队列中,则阻塞当前线程
LockSupport.park(this);
// 当前线程被唤醒了,则执行检查中断和无线等待
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 到了这一步,说明该线程已经被唤醒了
// 进行获取独占模式资源
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
// 如果获取资源成功,并且中断模式不是 THROW_IE
// 则把中断模式设置为 REINTERRUPT
interruptMode = REINTERRUPT;
}
// 再次检查节点,清理被取消状态的条件等待节点
if (node.nextWaiter != null) // clean up if cancelled
{
// 删除已经取消的条件等待节点
unlinkCancelledWaiters();
}
// 如果中断模式不为 0,则进行报告中断
if (interruptMode != 0) {
// 报告中断线程
reportInterruptAfterWait(interruptMode);
}
}
/**
* 添加一个条件等待者到等待对了。
*
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
// 获取最后一个条件等待者节点
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
// 如果最后一个条件等待者已经被取消,则进行清除
if (t != null && t.waitStatus != Node.CONDITION) {
// 清除取消状态的等待者
unlinkCancelledWaiters();
// 重新获取最后一个等待者节点
t = lastWaiter;
}
// 添加一个条件队列,它的等待状态是 Node.CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null) {
// 给第一个等待者节点赋值
firstWaiter = node;
} else {
// 把新的节点添加到最后一个等待者节点上
t.nextWaiter = node;
}
// 为最后一个等待者节点赋值
lastWaiter = node;
return node;
}
/**
* 用当前状态值调用释放;返回保存状态。取消节点并在失败时抛出异常。
*
* Invokes release with current state value; returns saved state.
* Cancels node and throws exception on failure.
* @param node the condition node for this wait
* @return previous sync state
*/
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 获取同步器资源状态
int savedState = getState();
// 释放资源
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
// 如果释放资源图中失败,则把节点的等待状态设置为取消状态 Node.CANCELLED
if (failed) {
node.waitStatus = Node.CANCELLED;
}
}
}

这里的主要逻辑是:

  1. 先检查线程是否被中断,被中断的话抛出异常;
  2. 执行 addConditionWaiter() 方法,创建一个条件队列节点 new Node(Thread.currentThread(), Node.CONDITION) ,把节点添加到条件队列的尾部;
  3. 执行 fullyRelease() 方法,释放资源,即释放同步状态,同时把当前线程对应的同步队列节点删除;
  4. 执行 isOnSyncQueue(node) 判断节点是否不是同步队列节点,如果不是同步队列节点,则进行阻塞,等待被其他线程唤醒或中断。
  5. 如果该线程被唤醒了,则再执行 acquireQueued(node, saveState) 获取资源;
  6. 再次检查节点,清理被取消状态的条件等待节点;
  7. 如果有必要就处理中断信息。

这里的逻辑可以理解为,当前获取了同步状态线程,如果当前线程处在同步队列中的节点那就移除,然后创建一个条件等待队列节点,并放入条件队列尾部,随后把当前线程阻塞,当被唤醒时重试获取同步资源。

6.3 条件对象唤醒

再看下 signal() 唤醒方法,java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal:

/**
* 将等待时间最长的线程(如果存在)从该条件的等待队列移至拥有锁的等待队列。
*
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
if (!isHeldExclusively()) {
throw new IllegalMonitorStateException();
}
// 获取第一个条件等待者节点
Node first = firstWaiter;
if (first != null) {
// 执行唤醒头部的条件等待节点
doSignal(first);
}
}
private void doSignal(Node first) {
// 无限循环
do {
// 获取第一个条件等待者的下一个等待者,给第一条件等待者赋值
if ( (firstWaiter = first.nextWaiter) == null) {
// 如果为空,说明条件队列为空,将最后一个条件等待者变量也置为空
lastWaiter = null;
}
// 将条件等待者的下一个等待者属性置空
first.nextWaiter = null;
// 这里是条件等待队列的头节点
// 将条件等待节点转化为 CLH 同步队列节点并加入 CLH 同步队列中
// 如果转换节点失败,则又把 firstWaiter 的节点赋值给 first
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
// 设置节点的等待状态,由 Node.CONDITION 设置为 0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
return false;
}
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
// 把条件等待节点加入到 CLH 同步队列的尾部,返回节点的前驱节点
Node p = enq(node);
// 获取前驱节点的等待状态
int ws = p.waitStatus;
// 节点等待状态如果大于 0,即取消状态,或者通过 CAS 设置前驱节点的等待状态为 SIGNAL 操作失败,那就唤醒这个节点
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) {
// 只由前驱节点的等待状态是取消状态,或者设置前驱节点等待状态为 Node.SIGNAL 失败时,才会执行唤醒目前节点的线程
LockSupport.unpark(node.thread);
}
return true;
}

这里主要的逻辑是,获取条件队列,移除头部的节点,然后把该节点转成同步队列的节点,放入到同步队列的尾部,同时唤醒节点的线程,返回方法。

唤醒的线程就又开始执行获取同步资源。

条件对象的工作流程如下:

条件唤醒节点转换流程

在 JDK 中使用条件对象功能的有很多,比如 ArrayBlockingQueue、DelayQueue、LinkedBlockingDeque 阻塞队列等等。

7. 小结

当然 JDK 的 JUC 包中还有很多其他的同步工具类都是使用了 AQS 来完成同步功能,它们的核心原理都是一样,都是以操作同步状态资源,通过同步队列以及条件等待队列排队的方式来让获取资源的线程排队。

只有了解了 AQS 的工作原理才能更加深入了解并发相关的组件。

8. 参考资料

  • java 并发编程的艺术
  • JDK 1.8 源码

以上代码都已经上传到我的 GitHub 上了。

版权声明
本文为[kaisesai]所创,转载请带上原文链接,感谢
https://my.oschina.net/kaisesai/blog/4708368

  1. 【计算机网络 12(1),尚学堂马士兵Java视频教程
  2. 【程序猿历程,史上最全的Java面试题集锦在这里
  3. 【程序猿历程(1),Javaweb视频教程百度云
  4. Notes on MySQL 45 lectures (1-7)
  5. [computer network 12 (1), Shang Xuetang Ma soldier java video tutorial
  6. The most complete collection of Java interview questions in history is here
  7. [process of program ape (1), JavaWeb video tutorial, baidu cloud
  8. Notes on MySQL 45 lectures (1-7)
  9. 精进 Spring Boot 03:Spring Boot 的配置文件和配置管理,以及用三种方式读取配置文件
  10. Refined spring boot 03: spring boot configuration files and configuration management, and reading configuration files in three ways
  11. 精进 Spring Boot 03:Spring Boot 的配置文件和配置管理,以及用三种方式读取配置文件
  12. Refined spring boot 03: spring boot configuration files and configuration management, and reading configuration files in three ways
  13. 【递归,Java传智播客笔记
  14. [recursion, Java intelligence podcast notes
  15. [adhere to painting for 386 days] the beginning of spring of 24 solar terms
  16. K8S系列第八篇(Service、EndPoints以及高可用kubeadm部署)
  17. K8s Series Part 8 (service, endpoints and high availability kubeadm deployment)
  18. 【重识 HTML (3),350道Java面试真题分享
  19. 【重识 HTML (2),Java并发编程必会的多线程你竟然还不会
  20. 【重识 HTML (1),二本Java小菜鸟4面字节跳动被秒成渣渣
  21. [re recognize HTML (3) and share 350 real Java interview questions
  22. [re recognize HTML (2). Multithreading is a must for Java Concurrent Programming. How dare you not
  23. [re recognize HTML (1), two Java rookies' 4-sided bytes beat and become slag in seconds
  24. 造轮子系列之RPC 1:如何从零开始开发RPC框架
  25. RPC 1: how to develop RPC framework from scratch
  26. 造轮子系列之RPC 1:如何从零开始开发RPC框架
  27. RPC 1: how to develop RPC framework from scratch
  28. 一次性捋清楚吧,对乱糟糟的,Spring事务扩展机制
  29. 一文彻底弄懂如何选择抽象类还是接口,连续四年百度Java岗必问面试题
  30. Redis常用命令
  31. 一双拖鞋引发的血案,狂神说Java系列笔记
  32. 一、mysql基础安装
  33. 一位程序员的独白:尽管我一生坎坷,Java框架面试基础
  34. Clear it all at once. For the messy, spring transaction extension mechanism
  35. A thorough understanding of how to choose abstract classes or interfaces, baidu Java post must ask interview questions for four consecutive years
  36. Redis common commands
  37. A pair of slippers triggered the murder, crazy God said java series notes
  38. 1、 MySQL basic installation
  39. Monologue of a programmer: despite my ups and downs in my life, Java framework is the foundation of interview
  40. 【大厂面试】三面三问Spring循环依赖,请一定要把这篇看完(建议收藏)
  41. 一线互联网企业中,springboot入门项目
  42. 一篇文带你入门SSM框架Spring开发,帮你快速拿Offer
  43. 【面试资料】Java全集、微服务、大数据、数据结构与算法、机器学习知识最全总结,283页pdf
  44. 【leetcode刷题】24.数组中重复的数字——Java版
  45. 【leetcode刷题】23.对称二叉树——Java版
  46. 【leetcode刷题】22.二叉树的中序遍历——Java版
  47. 【leetcode刷题】21.三数之和——Java版
  48. 【leetcode刷题】20.最长回文子串——Java版
  49. 【leetcode刷题】19.回文链表——Java版
  50. 【leetcode刷题】18.反转链表——Java版
  51. 【leetcode刷题】17.相交链表——Java&python版
  52. 【leetcode刷题】16.环形链表——Java版
  53. 【leetcode刷题】15.汉明距离——Java版
  54. 【leetcode刷题】14.找到所有数组中消失的数字——Java版
  55. 【leetcode刷题】13.比特位计数——Java版
  56. oracle控制用户权限命令
  57. 三年Java开发,继阿里,鲁班二期Java架构师
  58. Oracle必须要启动的服务
  59. 万字长文!深入剖析HashMap,Java基础笔试题大全带答案
  60. 一问Kafka就心慌?我却凭着这份,图灵学院vip课程百度云