Analysis of AQS source code in JDK

goats 2020-11-08 23:53:14


0. Contents

1. Introduction to AQS
2. AQS basics
3. How to use AQS
4. Semaphore
4.1 Semaphore Example
4.2 Semaphore Class structure
4.3 Semaphore Acquiring synchronization resources
4.4 Semaphore Releasing synchronization resources
4.5 Semaphore Summary
5. ReentrantLock
5.1 ReentrantLock Example
5.2 ReentrantLock Get the lock
5.3 ReentrantLock Release the lock
5.4 ReentrantLock Summary
6. ConditionObject (condition object)
6.1 ConditionObject Example
6.2 Condition object wait
6.3 Condition object wake up
7. Summary
8. Reference material

Today let's study AQS.

1. Introduction to AQS

What is AQS? AQS is the class java.util.concurrent.locks.AbstractQueuedSynchronizer. So what is AbstractQueuedSynchronizer?

It is an abstract queued synchronizer: a base framework for concurrent programming in Java, used to build locks and other synchronization components so that data shared between threads is accessed safely.

In the JDK's java.util.concurrent package (juc for short), many concurrency classes are built on AQS. Familiar examples include ReentrantLock, ReentrantReadWriteLock, Semaphore, and CountDownLatch.

2. AQS basics

Before explaining how AQS works, we need to understand a few concepts:

  • Synchronization state: an int variable declared volatile, which AQS uses to represent the resource;
  • CAS: compareAndSet (compare and swap), an atomic operation that AQS mainly uses to update the synchronization state. In JDK 1.8 it goes through the Unsafe class, which relies on low-level native support;
  • Thread blocking and wake-up: a thread that cannot get the resource has to queue, and while queued it must be blocked and later woken up. AQS blocks and wakes threads with the park() and unpark() methods of the LockSupport class, which in turn call park and unpark on Unsafe;
  • Synchronization queue: a thread that fails to get the resource enters a queue and waits. This queue is the synchronization queue, known as the CLH synchronization queue. CLH stands for the initials of its three inventors (Craig, Landin, and Hagersten). AQS implements it as a doubly linked list whose nodes hold a wait status, a thread, a predecessor node, and a successor node;
  • Condition queue: AQS also has a second kind of queue, the condition queue. It shares the node class with the synchronization queue, but is used when a thread must wait for some condition to hold. It is a singly linked list whose nodes hold a wait status, a thread, and a reference to the next waiting node;
  • Exclusive mode and shared mode: when acquiring the resource, a thread declares whether it acquires in exclusive or shared mode. Exclusive means only one thread may hold the resource at a time; shared means several threads may hold it at the same time. Both modes appear later in the synchronization queue, where each waiting node records its mode;
  • Fair and unfair: in unfair mode, a thread that needs the resource first tries to grab it immediately, whether or not other threads are already waiting in the queue, and only queues if that attempt fails. In fair mode, if threads are already queued in the synchronization queue, the arriving thread joins the queue behind them.
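To make the blocking and wake-up concept concrete, here is a minimal sketch of LockSupport.park() and unpark() used directly, outside AQS. The class name ParkUnparkSketch and the method parkThenUnpark are made up for this illustration. It is not how AQS itself is written, only the primitive it builds on.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class: parks a helper thread and wakes it from the caller.
class ParkUnparkSketch {

    // Returns true if the helper thread got past park() after being unparked.
    static boolean parkThenUnpark() {
        AtomicBoolean permitted = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            // park() may return spuriously, so the condition is re-checked in a loop
            while (!permitted.get()) {
                LockSupport.park();
            }
            resumed.set(true);
        }, "waiter");
        waiter.start();

        // Publish the condition first, then wake the waiter. If unpark() runs
        // before the waiter parks, the permit is remembered and park() returns at once.
        permitted.set(true);
        LockSupport.unpark(waiter);

        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return resumed.get();
    }

    public static void main(String[] args) {
        System.out.println("waiter resumed: " + parkThenUnpark());
    }
}
```

The loop around park() mirrors what AQS does in its acquire loops: a woken thread never assumes it owns the resource, it always checks again.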

The structure of the synchronization queue is shown in the figure below:

[Figure: synchronization queue]

The structure of the condition queue is shown in the figure below:

[Figure: condition queue]

With these concepts in hand, let's see how AQS works.

AQS uses an int member variable, state, to represent the synchronization state, and a built-in FIFO queue to line up the threads that are waiting for the resource. It implements the common logic for acquiring the resource, queuing threads, and releasing the resource, and leaves the decision of whether an acquire or release succeeds to overridable methods. Subclasses implement those methods by reading and updating the synchronization state.

3. How to use AQS

AQS is designed around the template method pattern. The user extends AQS and overrides the designated methods, then composes that subclass into a custom synchronization component and calls the template methods the synchronizer provides. Those template methods call back into the methods the user overrode.

The main way to use AQS is therefore subclassing: the subclass overrides these methods to manage the synchronization state.

The overridable methods fall into two groups according to the working mode:

  • Exclusive mode

    • tryAcquire(int arg): acquires the synchronization state exclusively. An implementation reads the current state, checks whether it allows acquisition, and then sets the new state with CAS.
    • tryRelease(int arg): releases the synchronization state exclusively, giving threads that are waiting for it a chance to acquire it.
    • isHeldExclusively(): whether the synchronizer is held in exclusive mode. It usually reports whether the current thread holds it exclusively.
  • Shared mode

    • tryAcquireShared(int arg): acquires the synchronization state in shared mode. A return value greater than or equal to 0 means success; a negative value means failure.
    • tryReleaseShared(int arg): releases the synchronization state in shared mode.

When a subclass overrides these methods, it uses the methods AQS provides to read and modify the synchronization state:

  • getState(): returns the current synchronization state;
  • setState(int newState): sets the current synchronization state;
  • compareAndSetState(int expect, int update): sets the state with CAS, which guarantees that the update is atomic.

When implementing these methods, a subclass can offer both fair and unfair acquisition.
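As a rough illustration of the exclusive-mode methods and the three state accessors, here is a minimal non-reentrant mutex built on AQS. It follows the general shape of the example in the AbstractQueuedSynchronizer Javadoc, but the class name SimpleMutex and its method names are chosen for this sketch, and state 0 meaning "unlocked" and 1 meaning "locked" is an assumption of the sketch.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a non-reentrant mutex (state 0 = free, 1 = held).
class SimpleMutex {

    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            // Succeeds only if the state atomically moves from 0 to 1
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            // Only the holder gets here, so a plain write is enough
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1
                && getExclusiveOwnerThread() == Thread.currentThread();
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }           // template method: queues on failure
    boolean tryLock()  { return sync.tryAcquire(1); } // single attempt, never blocks
    void unlock()      { sync.release(1); }           // template method: wakes a successor
    boolean isLocked() { return sync.isLocked(); }

    public static void main(String[] args) {
        SimpleMutex mutex = new SimpleMutex();
        mutex.lock();
        System.out.println("locked: " + mutex.isLocked());
        mutex.unlock();
        System.out.println("locked: " + mutex.isLocked());
    }
}
```

The subclass only decides whether the state may change; all queuing, parking, and waking is done by acquire(int) and release(int) in AQS.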

The template methods AQS provides are likewise grouped by working mode:

  • Exclusive mode
    • void acquire(int arg): acquires the synchronization state exclusively. If the current thread succeeds, the method returns; otherwise the thread enters the synchronization queue and waits. This method calls the overridden tryAcquire(int arg).
    • void acquireInterruptibly(int arg): same as acquire(int arg), but responsive to interruption. If the current thread is interrupted while waiting in the synchronization queue, the method throws InterruptedException.
    • boolean tryAcquireNanos(int arg, long nanosTimeout): adds a timeout on top of acquireInterruptibly(int arg). It returns true if the state is acquired within the timeout and false otherwise.
    • boolean release(int arg): releases the synchronization state exclusively and then wakes the thread held by the first waiting node in the synchronization queue.
  • Shared mode
    • void acquireShared(int arg): acquires the synchronization state in shared mode. If the current thread fails, it enters the synchronization queue and waits. The main difference from exclusive acquisition is that several threads can hold the synchronization state at the same time.
    • void acquireSharedInterruptibly(int arg): same as acquireShared(int arg), but responsive to interruption.
    • boolean tryAcquireSharedNanos(int arg, long nanosTimeout): adds a timeout on top of acquireSharedInterruptibly(int arg).
    • boolean releaseShared(int arg): releases the synchronization state in shared mode.
  • Queue inspection
    • Collection<Thread> getQueuedThreads(): returns the collection of threads waiting in the synchronization queue.
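The shared-mode template methods can be tried out with a very small synchronizer. The sketch below is a one-shot latch in the spirit of the BooleanLatch example from the AQS Javadoc: every waiting thread passes once the latch is opened. The class name OneShotLatch, its method names, and the convention that state 0 means "closed" are assumptions of this sketch.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: state 0 = closed, any other value = open.
class OneShotLatch {

    private static final class Sync extends AbstractQueuedSynchronizer {
        boolean isOpen() {
            return getState() != 0;
        }

        @Override
        protected int tryAcquireShared(int ignored) {
            // >= 0 means success, negative means "queue and wait"
            return isOpen() ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);
            return true; // true tells AQS to wake the waiting threads
        }
    }

    private final Sync sync = new Sync();

    boolean isOpen() { return sync.isOpen(); }
    void open()      { sync.releaseShared(1); }
    void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }

    // Starts the given number of waiters, opens the latch, and counts how many passed.
    static int releaseWaiters(int waiters) {
        OneShotLatch latch = new OneShotLatch();
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    latch.await();
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        latch.open();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println("threads that passed: " + releaseWaiters(3));
    }
}
```

Because tryAcquireShared keeps succeeding once the latch is open, one releaseShared call is enough to let every waiter through, which is the behaviour that distinguishes shared mode from exclusive mode.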

4. Semaphore

Let's use the Semaphore class to see how AQS works.

Semaphore is a synchronization utility. It is created with a fixed number of permits (tokens); threads acquire and release these permits, and when none are left a thread blocks until another thread releases one.

Semaphore can be used for resource isolation and rate limiting between threads. For example, one of the isolation and rate-limiting strategies in Spring Cloud's Hystrix component is implemented with semaphores.
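The rate-limiting idea can be sketched in a few lines. The class below rejects a task immediately when all permits are in use, instead of queuing it. ConcurrencyLimiter and its methods are hypothetical names for this illustration; this is not Hystrix's actual code, only the general pattern of semaphore-based isolation.

```java
import java.util.concurrent.Semaphore;

// Hypothetical example class: allows at most maxConcurrent tasks to run at once.
class ConcurrencyLimiter {

    private final Semaphore permits;

    ConcurrencyLimiter(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    // Runs the task if a permit is free; returns false (rejects) otherwise.
    boolean tryRun(Runnable task) {
        if (!permits.tryAcquire()) {
            return false;
        }
        try {
            task.run();
            return true;
        } finally {
            // Always hand the permit back, even if the task throws
            permits.release();
        }
    }

    // With a single permit, a call made while another call is running is rejected.
    static boolean nestedCallRejected() {
        ConcurrencyLimiter limiter = new ConcurrencyLimiter(1);
        boolean[] innerAccepted = {true};
        boolean outerAccepted = limiter.tryRun(
            () -> innerAccepted[0] = limiter.tryRun(() -> { }));
        return outerAccepted && !innerAccepted[0];
    }

    public static void main(String[] args) {
        System.out.println("nested call rejected: " + nestedCallRejected());
    }
}
```

Using tryAcquire() rather than acquire() is a design choice: a limiter that protects a downstream resource usually prefers failing fast to letting callers pile up in the queue.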

4.1 Semaphore Example

Let's first look at how the Semaphore class is used:

import java.util.concurrent.Semaphore;

// The class name SemaphoreDemo is illustrative; the original shows only main().
public class SemaphoreDemo {
    public static void main(String[] args) throws InterruptedException {
        // Create a semaphore with 1 permit
        Semaphore semaphore = new Semaphore(1);
        // Create a thread
        Thread t1 = new Thread(() -> {
            try {
                // Acquire a permit
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName() + " acquired a permit");
                // t1 sleeps for 2 seconds
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName() + " released a permit");
                // Release the permit
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "thread-1");
        // Start the thread
        t1.start();
        // The main thread sleeps for 1 second
        Thread.sleep(1000);
        // The main thread acquires a permit
        semaphore.acquire();
        System.out.println(Thread.currentThread().getName() + " acquired a permit");
        // The main thread releases the permit
        semaphore.release();
        System.out.println(Thread.currentThread().getName() + " released a permit");
    }
}

As you can see, the example creates a semaphore with 1 permit and then starts thread t1. t1 acquires the permit, sleeps for 2 seconds, and finally releases it. The main thread sleeps for 1 second after starting t1 (so that t1 gets the permit first) and then tries to acquire the permit.

The result of running the program is:

thread-1 acquired a permit
thread-1 released a permit
main acquired a permit
main released a permit

As you can see, once the permits are used up, Semaphore's acquire operation blocks any thread that needs a permit until another thread releases one.

4.2 Semaphore Class structure

Let's look at its class diagram:

[Figure: Semaphore class diagram]

Semaphore contains an internal synchronizer, Sync, that extends AQS, along with two subclasses of Sync: NonfairSync (the unfair synchronizer) and FairSync (the fair synchronizer).

public class Semaphore implements java.io.Serializable {
    private static final long serialVersionUID = -3222578661600680210L;
    private final Sync sync;

    /**
     * Extends AQS and implements tryReleaseShared; tryAcquireShared is
     * implemented by the two subclasses below.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int permits) {
            setState(permits);
        }

        /**
         * @return the number of available permits
         */
        final int getPermits() {
            return getState();
        }

        /**
         * Acquires shared resources in unfair mode.
         *
         * @param acquires the number of permits to acquire
         * @return the remaining permits; a negative value means failure
         */
        final int nonfairTryAcquireShared(int acquires) {
            // Loop so that the operation eventually completes under contention
            for (;;) {
                // Read the volatile state
                int available = getState();
                // Subtract the requested permits
                int remaining = available - acquires;
                // If the result is negative, return it (failure). Otherwise set it
                // with CAS; if the CAS fails, loop and repeat the steps above.
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        /**
         * Releases shared resources.
         *
         * @param releases the number of permits to release
         * @return true once the state has been updated
         */
        protected final boolean tryReleaseShared(int releases) {
            // Loop so that the operation eventually completes under contention
            for (;;) {
                // Read the current state
                int current = getState();
                // Add the released permits
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                // Set the new state with CAS
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }

        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }
    /**
     * Unfair version
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        /**
         * Tries to acquire shared resources in unfair mode.
         *
         * @param acquires the number of permits to acquire
         * @return the remaining permits; a negative value means failure
         */
        protected int tryAcquireShared(int acquires) {
            // Delegate to the unfair acquisition in Sync
            return nonfairTryAcquireShared(acquires);
        }
    }

    /**
     * Fair version
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        /**
         * Acquires shared resources in fair mode.
         *
         * @param acquires the number of permits to acquire
         * @return the remaining permits; a negative value means failure
         */
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                // Check whether any thread is queued ahead of the current thread
                if (hasQueuedPredecessors()) {
                    // Another thread is already waiting in front, so fail with -1
                    return -1;
                }
                // Read the current state
                int available = getState();
                // Subtract the requested permits
                int remaining = available - acquires;
                // If the result is negative, return it (failure).
                // If it is greater than or equal to 0, set it with CAS and return it
                // on success; if the CAS fails, loop and repeat the steps above.
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
    public Semaphore(int permits) {
        // An unfair synchronizer is created by default
        sync = new NonfairSync(permits);
    }

    public Semaphore(int permits, boolean fair) {
        // Create a fair or an unfair synchronizer
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

    public void acquire() throws InterruptedException {
        // Acquire the resource in shared mode, responding to interruption
        sync.acquireSharedInterruptibly(1);
    }

    public void acquireUninterruptibly() {
        // Acquire the resource in shared mode, ignoring interruption
        sync.acquireShared(1);
    }

    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }

    public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    public void release() {
        // Release one permit
        sync.releaseShared(1);
    }

    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

    public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
    }

    public boolean tryAcquire(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }

    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }

    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

    public int availablePermits() {
        return sync.getPermits();
    }

    public int drainPermits() {
        return sync.drainPermits();
    }

    protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }

    public boolean isFair() {
        return sync instanceof FairSync;
    }

    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    public final int getQueueLength() {
        return sync.getQueueLength();
    }

    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }

    public String toString() {
        return super.toString() + "[Permits = " + sync.getPermits() + "]";
    }
}

Semaphore's single-argument constructor creates an unfair synchronizer by default. When many threads compete for permits concurrently, the unfair synchronizer can reduce thread context switches.

4.3 Semaphore Acquiring synchronization resources

Let's first trace the call chain when Semaphore acquires a permit:

  1. Semaphore#acquire() is called;
  2. which calls AbstractQueuedSynchronizer#acquireSharedInterruptibly(1).

Here is java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly:

/**
 * Acquires the resource in shared mode, giving up if interrupted.
 *
 * Acquires in shared mode, aborting if interrupted. Implemented
 * by first checking interrupt status, then invoking at least once
 * {@link #tryAcquireShared}, returning on success. Otherwise the
 * thread is queued, possibly repeatedly blocking and unblocking,
 * invoking {@link #tryAcquireShared} until success or the thread
 * is interrupted.
 * @param arg the acquire argument.
 * This value is conveyed to {@link #tryAcquireShared} but is
 * otherwise uninterpreted and can represent anything
 * you like.
 * @throws InterruptedException if the current thread is interrupted
 */
public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // The current thread has already been interrupted: throw
    if (Thread.interrupted())
        throw new InterruptedException();
    // Try to acquire the shared resource; a negative result means failure,
    // so fall through to the blocking acquisition below
    if (tryAcquireShared(arg) < 0) {
        // Acquire in shared mode, blocking but responsive to interruption
        doAcquireSharedInterruptibly(arg);
    }
}

acquireSharedInterruptibly is one of the AQS template methods mentioned above. It acquires the synchronization state in shared mode, first calling tryAcquireShared() to try to get the resource. In Semaphore, the Sync subclasses NonfairSync and FairSync implement tryAcquireShared().

We mainly follow the unfair synchronizer. NonfairSync overrides AQS's tryAcquireShared and simply delegates to nonfairTryAcquireShared() in its parent class Sync, shown earlier in the class listing. The fair variant, java.util.concurrent.Semaphore.FairSync#tryAcquireShared, runs the same loop with one extra hasQueuedPredecessors() check at the top:

/**
 * Acquires shared resources in fair mode.
 *
 * @param acquires the number of permits to acquire
 * @return the remaining permits; a negative value means failure
 */
protected int tryAcquireShared(int acquires) {
    for (;;) {
        // Check whether any thread is queued ahead of the current thread
        if (hasQueuedPredecessors()) {
            // Another thread is already waiting in front, so fail with -1
            return -1;
        }
        // Read the current state
        int available = getState();
        // Subtract the requested permits
        int remaining = available - acquires;
        // If the result is negative, return it (failure).
        // If it is greater than or equal to 0, set it with CAS and return it
        // on success; if the CAS fails, loop and repeat the steps above.
        if (remaining < 0 ||
            compareAndSetState(available, remaining)) {
            return remaining;
        }
    }
}

The loop first reads the synchronization state and subtracts the requested amount (acquires, which is 1 in our example), then checks the resulting value. If it is less than 0, no permits are available and the method returns immediately. If it is greater than or equal to 0, a CAS updates the synchronization state. Because many threads may be competing, the CAS can fail, in which case the loop repeats until the CAS succeeds or the computed value is less than 0.
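The same read, compute, CAS, retry pattern can be reproduced outside AQS with an AtomicInteger standing in for the synchronization state. PermitCounter below is a hypothetical class written for this illustration; it mirrors the shape of nonfairTryAcquireShared but has no queue, so a failed attempt simply returns a negative number.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class: an AtomicInteger plays the role of the AQS state.
class PermitCounter {

    private final AtomicInteger state;

    PermitCounter(int permits) {
        this.state = new AtomicInteger(permits);
    }

    // Returns the remaining permits, or a negative value if there were not enough.
    int tryAcquire(int acquires) {
        for (;;) {
            int available = state.get();
            int remaining = available - acquires;
            // Fail fast on a negative result; otherwise retry until the CAS wins
            if (remaining < 0 || state.compareAndSet(available, remaining)) {
                return remaining;
            }
        }
    }

    int available() {
        return state.get();
    }

    // Lets several threads compete once each and counts how many succeeded.
    static int contendedSuccesses(int permits, int threads) {
        PermitCounter counter = new PermitCounter(permits);
        AtomicInteger successes = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                if (counter.tryAcquire(1) >= 0) {
                    successes.incrementAndGet();
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return successes.get();
    }

    public static void main(String[] args) {
        System.out.println("successes: " + contendedSuccesses(3, 8));
    }
}
```

However the threads interleave, the number of successful acquisitions never exceeds the number of permits, because a CAS only succeeds against the exact value the thread read.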

If tryAcquireShared returns a negative value, AQS then calls doAcquireSharedInterruptibly. Here is its implementation, java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireSharedInterruptibly:

/**
 * Acquires in shared interruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // Create a waiting node in shared mode and append it to the tail of the queue
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // Get this node's predecessor
            final Node p = node.predecessor();
            if (p == head) {
                // The predecessor is the head node (a dummy node, or the node of the
                // thread that acquired most recently), so this node is first in line:
                // try to acquire the shared resource again
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // Acquired: make this node the new head and propagate the wake-up if needed
                    setHeadAndPropagate(node, r);
                    // Unlink the old head to help garbage collection
                    p.next = null; // help GC
                    // Mark the acquisition as successful
                    failed = false;
                    return;
                }
            }
            // Acquisition failed: decide whether to park (updating the predecessor's
            // status), then park and check for interruption
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt()) {
                // Interrupted while parked
                throw new InterruptedException();
            }
        }
    } finally {
        if (failed) {
            // Exited abnormally (for example by interruption): cancel the acquisition
            cancelAcquire(node);
        }
    }
}

/**
 * After a failed acquisition, checks and updates the node status.
 *
 * Checks and updates status for a node that failed to acquire.
 * Returns true if thread should block. This is the main signal
 * control in all acquire loops. Requires that pred == node.prev.
 *
 * @param pred node's predecessor holding status
 * @param node the node
 * @return {@code true} if thread should block
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // Wait status of the predecessor
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) {
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        // The predecessor is already in SIGNAL status, so it will wake this
        // node when it releases; the current thread can safely park
        return true;
    }
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        // The predecessor is cancelled: skip over all cancelled predecessors
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        // Link the first non-cancelled predecessor to this node
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE. Indicate that we
         * need a signal, but don't park yet. Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        // The status is 0 or PROPAGATE(-3): set the predecessor to SIGNAL(-1)
        // so that it will wake this node later
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

/**
 * Creates and enqueues a node for the current thread in the given mode
 * (exclusive or shared).
 *
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    // Create a CLH queue node for the current thread
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // Read the current tail
    Node pred = tail;
    if (pred != null) {
        // The tail is not null: try to append the node with a single CAS
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // The tail was null or the CAS failed: fall back to the full enqueue
    enq(node);
    return node;
}

/**
 * Convenience method to park and then check if interrupted
 *
 * @return {@code true} if interrupted
 */
private final boolean parkAndCheckInterrupt() {
    // Block the current thread
    LockSupport.park(this);
    // Report (and clear) whether the thread was interrupted
    return Thread.interrupted();
}

The steps here are:

  1. First, addWaiter(Node.SHARED) creates a node for the CLH synchronization queue, appends it to the tail of the queue, and returns it;
  2. Next, the node's predecessor is fetched and compared with the head node. If the predecessor is the head, no other waiting node is ahead of this one, so the current node may try for the resource and tryAcquireShared() is called. If it succeeds (the return value is greater than or equal to 0), the current node becomes the new head and the method returns;
  3. If the predecessor is not the head, other nodes are waiting ahead of this one; if it is the head but the attempt failed, no resource is available yet. In both cases shouldParkAfterFailedAcquire() is called. It sets the predecessor's wait status to Node.SIGNAL, skips over cancelled nodes, and returns whether the thread should block (that is, whether the predecessor's status was already Node.SIGNAL);
  4. If the thread should block, parkAndCheckInterrupt() is called, which blocks the current thread with LockSupport.park(this);
  5. Once blocked, the thread waits until another thread releases the resource and wakes it, or until another thread interrupts it.

OK, that is how a thread acquires a Semaphore permit (that is, the AQS synchronization state). The thread first tries to take the resource directly and returns if that succeeds. If it fails, the thread is wrapped in a CLH synchronization queue node, appended to the tail of the queue, and blocked until another thread wakes or interrupts it.
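The queuing described above can be observed from the outside with Semaphore's own inspection methods. The sketch below blocks one thread on a semaphore with no permits and reads getQueueLength() before and after the release. QueueInspection is a hypothetical class name; it uses acquireUninterruptibly() only to avoid the checked exception, so the thread goes through the non-interruptible acquire path, but it ends up in the same synchronization queue. The Javadoc describes getQueueLength() as an estimate, which is why the sketch polls it instead of assuming a value.

```java
import java.util.concurrent.Semaphore;

// Hypothetical example class: watches a thread enter and leave the wait queue.
class QueueInspection {

    // Returns {queue length while the waiter is blocked, queue length after release}.
    static int[] observe() {
        Semaphore semaphore = new Semaphore(0);
        Thread waiter = new Thread(() -> semaphore.acquireUninterruptibly(), "waiter");
        waiter.start();

        // Wait until the waiter's node shows up in the synchronization queue
        int queuedWhileBlocked;
        while ((queuedWhileBlocked = semaphore.getQueueLength()) == 0) {
            Thread.yield();
        }

        // Releasing a permit wakes the queued thread, which then acquires it
        semaphore.release();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {queuedWhileBlocked, semaphore.getQueueLength()};
    }

    public static void main(String[] args) {
        int[] lengths = observe();
        System.out.println("while blocked: " + lengths[0] + ", after release: " + lengths[1]);
    }
}
```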

4.4 Semaphore Releasing synchronization resources

Semaphore releases resources through release(). Let's follow its call path.

public void release() {
    // Release one permit
    sync.releaseShared(1);
}

This calls the AQS synchronizer's releaseShared(int arg) method:

/**
 * Releases the resource in shared mode.
 *
 * Releases in shared mode. Implemented by unblocking one or more
 * threads if {@link #tryReleaseShared} returns true.
 *
 * @param arg the release argument. This value is conveyed to
 * {@link #tryReleaseShared} but is otherwise uninterpreted
 * and can represent anything you like.
 * @return the value returned from {@link #tryReleaseShared}
 */
public final boolean releaseShared(int arg) {
    // First try to release the resource
    if (tryReleaseShared(arg)) {
        // Then wake up waiting threads
        doReleaseShared();
        return true;
    }
    return false;
}

As you can see, it first calls tryReleaseShared(arg) and, if that returns true, doReleaseShared().

Semaphore's synchronizer overrides tryReleaseShared(arg), in java.util.concurrent.Semaphore.Sync#tryReleaseShared:

/**
 * Releases shared resources.
 *
 * @param releases the number of permits to release
 * @return true once the state has been updated
 */
protected final boolean tryReleaseShared(int releases) {
    // Loop so that the operation eventually completes under contention
    for (;;) {
        // Read the current state
        int current = getState();
        // Add the released permits
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        // Set the new state with CAS
        if (compareAndSetState(current, next))
            return true;
    }
}

This reads the synchronization state, adds the released amount, and sets the new state with CAS, retrying until the update succeeds.

Now look at java.util.concurrent.locks.AbstractQueuedSynchronizer#doReleaseShared:

/**
 * Release action for shared mode: signals the successor and ensures propagation.
 *
 * Release action for shared mode -- signals successor and ensures
 * propagation. (Note: For exclusive mode, release just amounts
 * to calling unparkSuccessor of head if it needs signal.)
 */
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases. This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        // Read the head node
        Node h = head;
        // The head is not null and is not also the tail, so there are waiting nodes
        if (h != null && h != tail) {
            // Wait status of the head node
            int ws = h.waitStatus;
            // The head is in SIGNAL status: its successor needs to be woken
            if (ws == Node.SIGNAL) {
                // Reset the status to 0 with CAS; retry if the CAS fails
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
                    continue; // loop to recheck cases
                }
                // Wake the successor so that it can compete for the resource
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
                // The status is 0: set it to PROPAGATE(-3) with CAS so that the
                // release keeps propagating; retry if the CAS fails
                continue; // loop on failed CAS
            }
        }
        // The head did not change in the meantime, so exit the loop
        if (h == head) // loop if head changed
        {
            break;
        }
    }
}

/**
 * Wakes up the successor node.
 *
 * Wakes up node's successor, if one exists.
 *
 * @param node the node
 */
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling. It is OK if this
     * fails or if status is changed by waiting thread.
     */
    // Wait status of the node
    int ws = node.waitStatus;
    if (ws < 0) {
        // The status is negative: reset it to 0
        compareAndSetWaitStatus(node, ws, 0);
    }
    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node. But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    // Get the next node, that is, the successor
    Node s = node.next;
    // The successor is null or its status is greater than 0, that is, CANCELLED(1)
    if (s == null || s.waitStatus > 0) {
        s = null;
        // Walk backwards from the tail until reaching the current node,
        // looking for nodes that are not cancelled
        for (Node t = tail; t != null && t != node; t = t.prev) {
            if (t.waitStatus <= 0) {
                // Keep scanning even after a match, so that s ends up as the
                // non-cancelled node closest to the current node
                s = t;
            }
        }
    }
    // A successor to wake was found
    if (s != null) {
        // Wake the thread of that successor
        LockSupport.unpark(s.thread);
    }
}

This operation mainly sets the wait status of the synchronization queue's head node to 0 and then calls unparkSuccessor() to wake the thread of the waiting node. Waking the thread is done by calling LockSupport.unpark(Thread).

After the resource is released, the awakened thread returns from parkAndCheckInterrupt() and starts another round of trying to acquire the synchronization resource.
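To see release and wake-up working together in shared mode, the sketch below parks several threads on an empty semaphore and then releases all the permits in one call. ReleaseWakeup and wakeAll are hypothetical names for this illustration, and polling getQueueLength() is only a convenient way for the demo to wait until every thread is queued.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class: one release(n) call lets n queued threads proceed.
class ReleaseWakeup {

    // Returns how many of the waiting threads were woken and acquired a permit.
    static int wakeAll(int waiters) {
        Semaphore semaphore = new Semaphore(0);
        AtomicInteger woken = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                semaphore.acquireUninterruptibly();
                woken.incrementAndGet();
            });
            threads[i].start();
        }

        // Wait until every thread has been enqueued behind the empty semaphore
        while (semaphore.getQueueLength() < waiters) {
            Thread.yield();
        }

        // A single shared-mode release; the wake-up propagates along the queue
        semaphore.release(waiters);

        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return woken.get();
    }

    public static void main(String[] args) {
        System.out.println("threads woken: " + wakeAll(3));
    }
}
```

Only the first queued thread is woken directly by the releasing thread. Each thread that acquires and finds permits left over wakes the next one, which is the propagation that setHeadAndPropagate and doReleaseShared implement.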

4.5 Semaphore Summary

We have now walked through how Semaphore acquires and releases resources, and with it how AQS acquires and releases the synchronization state in shared mode, including how a thread that fails to acquire enters the AQS synchronization queue, blocks, and is later woken up.

Next, let's look at how AQS acquires and releases the synchronization state in exclusive mode.

5. ReentrantLock

ReentrantLock is an exclusive lock. Its class diagram is as follows:

[Figure: ReentrantLock class diagram]

You can see that it also has a static inner class Sync that extends AQS, together with a fair and a nonfair synchronizer implementation.

5.1 ReentrantLock Example

An example of its use:

import java.util.concurrent.locks.ReentrantLock;
public static void main(String[] args) {
    // A nonfair lock is created by default
    ReentrantLock lock = new ReentrantLock();
    new Thread(() -> {
        doLock(lock);
    }, "t0").start();
    doLock(lock);
}
private static void doLock(ReentrantLock lock) {
    // Acquire the lock before entering try, so that finally only unlocks a lock that is actually held
    lock.lock();
    try {
        System.out
            .println(Thread.currentThread().getName() + " acquired the lock");
        // Sleep for 1 second
        Thread.sleep(1000);
        System.out
            .println(Thread.currentThread().getName() + " is about to release the lock");
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        // Blank line to separate the output of the two threads
        System.out.println();
        lock.unlock();
    }
}

One possible output (which thread gets the lock first depends on scheduling):

main acquired the lock
main is about to release the lock

t0 acquired the lock
t0 is about to release the lock

This example creates a ReentrantLock instance, which uses the nonfair lock by default. It starts a thread that locks and unlocks, and the main thread locks and unlocks as well. The output shows that while one thread holds the lock, other threads block when they try to acquire it, until the holder releases it.

This is what guarantees the exclusivity of the lock.

5.2 ReentrantLock Get the lock

The default constructor of ReentrantLock is as follows:

/**
 * Creates an instance of {@code ReentrantLock}.
 * This is equivalent to using {@code ReentrantLock(false)}.
 */
public ReentrantLock() {
    sync = new NonfairSync();
}

It creates a nonfair synchronizer by default.
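The default can be checked from the outside with the public isFair() method. A small sketch follows; FairnessDemo is a made-up class name, while ReentrantLock(boolean) and isFair() are the real JDK API.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: reports the fairness setting of three locks.
class FairnessDemo {
    // Returns { default lock, explicit nonfair lock, fair lock } fairness flags.
    static boolean[] fairnessFlags() {
        ReentrantLock byDefault = new ReentrantLock();           // no-arg constructor
        ReentrantLock explicitNonfair = new ReentrantLock(false); // same as the default
        ReentrantLock fair = new ReentrantLock(true);             // fair synchronizer
        return new boolean[] { byDefault.isFair(), explicitNonfair.isFair(), fair.isFair() };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(fairnessFlags()));
    }
}
```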

The lock operation of ReentrantLock is java.util.concurrent.locks.ReentrantLock#lock:

public void lock() {
    sync.lock();
}

It calls the synchronizer's lock() method. Let's look at the implementation in the nonfair synchronizer, java.util.concurrent.locks.ReentrantLock.NonfairSync#lock:

/**
 * Performs lock. Try immediate barge, backing up to normal
 * acquire on failure.
 */
final void lock() {
    // First try to grab the lock directly: CAS the state from 0 to 1
    if (compareAndSetState(0, 1)) {
        // On success, record the current thread as the exclusive owner
        setExclusiveOwnerThread(Thread.currentThread());
    } else {
        // Otherwise fall back to the normal acquire path
        acquire(1);
    }
}

The logic is:

  1. First try to change the synchronization state from 0 to 1 with CAS. If that succeeds, call setExclusiveOwnerThread() to make the current thread the exclusive owner;
  2. If it fails, another thread has already changed the synchronization state, so call acquire(int arg).

setExclusiveOwnerThread(thread) is the method java.util.concurrent.locks.AbstractOwnableSynchronizer#setExclusiveOwnerThread:

/**
 * The current owner of exclusive mode synchronization.
 */
private transient Thread exclusiveOwnerThread;
/**
 * Sets the thread that currently owns exclusive access.
 * A {@code null} argument indicates that no thread owns access.
 * This method does not otherwise impose any synchronization or
 * {@code volatile} field accesses.
 * @param thread the owner thread
 */
protected final void setExclusiveOwnerThread(Thread thread) {
    // Record the exclusive owner thread
    exclusiveOwnerThread = thread;
}

acquire(int arg) is the method java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire:

/**
 * Acquire the resource in exclusive mode, ignoring interrupts.
 *
 * Acquires in exclusive mode, ignoring interrupts. Implemented
 * by invoking at least once {@link #tryAcquire},
 * returning on success. Otherwise the thread is queued, possibly
 * repeatedly blocking and unblocking, invoking {@link
 * #tryAcquire} until success. This method can be used
 * to implement method {@link Lock#lock}.
 *
 * @param arg the acquire argument. This value is conveyed to
 * {@link #tryAcquire} but is otherwise uninterpreted and
 * can represent anything you like.
 */
public final void acquire(int arg) {
    /*
     * Acquire the resource:
     * 1. Try to acquire the state directly
     * 2. On failure, add a waiting Node (the wait queue is created on demand)
     * 3. Acquire from within the queue, in an infinite loop
     * 4. If the thread was interrupted while waiting, restore the interrupt status
     */
    if (!tryAcquire(arg)) {
        // Create an exclusive-mode node for the current thread and append it to the tail of the queue
        Node node = addWaiter(Node.EXCLUSIVE);
        // Acquire the resource from within the queue
        if (acquireQueued(node, arg)) {
            // acquireQueued returned true: the thread was interrupted while waiting, so re-assert the interrupt
            selfInterrupt();
        }
    }
}

The logic is similar to that of acquireShared(int arg):

  1. First call tryAcquire(arg) to try to acquire the resource;
  2. If the acquisition succeeds, the method returns. If it fails, call addWaiter(Node.EXCLUSIVE) to create a synchronization queue node and append it to the tail of the queue. Unlike the synchronization queue node in shared mode, this is an exclusive node;
  3. Then call acquireQueued(node, arg) to acquire the resource from within the queue.
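The division of labour between acquire() and tryAcquire() can be seen in a tiny custom synchronizer. The sketch below is an assumption-laden illustration, not ReentrantLock's code: SimpleMutex, its inner Sync class, and the countWith() helper are hypothetical names, and the mutex is deliberately non-reentrant. Only tryAcquire() and tryRelease() are written by hand; queuing, parking, and waking come from AQS through acquire(1) and release(1).

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical non-reentrant mutex built on AQS (state 0 = free, 1 = held).
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // Same fast path as NonfairSync#lock: CAS the state from 0 to 1
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    }

    private final Sync sync = new Sync();

    void lock() { sync.acquire(1); }   // tryAcquire, then queue and park on failure
    void unlock() { sync.release(1); } // tryRelease, then wake the successor

    // Increments a shared counter from several threads under the mutex.
    static int countWith(int threads, int perThread) {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(countWith(4, 10000));
    }
}
```

If the mutex works, no increment is lost, so countWith(4, 10000) returns 40000.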

The tryAcquire(arg) method is overridden in ReentrantLock's nonfair synchronizer NonfairSync, java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire:

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
/**
 * Acquire the resource in a nonfair way.
 *
 * Performs non-fair tryLock. tryAcquire is implemented in
 * subclasses, but both need nonfair try for trylock method.
 */
final boolean nonfairTryAcquire(int acquires) {
    // Current thread
    final Thread current = Thread.currentThread();
    // Current synchronization state
    int c = getState();
    // State 0 means no thread holds the resource
    if (c == 0) {
        // Try to set the state with CAS
        if (compareAndSetState(0, acquires)) {
            // CAS succeeded: record the current thread as the exclusive owner
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // The resource is already held: check whether the holder is the current thread
    else if (current == getExclusiveOwnerThread()) {
        // Reentrant acquisition by the owner: add acquires to the state
        int nextc = c + acquires;
        if (nextc < 0) // overflow
        {
            throw new Error("Maximum lock count exceeded");
        }
        // Write the new state (no CAS needed, only the owner gets here)
        setState(nextc);
        return true;
    }
    // In all other cases return false: the acquisition failed
    return false;
}

It delegates to nonfairTryAcquire(int acquires), whose logic is:

  1. Read the synchronization state. If it is 0, set it with a CAS operation. If the CAS succeeds, call setExclusiveOwnerThread(current) to record the exclusive owner thread and return true;
  2. If the synchronization state is not 0, check whether the current thread is the exclusive owner. If it is, the thread already holds the lock, so the state value is increased (this is where reentrancy comes from) and the new value is written with setState(). No CAS is needed, because only the owning thread can reach this branch. state is volatile, so the change is immediately visible to other threads. The method returns true;
  3. In every other case, including a failed CAS in step 1, return false.
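The reentrant branch can be observed through the public getHoldCount() method, which reports how many times the current thread holds the lock. A minimal sketch, with ReentrancyDemo as a made-up class name:

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: records the hold count while locking twice.
class ReentrancyDemo {
    // Returns the hold count after the first lock, after the nested lock,
    // and after the nested unlock.
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] seen = new int[3];
        lock.lock();                       // state 0 -> 1 via CAS
        try {
            seen[0] = lock.getHoldCount();
            lock.lock();                   // same owner: state 1 -> 2 via setState
            try {
                seen[1] = lock.getHoldCount();
            } finally {
                lock.unlock();             // state 2 -> 1, lock still held
            }
            seen[2] = lock.getHoldCount();
        } finally {
            lock.unlock();                 // state 1 -> 0, lock released
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(holdCounts()));
    }
}
```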

Next, look at acquireQueued(Node node, int arg), implemented in java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued:

/**
 * Acquire the resource in exclusive mode for a node that is already queued.
 *
 * Acquires in exclusive uninterruptible mode for thread already in
 * queue. Used by condition wait methods as well as acquire.
 *
 * @param node the node
 * @param arg the acquire argument
 * @return {@code true} if interrupted while waiting
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // Loop until the resource is acquired
        for (;;) {
            // Get the predecessor node
            final Node p = node.predecessor();
            // Only the node right behind the head may try to acquire the resource
            if (p == head && tryAcquire(arg)) {
                // Acquired: this node becomes the new head
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // Acquisition failed: update the predecessor's wait status and decide whether to park.
            // Once parked, the thread resumes only when its predecessor releases the resource or it is interrupted
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // If the acquisition did not complete normally, cancel it
        if (failed)
            cancelAcquire(node);
    }
}

Again, this method is very similar to the AQS method doAcquireSharedInterruptibly(int arg) that we saw with Semaphore. The logic is:

  1. Get the node's predecessor. If the predecessor is the head node, this node is first in line, so call tryAcquire(arg) to try to acquire the resource;
  2. If tryAcquire(arg) succeeds, set the node as the head of the synchronization queue and return the interrupted flag (true only if the thread was interrupted while waiting);
  3. If the predecessor is not the head node, or the attempt to acquire the resource failed, call shouldParkAfterFailedAcquire(p, node) to update the wait status and decide whether the thread should block;
  4. If it should block, call parkAndCheckInterrupt() to park the thread. The thread then waits until another thread wakes it up or interrupts it.

That completes the lock acquisition process of ReentrantLock. To recap: a thread that wants the lock first tries to acquire it directly and, on success, becomes the exclusive owner. On failure, an exclusive-mode synchronization queue node is built for the thread and appended to the tail of the synchronization queue. The thread then alternates between trying to acquire the resource and blocking, resuming each time another thread wakes it up or interrupts it, until the acquisition succeeds.
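The queuing step can be watched from the outside with ReentrantLock's monitoring method hasQueuedThread(Thread). The sketch below is an illustration under assumptions: QueueObservationDemo is a made-up class, and the polling loop with a five-second deadline exists only to make the demo deterministic.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: shows a blocked thread sitting in the synchronization queue.
class QueueObservationDemo {
    // Returns true if the waiter was seen in the queue while the lock was held
    // and the lock is free again once the waiter has finished.
    static boolean waiterWasQueued() {
        ReentrantLock lock = new ReentrantLock();
        Thread waiter = new Thread(() -> {
            lock.lock();   // fails the fast path, enqueues, and parks
            lock.unlock();
        }, "waiter");
        boolean queued;
        lock.lock();
        try {
            waiter.start();
            long deadline = System.nanoTime() + 5_000_000_000L;
            // Poll until the waiter's node has been appended to the queue
            while (!lock.hasQueuedThread(waiter) && System.nanoTime() < deadline) {
                Thread.yield();
            }
            queued = lock.hasQueuedThread(waiter);
        } finally {
            lock.unlock(); // release() wakes the queued waiter
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return queued && !lock.isLocked();
    }

    public static void main(String[] args) {
        System.out.println("waiter was queued: " + waiterWasQueued());
    }
}
```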

5.3 ReentrantLock Release the lock

ReentrantLock releases the lock with the unlock() method. java.util.concurrent.locks.ReentrantLock#unlock is implemented as follows:

public void unlock() {
    sync.release(1);
}

It calls the synchronizer's release(int arg) method. java.util.concurrent.locks.AbstractQueuedSynchronizer#release is implemented as follows:

/**
 * Releases in exclusive mode. Implemented by unblocking one or
 * more threads if {@link #tryRelease} returns true.
 * This method can be used to implement method {@link Lock#unlock}.
 *
 * @param arg the release argument. This value is conveyed to
 * {@link #tryRelease} but is otherwise uninterpreted and
 * can represent anything you like.
 * @return the value returned from {@link #tryRelease}
 */
public final boolean release(int arg) {
    // Try to release the resource
    if (tryRelease(arg)) {
        // Released: get the head node of the CLH wait queue
        Node h = head;
        // The head exists and its wait status is not 0
        if (h != null && h.waitStatus != 0) {
            // Wake up the successor node
            unparkSuccessor(h);
        }
        return true;
    }
    return false;
}

The logic of releasing the resource is also similar to releaseShared(int arg):

  1. First call tryRelease(arg) to try to release the resource;
  2. If the release fails, return false;
  3. If the release succeeds, get the head node of the synchronization queue. If it is not null and its wait status is not 0, call unparkSuccessor(h) to wake up the waiting successor node, then return true.

The tryRelease(arg) method is implemented in ReentrantLock's Sync synchronizer as java.util.concurrent.locks.ReentrantLock.Sync#tryRelease:

protected final boolean tryRelease(int releases) {
    // New state: current state minus the amount being released
    int c = getState() - releases;
    // Only the exclusive owner thread may release
    if (Thread.currentThread() != getExclusiveOwnerThread()) {
        throw new IllegalMonitorStateException();
    }
    // Whether the lock is now completely free
    boolean free = false;
    // The state after the release is 0
    if (c == 0) {
        // The resource is free
        free = true;
        // Clear the exclusive owner thread
        setExclusiveOwnerThread(null);
    }
    // Write the new state
    setState(c);
    return free;
}

The logic is:

  1. Read the synchronization state and subtract the amount being released to get the new value;
  2. Check whether the current thread is the exclusive owner. If it is not, throw an exception, because a thread that does not hold the lock is trying to release it;
  3. Check whether the new state value is 0. If it is, the resource is free and the exclusive owner thread is cleared. This is why every lock() call on a ReentrantLock must be matched by an unlock() call: until the counts match, the resource is not released;
  4. Finally, write the new synchronization state value and return.
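Both rules in the list above, the matching lock/unlock counts and the owner check, can be verified with a few lines. A minimal sketch, with UnlockDemo as a made-up class name:

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: exercises the two checks made by tryRelease().
class UnlockDemo {
    // Locks twice and unlocks once: the lock must still be held afterwards.
    static boolean stillLockedAfterOneUnlock() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();
        lock.unlock();                   // state 2 -> 1, tryRelease returns false
        boolean stillHeld = lock.isLocked();
        lock.unlock();                   // state 1 -> 0, owner cleared
        return stillHeld && !lock.isLocked();
    }

    // Unlocking a lock the current thread does not hold must throw.
    static boolean unlockByNonOwnerThrows() {
        ReentrantLock lock = new ReentrantLock();
        try {
            lock.unlock();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(stillLockedAfterOneUnlock() + " " + unlockByNonOwnerThrows());
    }
}
```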

5.4 ReentrantLock Summary

We have now seen how ReentrantLock acquires and releases the lock. It acquires the resource exclusively: once one thread holds the lock, other threads cannot get it. It also supports locking multiple times, which of course requires the same number of unlock operations. When a thread wants to acquire the lock (in the default nonfair case), it first tries to acquire the resource directly. If that fails, an exclusive-mode synchronization queue node is created for the current thread and appended to the tail of the synchronization queue. The thread then tries to acquire the resource or blocks, until another thread releases the lock or interrupts it, after which it tries again to acquire the lock.

6. ConditionObject (Condition Object)

So far we have seen these characteristics of AQS: fair/nonfair, exclusive/shared, and the synchronization queue. There is one more: the condition queue.

The condition queue is special. It is also a queue and uses the same Node class as the synchronization queue, but it is a singly linked list that uses the nextWaiter field to point to the next waiting node.

Its nodes are created by ConditionObject, an inner class of AQS, and they share the properties of the AQS instance (synchronization state, threads).

The working mechanism of the condition queue is: a node of the condition queue is transferred to the synchronization queue, where it then acquires the resource like any other synchronization queue node.

The class diagram of ConditionObject is:

[Figure: ConditionObject class diagram]

It blocks and wakes threads mainly through await(), signal(), and their related methods.

6.1 ConditionObject Example


public static void main(String[] args) throws InterruptedException {
    // Create a lock
    ReentrantLock lock = new ReentrantLock();
    // Create a condition object from the lock
    Condition c1 = lock.newCondition();
    // Thread t1 waits on the condition
    new Thread(() -> {
        // Lock
        lock.lock();
        System.out.println(Thread.currentThread().getName() + ": Locked!");
        try {
            System.out.println(Thread.currentThread().getName() + ": Too tired, I'll sleep first. Wake me up later!");
            // Wait on the condition
            c1.await();
            System.out.println(Thread.currentThread().getName() + ": I'm awake!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + ": Unlocking!");
            // Unlock
            lock.unlock();
        }
    }, "t1").start();
    Thread.sleep(1000);
    // Thread t2 signals the condition
    new Thread(() -> {
        // Lock
        lock.lock();
        System.out.println(Thread.currentThread().getName() + ": Locked!");
        try {
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + ": Time to eat, get up!");
            // Signal the condition
            c1.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + ": Unlocking!");
            // Unlock
            lock.unlock();
        }
    }, "t2").start();
}

The code above creates a reentrant lock and a condition object from that lock. Thread t1 locks, calls the wait operation of the condition object, and finally releases the lock. Thread t2 locks, calls the wake-up operation of the condition object, and finally releases the lock.

The output is:

t1: Locked!
t1: Too tired, I'll sleep first. Wake me up later!
t2: Locked!
t2: Time to eat, get up!
t2: Unlocking!
t1: I'm awake!
t1: Unlocking!

This shows that t1 acquires the lock first and then calls await() on the condition object, which puts the thread into a blocked state and gives up the lock. Thread t2 then acquires the lock and calls signal() on the condition object. Only after t2 releases the lock does t1 wake up and carry out the rest of its work.

6.2 Condition object wait

Let's look at how the condition object is created. The lock.newCondition() method is java.util.concurrent.locks.ReentrantLock#newCondition:

public Condition newCondition() {
    return sync.newCondition();
}

It in turn calls the synchronizer's method for creating a condition, java.util.concurrent.locks.ReentrantLock.Sync#newCondition:

final ConditionObject newCondition() {
    return new ConditionObject();
}

Then take a look at its await() method, java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await():

/**
 * Implements interruptible condition wait.
 * <ol>
 * <li> If current thread is interrupted, throw InterruptedException.
 * <li> Save lock state returned by {@link #getState}.
 * <li> Invoke {@link #release} with saved state as argument,
 * throwing IllegalMonitorStateException if it fails.
 * <li> Block until signalled or interrupted.
 * <li> Reacquire by invoking specialized version of
 * {@link #acquire} with saved state as argument.
 * <li> If interrupted while blocked in step 4, throw InterruptedException.
 * </ol>
 */
public final void await() throws InterruptedException {
    // Check whether the thread has been interrupted
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }
    // Add a node to the condition wait queue
    Node node = addConditionWaiter();
    // Release the resource completely and keep the state value that was held
    int savedState = fullyRelease(node);
    // Interrupt mode
    int interruptMode = 0;
    // Loop while the condition node is not yet on the CLH synchronization queue
    while (!isOnSyncQueue(node)) {
        // Not on the synchronization queue: block the current thread
        LockSupport.park(this);
        // The thread has been woken up: check whether it was interrupted while waiting
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // Reaching here means the thread has been signalled or interrupted.
    // Re-acquire the resource in exclusive mode with the saved state
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
        // acquireQueued returned true (interrupted while re-acquiring) and the mode is not THROW_IE:
        // set the interrupt mode to REINTERRUPT
        interruptMode = REINTERRUPT;
    }
    // Check the node again and clean up cancelled condition waiters
    if (node.nextWaiter != null) // clean up if cancelled
    {
        // Unlink the condition wait nodes that have been cancelled
        unlinkCancelledWaiters();
    }
    // If the interrupt mode is not 0, report the interrupt
    if (interruptMode != 0) {
        // Throw InterruptedException or re-interrupt the thread, depending on the mode
        reportInterruptAfterWait(interruptMode);
    }
}
/**
 * Add a new waiter to the condition wait queue.
 *
 * Adds a new waiter to wait queue.
 * @return its new wait node
 */
private Node addConditionWaiter() {
    // Get the last condition wait node
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    // If the last waiter has been cancelled, clean it out
    if (t != null && t.waitStatus != Node.CONDITION) {
        // Unlink the waiters that are in the cancelled state
        unlinkCancelledWaiters();
        // Re-read the last wait node
        t = lastWaiter;
    }
    // Create a condition queue node whose wait status is Node.CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null) {
        // The queue is empty: the new node becomes the first waiter
        firstWaiter = node;
    } else {
        // Append the new node after the last waiter
        t.nextWaiter = node;
    }
    // The new node becomes the last waiter
    lastWaiter = node;
    return node;
}
/**
 * Call release with the current state value and return the saved state. Cancel the node and throw an exception on failure.
 *
 * Invokes release with current state value; returns saved state.
 * Cancels node and throws exception on failure.
 * @param node the condition node for this wait
 * @return previous sync state
 */
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // Read the synchronizer's state
        int savedState = getState();
        // Release the whole state in one call
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        // If the release failed, mark the node as cancelled (Node.CANCELLED)
        if (failed) {
            node.waitStatus = Node.CANCELLED;
        }
    }
}

The main logic here is:

  1. First check whether the thread has been interrupted. If it has, throw an exception;
  2. Call addConditionWaiter() to create a condition queue node with new Node(Thread.currentThread(), Node.CONDITION) and append it to the tail of the condition queue;
  3. Call fullyRelease() to release the resource. The whole synchronization state held by the current thread is released in one call, which also wakes the next thread waiting in the synchronization queue, and the released value is kept as savedState;
  4. Call isOnSyncQueue(node) to check whether the node is on the synchronization queue. As long as it is not, block and wait to be woken up or interrupted by another thread;
  5. Once the thread is woken up, call acquireQueued(node, savedState) to re-acquire the resource;
  6. Check the node again and clean up the condition wait nodes that are in the cancelled state;
  7. Handle the interrupt information if necessary.

In other words: the current thread holds the synchronization state. It creates a condition wait queue node and appends it to the tail of the condition queue, gives up the synchronization state completely, and blocks. After being woken up, it tries to acquire the synchronization resource again.
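The effect of fullyRelease() and savedState can be observed with a lock that is held twice when await is called. The sketch below is illustrative only: AwaitReleaseDemo and the flag array are made-up names, and awaitUninterruptibly() is used instead of await() purely to avoid the checked exception. The main thread is able to acquire the lock while the waiter is parked, which shows that both holds were given up, and the waiter sees its hold count restored after waking.

```java
import java.util.Arrays;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: await releases every hold and restores them afterwards.
class AwaitReleaseDemo {
    // Returns { hold count before waiting, hold count after waking }.
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        boolean[] flag = {false};   // guarded by lock
        int[] result = new int[2];
        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock();            // hold the lock twice (state == 2)
            try {
                result[0] = lock.getHoldCount();
                while (!flag[0]) {
                    ready.awaitUninterruptibly(); // releases both holds while waiting
                }
                result[1] = lock.getHoldCount();  // holds restored from the saved state
            } finally {
                lock.unlock();
                lock.unlock();
            }
        }, "waiter");
        waiter.start();
        boolean signalled = false;
        while (!signalled) {
            lock.lock();            // succeeds once the waiter has fully released
            try {
                if (lock.hasWaiters(ready)) {
                    flag[0] = true;
                    ready.signal();
                    signalled = true;
                }
            } finally {
                lock.unlock();
            }
            Thread.yield();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```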

6.3 Condition object wake up

Let's take a look at the signal() wake-up method, java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal:

/**
 * Move the longest-waiting thread (if there is one) from the wait queue of this condition to the wait queue of the owning lock.
 *
 * Moves the longest-waiting thread, if one exists, from the
 * wait queue for this condition to the wait queue for the
 * owning lock.
 *
 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
 * returns {@code false}
 */
public final void signal() {
    if (!isHeldExclusively()) {
        throw new IllegalMonitorStateException();
    }
    // Get the first condition wait node
    Node first = firstWaiter;
    if (first != null) {
        // Signal the head node of the condition wait queue
        doSignal(first);
    }
}
private void doSignal(Node first) {
    // Loop until one node has been transferred or the queue is empty
    do {
        // Make the node after first the new first waiter
        if ( (firstWaiter = first.nextWaiter) == null) {
            // If it is null, the condition queue is now empty, so clear lastWaiter as well
            lastWaiter = null;
        }
        // Detach first from the condition queue
        first.nextWaiter = null;
        // first is the head node of the condition wait queue.
        // Convert it into a CLH synchronization queue node and add it to the synchronization queue.
        // If the transfer fails, continue with the new firstWaiter
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
/**
 * Transfers a node from a condition queue onto sync queue.
 * Returns true if successful.
 * @param node the node
 * @return true if successfully transferred (else the node was
 * cancelled before signal)
 */
final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    // Change the node's wait status from Node.CONDITION to 0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        return false;
    }
    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    // Append the node to the tail of the CLH synchronization queue; enq returns its predecessor
    Node p = enq(node);
    // Wait status of the predecessor node
    int ws = p.waitStatus;
    // Only when the predecessor is cancelled (status greater than 0), or the CAS that sets
    // the predecessor's status to Node.SIGNAL fails, is the node's thread woken up right here
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) {
        LockSupport.unpark(node.thread);
    }
    return true;
}

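The main logic here is: take the head node off the condition queue, convert it into a synchronization queue node, and append it to the tail of the synchronization queue. If the predecessor in the synchronization queue is cancelled, or its status cannot be set to SIGNAL, the node's thread is woken up immediately. Otherwise it is woken up later, when its predecessor releases the lock.

The awakened thread then starts to acquire the synchronization resource again.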
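The transfer from the condition queue to the synchronization queue can be observed with ReentrantLock's monitoring methods hasWaiters(Condition) and hasQueuedThread(Thread). The following is a sketch under assumptions: SignalTransferDemo and the flag array are made-up names, and the polling loop exists only to wait until the waiter is parked.

```java
import java.util.Arrays;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: signal() moves the waiter between the two queues.
class SignalTransferDemo {
    // Returns { waiter left the condition queue, waiter is in the sync queue },
    // both sampled right after signal() while the signaller still holds the lock.
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        boolean[] flag = {false};   // guarded by lock
        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!flag[0]) {
                    ready.awaitUninterruptibly();
                }
            } finally {
                lock.unlock();
            }
        }, "waiter");
        waiter.start();
        boolean[] observed = new boolean[2];
        boolean signalled = false;
        while (!signalled) {
            lock.lock();
            try {
                if (lock.hasWaiters(ready)) {
                    flag[0] = true;
                    ready.signal();
                    observed[0] = !lock.hasWaiters(ready);       // no longer a condition waiter
                    observed[1] = lock.hasQueuedThread(waiter);  // now waiting for the lock
                    signalled = true;
                }
            } finally {
                lock.unlock(); // only now can the waiter re-acquire and return from await
            }
            Thread.yield();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```

This also matches the output of the example in 6.1, where t1 prints its wake-up message only after t2 has unlocked.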

The workflow of the condition object is as follows:

[Figure: node conversion process when a condition is signalled]

Many classes in the JDK use condition objects, for example blocking queues such as ArrayBlockingQueue, DelayQueue, and LinkedBlockingDeque.
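To show how a blocking queue can be built from one lock and two conditions, here is a minimal sketch. It is not the JDK's ArrayBlockingQueue code: TinyBoundedBuffer, its fields, and sumThroughBuffer() are hypothetical, it stores plain int values, and it uses awaitUninterruptibly() to keep the example free of checked exceptions.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical bounded buffer guarded by one lock and two condition objects.
class TinyBoundedBuffer {
    private final int[] items;
    private int head, tail, count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    TinyBoundedBuffer(int capacity) {
        items = new int[capacity];
    }

    void put(int x) {
        lock.lock();
        try {
            // Wait in the notFull condition queue while the buffer is full
            while (count == items.length) {
                notFull.awaitUninterruptibly();
            }
            items[tail] = x;
            tail = (tail + 1) % items.length;
            count++;
            notEmpty.signal(); // move one waiting consumer to the sync queue
        } finally {
            lock.unlock();
        }
    }

    int take() {
        lock.lock();
        try {
            // Wait in the notEmpty condition queue while the buffer is empty
            while (count == 0) {
                notEmpty.awaitUninterruptibly();
            }
            int x = items[head];
            head = (head + 1) % items.length;
            count--;
            notFull.signal(); // move one waiting producer to the sync queue
            return x;
        } finally {
            lock.unlock();
        }
    }

    // Sends 1..n through a buffer of capacity 2 and returns the sum received.
    static int sumThroughBuffer(int n) {
        TinyBoundedBuffer buffer = new TinyBoundedBuffer(2);
        Thread producer = new Thread(() -> {
            for (int i = 1; i <= n; i++) {
                buffer.put(i);
            }
        }, "producer");
        producer.start();
        int sum = 0;
        for (int i = 0; i < n; i++) {
            sum += buffer.take();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sumThroughBuffer(100));
    }
}
```

Using two conditions lets producers and consumers wait in separate condition queues, so a signal() from take() only ever transfers a producer and a signal() from put() only ever transfers a consumer.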

7. Summary

Many other synchronization utility classes in the JDK's JUC package also use AQS to implement their synchronization. Their core principle is the same: they operate on the synchronization state, and they queue the threads that are waiting for it by means of the synchronization queue and the condition wait queue.

Only by understanding AQS can we gain a deeper understanding of the concurrency components built on it.

8. Reference material

  • The Art of Java Concurrent Programming
  • JDK 1.8 source code

The code above has been uploaded to my GitHub.

Copyright notice
This article was written by [goats]. Please include a link to the original when reposting. Thank you.
