0. Contents
1. Introduction to AQS
2. AQS basics
3. How to use AQS
4. Semaphore
4.1 Semaphore example
4.2 Semaphore class structure
4.3 Semaphore: acquiring synchronization resources
4.4 Semaphore: releasing synchronization resources
4.5 Semaphore summary
5. ReentrantLock
5.1 ReentrantLock example
5.2 ReentrantLock: acquiring the lock
5.3 ReentrantLock: releasing the lock
5.4 ReentrantLock summary
6. ConditionObject
6.1 ConditionObject example
6.2 Waiting on a condition object
6.3 Waking up from a condition object
7. Summary
8. References
Today let's learn about AQS.
1. Introduction to AQS
What is AQS? AQS is the java.util.concurrent.locks.AbstractQueuedSynchronizer class. So what is AbstractQueuedSynchronizer?
It is an abstract queue-based synchronizer: a basic framework in Java's multithreaded programming model for building locks and other synchronization components that keep data safe when multiple threads operate on it.
In the JDK's java.util.concurrent package (JUC for short), many concurrency classes are built on AQS, including the familiar ReentrantLock, ReentrantReadWriteLock, Semaphore, and CountDownLatch.
2. AQS basics
Before explaining how AQS works, we need to understand a few concepts:
- state: an int variable declared volatile, which AQS uses to represent the resource (the synchronization state);
- CAS: compareAndSet, an atomic compare-and-set operation. AQS mainly uses it to update the synchronization state. In JDK 1.8 it is performed through the Unsafe class, which calls into the underlying native implementation;
- Thread blocking and wake-up: a thread that fails to get the resource has to queue, blocking in the queue until it is woken. AQS blocks and wakes threads through the park() and unpark() methods of the LockSupport class, which in turn call Unsafe's park and unpark methods;
- Synchronization queue: threads waiting to acquire the resource enter a queue and wait. This is the synchronization queue, also called the CLH synchronization queue. CLH is made up of the initials of the three people who invented the underlying queue lock (Craig, Landin, and Hagersten). In AQS it is a doubly linked list whose nodes hold a wait status, a thread, a predecessor node, and a successor node;
- Condition queue: AQS has a second kind of queue called the condition queue. It shares the node class with the synchronization queue, but it is used when a thread must wait for some condition to be met. It is a singly linked list whose nodes hold a wait status, a thread, and a pointer to the next waiting node;
- Exclusive mode and shared mode: when acquiring the resource, a thread declares whether it acquires in exclusive or shared mode. Exclusive means only one thread may hold the resource at a time; shared mode lets multiple threads hold it at the same time. Both modes show up in the nodes of the synchronization queue.
- Fair and unfair: fair means that when a thread needs the resource and other threads are already queued in the synchronization queue, the thread obediently joins the queue. Unfair means the thread first tries to seize the resource, regardless of whether other threads are waiting in the queue, and only queues if that attempt fails.
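The blocking and wake-up primitive described in the list above can be tried in isolation. The following is a minimal sketch, not AQS code: the class name ParkUnparkDemo and the ready flag are made up for illustration, while LockSupport.park() and LockSupport.unpark(Thread) are the real JDK calls.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class: shows park()/unpark() guarded by a condition flag.
final class ParkUnparkDemo {
    // Returns true if the waiter thread got past its park loop.
    static boolean run() {
        AtomicBoolean ready = new AtomicBoolean(false);
        AtomicBoolean observed = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            // park() may return spuriously, so the condition is re-checked in a loop
            while (!ready.get()) {
                LockSupport.park();
            }
            observed.set(true);
        }, "waiter");
        waiter.start();
        ready.set(true);            // publish the condition first
        LockSupport.unpark(waiter); // then hand the waiter a wake-up permit
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return observed.get();
    }

    public static void main(String[] args) {
        System.out.println("waiter woke up: " + run());
    }
}
```

Because unpark() leaves a permit behind, the example works even if unpark() happens before the waiter reaches park(): the next park() call simply returns at once.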
The structure of the synchronization queue is shown in the figure:
The structure of the condition queue is shown in the figure below:
Now that we know these concepts, let's see how AQS works.
AQS keeps an int member variable named state to represent the synchronization state, and uses a built-in FIFO queue to line up the threads that are trying to acquire the resource. It implements the logic for acquiring and releasing resources and for queuing threads, and leaves the methods that decide whether the resource can actually be acquired to subclasses. A subclass implements acquire and release by operating on the synchronization state.
3. How to use AQS
AQS is designed around the template method pattern. Users extend AQS and override the designated methods, then compose that AQS subclass into their custom synchronization component and call the template methods the synchronizer provides. Those template methods call back into the methods the user overrode.
The main way to use AQS is through inheritance: the subclass overrides its protected methods to manage the synchronization state.
The overridable methods AQS provides fall into two categories by working mode:
- Exclusive mode
  - tryAcquire(int arg): acquires the synchronization state exclusively. An implementation must read the current state, check whether it meets expectations, and then set the synchronization state with CAS.
  - tryRelease(int arg): releases the synchronization state exclusively. Threads waiting to acquire then get a chance to acquire it.
  - isHeldExclusively(): whether the synchronizer is held in exclusive mode. It generally indicates whether the current thread holds the lock exclusively.
- Shared mode
  - tryAcquireShared(int arg): acquires the synchronization state in shared mode. A return value greater than or equal to 0 means success; a negative value means the acquire failed.
  - tryReleaseShared(int arg): releases the synchronization state in shared mode.
When a subclass overrides these methods, it uses the methods AQS provides to read and modify the synchronization state:
- getState(): gets the current synchronization state;
- setState(int newState): sets the current synchronization state;
- compareAndSetState(int expect, int update): sets the state with CAS, which guarantees the update is atomic.
When a subclass implements these AQS methods, it can offer both fair and unfair acquisition.
The template methods AQS provides are also divided into two categories by working mode:
- Exclusive mode
  - void acquire(int arg): acquires the synchronization state exclusively. If the current thread acquires it successfully, the method returns; otherwise the thread enters the synchronization queue and waits. This method calls the overridden tryAcquire(int arg) method.
  - void acquireInterruptibly(int arg): the same as acquire(int arg), but it responds to interrupts. If the current thread is interrupted while waiting in the synchronization queue, the method throws InterruptedException and returns.
  - boolean tryAcquireNanos(int arg, long nanosTimeout): adds a timeout on top of acquireInterruptibly(int arg). It returns false if the current thread does not acquire the synchronization state within the timeout, and true if it does.
  - boolean release(int arg): releases the synchronization state exclusively. After releasing, it wakes up the thread held by the first waiting node in the synchronization queue.
- Shared mode
  - void acquireShared(int arg): acquires the synchronization state in shared mode. If the current thread does not acquire it, the thread enters the synchronization queue and waits. The main difference from exclusive acquisition is that multiple threads can hold the synchronization state at the same time.
  - void acquireSharedInterruptibly(int arg): the same as acquireShared(int arg), but it responds to interrupts.
  - boolean tryAcquireSharedNanos(int arg, long nanosTimeout): adds a timeout on top of acquireSharedInterruptibly(int arg).
  - boolean releaseShared(int arg): releases the resource in shared mode.
- Collection<Thread> getQueuedThreads(): gets the collection of threads waiting in the synchronization queue (this applies to both modes).
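To make the division of labour between overridable methods and template methods concrete, here is a minimal sketch of a non-reentrant mutex built on AQS. It is an illustration only: the names SimpleMutex, Sync, and demo are invented for this example, and the convention that state 0 means free and 1 means held is an assumption of the sketch, not something AQS prescribes.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a non-reentrant mutex backed by an AQS subclass.
final class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // Assumed convention: state 0 = free, 1 = held. CAS lets exactly one thread win.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (!isHeldExclusively()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // a plain set is enough here because only the owner reaches this line
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1 && getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    private final Sync sync = new Sync();

    void lock()   { sync.acquire(1); }  // template method: calls tryAcquire, queues on failure
    void unlock() { sync.release(1); }  // template method: calls tryRelease, wakes a successor

    // Four threads increment a shared counter 10,000 times each under the mutex.
    static int demo() {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = {0};
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < 10_000; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println("counter = " + demo());
    }
}
```

The component itself only exposes lock() and unlock(); all queuing, parking, and waking is inherited from the acquire and release template methods.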
4. Semaphore
Let's use the Semaphore class to see how AQS is put to work.
Semaphore is a synchronization utility class. It is created with a fixed number of permits (tokens), which threads can then acquire and release. When no permit is available, an acquiring thread blocks and waits until another thread releases one.
Semaphore can be used for resource isolation and rate limiting across threads. For example, the Hystrix component in Spring Cloud implements one of its resource isolation and rate limiting strategies with semaphores.
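As a rough illustration of the limiting use case, the sketch below rejects work when no permit is free instead of blocking. The class ConcurrencyLimiter and its methods are hypothetical and are not taken from Hystrix; only Semaphore.tryAcquire(), release(), and availablePermits() are real JDK calls.

```java
import java.util.concurrent.Semaphore;

// Hypothetical helper: caps how many tasks may run at once and rejects the rest.
final class ConcurrencyLimiter {
    private final Semaphore permits;

    ConcurrencyLimiter(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    // Runs the task if a permit is free; otherwise returns false without blocking.
    boolean tryRun(Runnable task) {
        if (!permits.tryAcquire()) {
            return false;
        }
        try {
            task.run();
            return true;
        } finally {
            permits.release(); // always give the permit back
        }
    }

    int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        ConcurrencyLimiter limiter = new ConcurrencyLimiter(1);
        boolean[] nested = new boolean[1];
        // The nested call runs while the only permit is taken, so it is rejected.
        boolean outer = limiter.tryRun(() -> nested[0] = limiter.tryRun(() -> { }));
        System.out.println("outer=" + outer + ", nested=" + nested[0]);
    }
}
```

The nested call also shows that semaphore permits are not tied to a thread: the same thread that holds the only permit cannot take a second one.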
4.1 Semaphore example
Let's first look at how the Semaphore class is used:
public static void main(String[] args) throws InterruptedException {
    // Create a semaphore with 1 permit
    Semaphore semaphore = new Semaphore(1);
    // Create a thread
    Thread t1 = new Thread(() -> {
        try {
            // Acquire a permit
            semaphore.acquire();
            try {
                System.out.println(Thread.currentThread().getName() + " acquired a permit");
                // t1 sleeps for 2 seconds
                Thread.sleep(2000);
            } finally {
                System.out.println(Thread.currentThread().getName() + " released a permit");
                // Release the permit, even if the sleep was interrupted
                semaphore.release();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, "thread-1");
    // Start the thread
    t1.start();
    // The main thread sleeps for 1 second
    Thread.sleep(1000);
    // The main thread acquires the permit
    semaphore.acquire();
    System.out.println(Thread.currentThread().getName() + " acquired a permit");
    // The main thread releases the permit
    semaphore.release();
    System.out.println(Thread.currentThread().getName() + " released a permit");
}
As you can see, the example creates a semaphore with 1 permit and then starts thread t1. t1 acquires the permit, sleeps for 2 seconds, and finally releases the permit. After starting t1, the main thread sleeps for 1 second (so that t1 gets the permit first) and then tries to acquire the permit.
The program prints:
thread-1 acquired a permit
thread-1 released a permit
main acquired a permit
main released a permit
You can see that once the permits are used up, Semaphore's acquire operation effectively blocks the thread that needs a permit until another thread releases one.
4.2 Semaphore class structure
Let's take a look at its class diagram:
Semaphore internally implements a synchronizer, Sync, that extends AQS, along with two subclasses: NonfairSync, the unfair synchronizer, and FairSync, the fair synchronizer.
public class Semaphore implements java.io.Serializable {
    private static final long serialVersionUID = -3222578661600680210L;
    private final Sync sync;
    /**
     * Extends AQS and implements the tryAcquireShared and tryReleaseShared methods.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
        Sync(int permits) {
            setState(permits);
        }
        /**
         * @return the number of permits currently available
         */
        final int getPermits() {
            return getState();
        }
        /**
         * Acquires shared resources unfairly.
         *
         * @param acquires the number of permits to acquire
         * @return the number of permits remaining; negative if the acquire failed
         */
        final int nonfairTryAcquireShared(int acquires) {
            // Infinite loop, so that the operation eventually succeeds under high concurrency
            for (;;) {
                // Read the volatile state
                int available = getState();
                // Subtract the requested permits
                int remaining = available - acquires;
                // If remaining is negative, return it (failure). Otherwise CAS the state to
                // remaining; if the CAS fails, loop and repeat the steps above
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
        /**
         * Releases shared resources.
         *
         * @param releases the number of permits to release
         * @return true once the state has been updated
         */
        protected final boolean tryReleaseShared(int releases) {
            // Infinite loop, so that the operation eventually succeeds under high concurrency
            for (;;) {
                // Read the current state
                int current = getState();
                // Add the released permits
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                // CAS the state to the new value
                if (compareAndSetState(current, next))
                    return true;
            }
        }
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }
    /**
     * Unfair version
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;
        NonfairSync(int permits) {
            super(permits);
        }
        /**
         * Tries to acquire shared resources unfairly.
         *
         * @param acquires the number of permits to acquire
         * @return the number of permits remaining; negative if the acquire failed
         */
        protected int tryAcquireShared(int acquires) {
            // Acquire the resource unfairly
            return nonfairTryAcquireShared(acquires);
        }
    }
    /**
     * Fair version
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;
        FairSync(int permits) {
            super(permits);
        }
        /**
         * Acquires shared resources fairly.
         *
         * @param acquires the number of permits to acquire
         * @return the number of permits remaining; negative if the acquire failed
         */
        protected int tryAcquireShared(int acquires) {
            // Infinite loop
            for (;;) {
                // Check whether another thread is queued ahead of the current thread
                if (hasQueuedPredecessors()) {
                    // Someone is queued ahead, so return -1 directly
                    return -1;
                }
                // Read the current state
                int available = getState();
                // Subtract the requested permits
                int remaining = available - acquires;
                // If remaining is negative, return it (failure).
                // If remaining >= 0, CAS the state; on success return remaining, and if the
                // CAS fails, repeat the steps above
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
    public Semaphore(int permits) {
        // An unfair synchronizer is created by default
        sync = new NonfairSync(permits);
    }
    public Semaphore(int permits, boolean fair) {
        // Create a fair or an unfair synchronizer
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
    public void acquire() throws InterruptedException {
        // Acquire the resource in shared mode, responding to interrupts
        sync.acquireSharedInterruptibly(1);
    }
    public void acquireUninterruptibly() {
        // Acquire the resource without responding to interrupts
        sync.acquireShared(1);
    }
    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }
    public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    public void release() {
        // Release the resource
        sync.releaseShared(1);
    }
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }
    public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
    }
    public boolean tryAcquire(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }
    public int availablePermits() {
        return sync.getPermits();
    }
    public int drainPermits() {
        return sync.drainPermits();
    }
    protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }
    public boolean isFair() {
        return sync instanceof FairSync;
    }
    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }
    public final int getQueueLength() {
        return sync.getQueueLength();
    }
    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }
    public String toString() {
        return super.toString() + "[Permits = " + sync.getPermits() + "]";
    }
}
Semaphore's single-argument constructor creates an unfair synchronizer by default. The unfair synchronizer can reduce thread context switches when multiple threads acquire permits concurrently.
4.3 Semaphore: acquiring synchronization resources
Let's first trace the call chain Semaphore uses to acquire a permit:
- call the Semaphore#acquire() method;
- which calls the AbstractQueuedSynchronizer#acquireSharedInterruptibly(1) method;
Look at the java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly method:
/**
 * Acquires in shared mode, aborting if interrupted. Implemented
 * by first checking interrupt status, then invoking at least once
 * {@link #tryAcquireShared}, returning on success. Otherwise the
 * thread is queued, possibly repeatedly blocking and unblocking,
 * invoking {@link #tryAcquireShared} until success or the thread
 * is interrupted.
 * @param arg the acquire argument.
 * This value is conveyed to {@link #tryAcquireShared} but is
 * otherwise uninterpreted and can represent anything
 * you like.
 * @throws InterruptedException if the current thread is interrupted
 */
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // The current thread has already been interrupted, so throw an exception
    if (Thread.interrupted())
        throw new InterruptedException();
    // Try to acquire the shared resource; a negative result means the attempt failed,
    // so fall through to the blocking acquire below
    if (tryAcquireShared(arg) < 0) {
        // Acquire in shared mode, queuing and blocking, while responding to interrupts
        doAcquireSharedInterruptibly(arg);
    }
}
acquireSharedInterruptibly is the AQS template method mentioned above for acquiring the synchronization state in shared mode. It first calls tryAcquireShared() to try to get the resource. In Semaphore, the Sync subclasses NonfairSync and FairSync implement that method.
NonfairSync, the unfair synchronizer, overrides AQS's tryAcquireShared by delegating to the parent class Sync's nonfairTryAcquireShared() method. FairSync's version, shown below, runs the same loop but first checks hasQueuedPredecessors(). Let's look at how it works, java.util.concurrent.Semaphore.FairSync#tryAcquireShared:
/**
 * Acquires shared resources fairly.
 *
 * @param acquires the number of permits to acquire
 * @return the number of permits remaining; negative if the acquire failed
 */
protected int tryAcquireShared(int acquires) {
    // Infinite loop
    for (;;) {
        // Check whether another thread is queued ahead of the current thread
        if (hasQueuedPredecessors()) {
            // Someone is queued ahead, so return -1 directly
            return -1;
        }
        // Read the current state
        int available = getState();
        // Subtract the requested permits
        int remaining = available - acquires;
        // If remaining is negative, return it (failure).
        // If remaining >= 0, CAS the state; on success return remaining, and if the
        // CAS fails, repeat the steps above
        if (remaining < 0 ||
            compareAndSetState(available, remaining)) {
            return remaining;
        }
    }
}
This is an infinite loop. It reads the synchronization state and subtracts the requested amount (the acquires parameter, which is 1 in our example), then checks the new value. If it is less than 0, no resource is available and the method returns immediately. If it is greater than or equal to 0, it performs a CAS to update the synchronization state. Under highly concurrent access the CAS may fail, in which case the loop repeats until the CAS succeeds or the remaining value is less than 0.
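The read, subtract, CAS, retry shape of that loop can be reproduced outside AQS. The sketch below is an illustration under stated assumptions: PermitCounter is a made-up class, and an AtomicInteger stands in for the AQS state field.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the AQS state, used only to show the CAS retry loop.
final class PermitCounter {
    private final AtomicInteger state;

    PermitCounter(int permits) {
        this.state = new AtomicInteger(permits);
    }

    // Same shape as the try-acquire loop: a negative result means failure.
    int tryAcquireShared(int acquires) {
        for (;;) {
            int available = state.get();
            int remaining = available - acquires;
            // On failure (remaining < 0) the state is left untouched;
            // on a lost CAS race the loop re-reads the state and retries
            if (remaining < 0 || state.compareAndSet(available, remaining)) {
                return remaining;
            }
        }
    }

    int available() {
        return state.get();
    }

    public static void main(String[] args) {
        PermitCounter counter = new PermitCounter(2);
        System.out.println(counter.tryAcquireShared(1)); // 1
        System.out.println(counter.tryAcquireShared(1)); // 0
        System.out.println(counter.tryAcquireShared(1)); // -1
        System.out.println(counter.available());         // 0
    }
}
```

Note that a failed attempt returns a negative number without modifying the counter, which is why the last line still prints 0.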
If this method returns a negative value, AQS's doAcquireSharedInterruptibly method runs next. Look at its implementation, java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireSharedInterruptibly:
/**
 * Acquires in shared interruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // Create a waiting node and append it to the tail of the queue
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        // Infinite loop
        for (;;) {
            // Get the predecessor of this node
            final Node p = node.predecessor();
            if (p == head) {
                // The predecessor is the head node (the head is a placeholder node), so this
                // node is first in line: try to acquire the shared resource again
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // The resource was acquired: make this node the head and propagate
                    setHeadAndPropagate(node, r);
                    // Clear the old head's next pointer to help garbage collection
                    p.next = null; // help GC
                    // Record that the acquire did not fail
                    failed = false;
                    return;
                }
            }
            // After a failed acquire, decide whether to block (updating the predecessor's
            // status), then block and check for interruption
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt()) {
                // The thread was interrupted while waiting, so throw an exception
                throw new InterruptedException();
            }
        }
    } finally {
        if (failed) {
            // The acquire did not complete normally, so cancel it
            cancelAcquire(node);
        }
    }
}
/**
 * Checks and updates status for a node that failed to acquire.
 * Returns true if thread should block. This is the main signal
 * control in all acquire loops. Requires that pred == node.prev.
 *
 * @param pred node's predecessor holding status
 * @param node the node
 * @return {@code true} if thread should block
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // The wait status of the predecessor node
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) {
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        // SIGNAL status: the predecessor will signal this node on release, so it is safe to block
        return true;
    }
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        // The predecessor was cancelled: skip over the cancelled nodes
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        // Point the new predecessor's next at this node
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE. Indicate that we
         * need a signal, but don't park yet. Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        // The status is 0 or PROPAGATE(-3): set the predecessor's status to SIGNAL(-1)
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
/**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    // Create a node for the CLH wait queue
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // Read the tail node
    Node pred = tail;
    if (pred != null) {
        // The tail is not null, so append the node to the tail with CAS
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // The tail is null or the CAS failed, so fall back to the full enqueue
    enq(node);
    return node;
}
/**
 * Convenience method to park and then check if interrupted
 *
 * @return {@code true} if interrupted
 */
private final boolean parkAndCheckInterrupt() {
    // Block the current thread
    LockSupport.park(this);
    // Check whether the thread was interrupted (this also clears the interrupt flag)
    return Thread.interrupted();
}
The operations here are:
- First, addWaiter(Node.SHARED) creates a node for the CLH synchronization queue, appends it to the tail of the queue, and returns it;
- Then the node's predecessor is fetched and compared with the head node. If the predecessor is the head, no other node is queued ahead, so the current node may try for the resource and calls tryAcquireShared(). If the acquire succeeds (the return value is greater than or equal to 0), the current node becomes the head and the method ends;
- If the predecessor is not the head (other waiting nodes are ahead), or the acquire failed, shouldParkAfterFailedAcquire() is called. It updates the predecessor's wait status to Node.SIGNAL, unlinks cancelled nodes, and returns whether the thread should block (that is, whether the predecessor's wait status was already Node.SIGNAL);
- If the thread should block, parkAndCheckInterrupt() is called, which blocks the current thread by calling LockSupport.park(this);
- Once blocked, the thread waits until another thread releases the resource and wakes it, or until it is interrupted.
OK, that is the whole process by which a thread acquires a Semaphore permit (that is, the AQS synchronization state). The thread first tries to get the resource and returns if it succeeds. If it fails, the current thread is wrapped in a CLH synchronization queue node and appended to the tail of the queue, and the thread is then blocked until another thread wakes it or interrupts it.
4.4 Semaphore: releasing synchronization resources
Semaphore releases resources through release(). Let's look at its call chain.
public void release() {
    // Release the resource
    sync.releaseShared(1);
}
It calls the AQS synchronizer's releaseShared(int arg) method:
/**
 * Releases in shared mode. Implemented by unblocking one or more
 * threads if {@link #tryReleaseShared} returns true.
 *
 * @param arg the release argument. This value is conveyed to
 * {@link #tryReleaseShared} but is otherwise uninterpreted
 * and can represent anything you like.
 * @return the value returned from {@link #tryReleaseShared}
 */
public final boolean releaseShared(int arg) {
    // First try to release the resource
    if (tryReleaseShared(arg)) {
        // Then wake up waiting threads
        doReleaseShared();
        return true;
    }
    return false;
}
As you can see, it first calls tryReleaseShared(arg) and then doReleaseShared().
Semaphore's synchronizer overrides tryReleaseShared(arg), java.util.concurrent.Semaphore.Sync#tryReleaseShared:
/**
 * Releases shared resources.
 *
 * @param releases the number of permits to release
 * @return true once the state has been updated
 */
protected final boolean tryReleaseShared(int releases) {
    // Infinite loop, so that the operation eventually succeeds under high concurrency
    for (;;) {
        // Read the current state
        int current = getState();
        // Add the released permits
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        // CAS the state to the new value
        if (compareAndSetState(current, next))
            return true;
    }
}
This reads the synchronization state, adds the released amount, and sets the new state with CAS, looping until the update succeeds.
Next, look at java.util.concurrent.locks.AbstractQueuedSynchronizer#doReleaseShared:
/**
 * Release action for shared mode -- signals successor and ensures
 * propagation. (Note: For exclusive mode, release just amounts
 * to calling unparkSuccessor of head if it needs signal.)
 */
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases. This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    // Infinite loop
    for (;;) {
        // Read the head node
        Node h = head;
        // The head is not null and is not the tail, so there is at least one waiting node
        if (h != null && h != tail) {
            // The wait status of the head node
            int ws = h.waitStatus;
            // The head is in SIGNAL status
            if (ws == Node.SIGNAL) {
                // CAS the status to 0; if the CAS fails, loop and recheck
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
                    continue; // loop to recheck cases
                }
                // Wake up the successor node so that it can compete for the resource
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
                // The status is 0 and the CAS to PROPAGATE(-3) failed, so loop and retry
                continue; // loop on failed CAS
            }
        }
        // The head did not change, so exit the loop
        if (h == head) // loop if head changed
        {
            break;
        }
    }
}
/**
 * Wakes up node's successor, if one exists.
 *
 * @param node the node
 */
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling. It is OK if this
     * fails or if status is changed by waiting thread.
     */
    // The wait status of the node
    int ws = node.waitStatus;
    if (ws < 0) {
        // The status is negative, so try to reset it to 0
        compareAndSetWaitStatus(node, ws, 0);
    }
    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node. But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    // Get the next node, that is, the successor
    Node s = node.next;
    // The successor is null or its status is greater than 0, that is, CANCELLED(1)
    if (s == null || s.waitStatus > 0) {
        s = null;
        // Traverse backwards from the tail until the current node is reached,
        // looking for nodes that are not cancelled
        for (Node t = tail; t != null && t != node; t = t.prev) {
            // Found a node whose status is not CANCELLED(1)
            if (t.waitStatus <= 0) {
                // Keep scanning after a match: the loop only stops at the current node (or null),
                // so s ends up as the non-cancelled node closest to the current node
                s = t;
            }
        }
    }
    // A successor was found
    if (s != null) {
        // Wake up the thread of that successor node
        LockSupport.unpark(s.thread);
    }
}
This operation mainly sets the wait status of the head node of the synchronization queue to 0 and then calls unparkSuccessor() to wake the thread of the waiting node. The thread is woken by calling LockSupport.unpark(Thread).
After the releasing thread has released the resource, the awakened thread returns from parkAndCheckInterrupt() and starts a new round of attempts to acquire the synchronization resource.
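The hand-off from a releasing thread to a parked waiter can be observed from the outside through Semaphore's public API. This is a small sketch, with the class name ReleaseWakesWaiter chosen for illustration; it relies only on acquireUninterruptibly(), hasQueuedThreads(), and release().

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class: a waiter is queued, then woken by a release.
final class ReleaseWakesWaiter {
    // Returns true if the waiter was still blocked before the release and done after it.
    static boolean run() {
        Semaphore semaphore = new Semaphore(0); // no permits, so the acquire must block
        AtomicBoolean acquired = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            semaphore.acquireUninterruptibly(); // fails the first try, enqueues, and parks
            acquired.set(true);
        }, "waiter");
        waiter.start();

        // Spin until the waiter's node is visible in the synchronization queue
        while (!semaphore.hasQueuedThreads()) {
            Thread.yield();
        }
        boolean blockedBeforeRelease = !acquired.get();

        semaphore.release(); // adds a permit and wakes the queued waiter
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return blockedBeforeRelease && acquired.get();
    }

    public static void main(String[] args) {
        System.out.println("waiter was queued, then woken by release: " + run());
    }
}
```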
4.5 Semaphore summary
By walking through how Semaphore acquires and releases resources, we have seen how AQS acquires and releases the synchronization state in shared mode, including how a thread that fails to acquire enters the AQS synchronization queue, blocks, and is later woken.
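Shared mode is not limited to counting permits. As a further sketch, the gate below lets any number of waiting threads through once it is opened. The names OneShotGate, await, open, and demo are invented for this example, and treating state 1 as "open" is an assumption of the sketch.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a one-shot gate built on AQS shared mode.
final class OneShotGate {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            // Assumed convention: state 0 = closed, 1 = open. A result >= 0 means success.
            return getState() == 1 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1); // open the gate; returning true lets AQS wake the waiters
            return true;
        }
    }

    private final Sync sync = new Sync();

    void await() { sync.acquireShared(1); }  // blocks while the gate is closed
    void open()  { sync.releaseShared(1); }  // wakes every waiting thread

    // Starts the given number of waiting threads, opens the gate, and counts who got through.
    static int demo(int waiters) {
        OneShotGate gate = new OneShotGate();
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                gate.await();
                passed.incrementAndGet();
            });
            threads[i].start();
        }
        gate.open();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println("threads released: " + demo(3));
    }
}
```

Because tryAcquireShared keeps returning a positive value after the gate opens, each woken waiter propagates the wake-up to the next one, so a single open() call releases all of them.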
Next, let's look at how AQS acquires and releases the synchronization state in exclusive mode.
5. ReentrantLock
ReentrantLock is an exclusive lock. Its class diagram is as follows:
You can see that it also has a static inner AQS implementation class, Sync, with fair and unfair synchronizer subclasses.
5.1 ReentrantLock example
An example of its use:
public static void main(String[] args) {
    // An unfair lock is created by default
    ReentrantLock lock = new ReentrantLock();
    new Thread(() -> {
        doLock(lock);
    }, "t0").start();
    doLock(lock);
}
private static void doLock(ReentrantLock lock) {
    // Acquire the lock before the try block, so that unlock() only runs if lock() succeeded
    lock.lock();
    try {
        System.out
            .println(Thread.currentThread().getName() + " acquired the lock");
        // Sleep for 1 second
        Thread.sleep(1000);
        System.out
            .println(Thread.currentThread().getName() + " released the lock");
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        System.out.println();
        lock.unlock();
    }
}
Its output is as follows:
main acquired the lock
main released the lock
t0 acquired the lock
t0 released the lock
This example creates a ReentrantLock instance, which uses an unfair lock by default. It starts a thread that locks and unlocks, and the main thread then locks and unlocks too. The output shows that while one thread holds the lock, other threads block when they try to acquire it, until the holder releases the lock.
This guarantees the exclusivity of the lock.
5.2 ReentrantLock Acquiring the lock
The default constructor of ReentrantLock is as follows:
/**
 * Creates an instance of {@code ReentrantLock}.
 * This is equivalent to using {@code ReentrantLock(false)}.
 */
public ReentrantLock() {
    sync = new NonfairSync();
}
It creates an unfair synchronizer by default.
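As a quick check of the default described above, the following sketch asks the lock which policy it uses. The class name FairnessDemo and its two helper methods are made up for this illustration; isFair() and the ReentrantLock(boolean) constructor are part of the public ReentrantLock API.

```java
import java.util.concurrent.locks.ReentrantLock;

class FairnessDemo {
    // The no-arg constructor is documented as equivalent to ReentrantLock(false).
    static boolean defaultIsFair() {
        return new ReentrantLock().isFair();
    }

    // Passing true selects the fair synchronizer instead.
    static boolean explicitFairIsFair() {
        return new ReentrantLock(true).isFair();
    }

    public static void main(String[] args) {
        System.out.println("default lock fair?  " + defaultIsFair());
        System.out.println("explicit fair lock? " + explicitFairIsFair());
    }
}
```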
The lock operation of ReentrantLock is java.util.concurrent.locks.ReentrantLock#lock:
public void lock() {
    sync.lock();
}
It calls the lock() method of the synchronizer. Let's look at the implementation in the unfair synchronizer, java.util.concurrent.locks.ReentrantLock.NonfairSync#lock:
/**
 * Performs lock. Try immediate barge, backing up to normal
 * acquire on failure.
 */
final void lock() {
    // First try to grab the lock directly: CAS the synchronization state from 0 to 1
    if (compareAndSetState(0, 1)) {
        // On success, record the current thread as the exclusive owner
        setExclusiveOwnerThread(Thread.currentThread());
    } else {
        // Otherwise fall back to the normal acquire path
        acquire(1);
    }
}
The logic here is:
- First try to change the synchronization state from 0 to 1 with CAS. If that succeeds, call setExclusiveOwnerThread() to record the current thread as the exclusive owner;
- If it does not succeed, the lock is already held (by another thread, or by the current thread in the reentrant case), so call acquire(int arg).
setExclusiveOwnerThread(thread) is a method of the AbstractOwnableSynchronizer class, java.util.concurrent.locks.AbstractOwnableSynchronizer#setExclusiveOwnerThread:
/**
 * The current owner of exclusive mode synchronization.
 */
private transient Thread exclusiveOwnerThread;
/**
 * Sets the thread that currently owns exclusive access.
 * A {@code null} argument indicates that no thread owns access.
 * This method does not otherwise impose any synchronization or
 * {@code volatile} field accesses.
 * @param thread the owner thread
 */
protected final void setExclusiveOwnerThread(Thread thread) {
    // Set the exclusive owner thread
    exclusiveOwnerThread = thread;
}
acquire(int arg) is the java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire method:
/**
 * Acquires the resource in exclusive mode and ignores interrupts.
 *
 * Acquires in exclusive mode, ignoring interrupts. Implemented
 * by invoking at least once {@link #tryAcquire},
 * returning on success. Otherwise the thread is queued, possibly
 * repeatedly blocking and unblocking, invoking {@link
 * #tryAcquire} until success. This method can be used
 * to implement method {@link Lock#lock}.
 *
 * @param arg the acquire argument. This value is conveyed to
 *        {@link #tryAcquire} but is otherwise uninterpreted and
 *        can represent anything you like.
 */
public final void acquire(int arg) {
    /*
     Acquisition steps:
     1. Try to acquire the resource (state)
     2. On failure, create a wait node for the thread and add it to the wait queue
     3. Acquire the resource from within the queue, looping until it succeeds
     4. If the thread was interrupted while waiting, restore its interrupt status
     */
    if (!tryAcquire(arg)) {
        // Create an exclusive-mode node for the current thread and append it to the tail of the queue
        Node node = addWaiter(Node.EXCLUSIVE);
        // Acquire the resource while queued
        if (acquireQueued(node, arg)) {
            // acquireQueued returned true: the thread was interrupted while waiting, so re-assert its interrupt flag
            selfInterrupt();
        }
    }
}
The logic is similar to that of acquireShared(int arg):
- First call tryAcquire(arg) to try to acquire the resource;
- If the acquisition succeeds, the method returns. If it fails, call addWaiter(Node.EXCLUSIVE) to create a synchronization queue node and append it to the tail of the queue. Unlike the synchronization queue node in shared mode, this one is an exclusive node;
- Then call acquireQueued(node, arg) to acquire the resource while queued. If it reports that the thread was interrupted while waiting, selfInterrupt() restores the thread's interrupt flag.
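The last step above, where an interrupt received while waiting is not thrown but re-asserted afterwards, can be observed with a small sketch. The class name LockInterruptDemo and the method interruptFlagAfterLock() are hypothetical names chosen for this illustration; the sketch relies only on the public ReentrantLock API and makes no claim about how a particular JDK version implements it internally.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class LockInterruptDemo {
    // Returns the interrupt flag the waiting thread sees right after lock() returns.
    static boolean interruptFlagAfterLock() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean flagAfterLock = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            lock.lock(); // waits in the synchronization queue; never throws on interrupt
            try {
                flagAfterLock.set(Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        }, "waiter");

        lock.lock(); // the calling thread holds the lock first
        try {
            waiter.start();
            // Spin until the waiter has been enqueued behind us.
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield();
            }
            waiter.interrupt(); // interrupt it while it is still waiting for the lock
        } finally {
            lock.unlock();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return flagAfterLock.get();
    }

    public static void main(String[] args) {
        System.out.println("interrupt flag after lock(): " + interruptFlagAfterLock());
    }
}
```

The waiting thread still obtains the lock, and only afterwards can it notice that it was interrupted. Code that needs to abort the wait instead would use lockInterruptibly().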
The tryAcquire(arg) method is overridden in NonfairSync, the unfair synchronizer of ReentrantLock, java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire:
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
/**
 * Acquires the resource in an unfair way.
 *
 * Performs non-fair tryLock. tryAcquire is implemented in
 * subclasses, but both need nonfair try for trylock method.
 */
final boolean nonfairTryAcquire(int acquires) {
    // Current thread
    final Thread current = Thread.currentThread();
    // Current synchronization state
    int c = getState();
    // State 0 means that no thread holds the resource
    if (c == 0) {
        // Set the state with CAS
        if (compareAndSetState(0, acquires)) {
            // The CAS succeeded: record the exclusive owner thread
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // The resource is already held: check whether the holder is the current thread
    else if (current == getExclusiveOwnerThread()) {
        // The current thread already holds the resource: add acquires to the state (reentrancy)
        int nextc = c + acquires;
        if (nextc < 0) // overflow
        {
            throw new Error("Maximum lock count exceeded");
        }
        // Write the new state
        setState(nextc);
        return true;
    }
    // In every other case return false: the acquisition failed
    return false;
}
It delegates to nonfairTryAcquire(int acquires), whose logic is:
- Read the synchronization state. If it is 0, try to set it with CAS. If the CAS succeeds, call setExclusiveOwnerThread(current) to record the exclusive owner thread and return true;
- If the synchronization state is not 0, check whether the current thread is the exclusive owner. If it is, the thread already holds the lock, so the state value is increased (this is where reentrancy comes from) and the new value is written with a plain setState(). No CAS is needed here, because only the owning thread can reach this branch, and since state is volatile the new value is immediately visible to other threads. The method returns true;
- In every other case, including a failed CAS, return false.
Next, look at the acquireQueued(Node node, int arg) method, java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued:
/**
 * Acquires the resource in exclusive mode for a node that has already joined the queue.
 *
 * Acquires in exclusive uninterruptible mode for thread already in
 * queue. Used by condition wait methods as well as acquire.
 *
 * @param node the node
 * @param arg the acquire argument
 * @return {@code true} if interrupted while waiting
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // Loop until the resource is acquired
        for (;;) {
            // Get the predecessor node
            final Node p = node.predecessor();
            // Only the node right behind the head may try to acquire the resource
            if (p == head && tryAcquire(arg)) {
                // Acquired: this node becomes the new head
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // Acquisition failed: update the predecessor's wait status and decide whether to park.
            // Once parked, the thread continues only after the predecessor's release unparks it or it is interrupted
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // If the acquisition did not complete normally, cancel it
        if (failed)
            cancelAcquire(node);
    }
}
Again, this method is very similar to the doAcquireSharedInterruptibly(int arg) method of AQS that we saw in the Semaphore section. The logic is:
- Get the predecessor of the node. If the predecessor is the head node, this node is first in line, so call tryAcquire(arg) to try to acquire the resource;
- If tryAcquire(arg) succeeds, set the node as the head of the synchronization queue and return the interrupted flag, which is true only if the thread was interrupted while waiting;
- If the predecessor is not the head node, or the attempt to acquire the resource fails, call shouldParkAfterFailedAcquire(p, node) to update the wait status of the predecessor and decide whether the thread should block;
- If it should block, call parkAndCheckInterrupt() to block the thread until another thread wakes it up or interrupts it.
That completes the lock acquisition process of ReentrantLock. To recap: a thread that wants the lock first tries to acquire it directly. If that succeeds, it is recorded as the exclusive owner thread. If it fails, an exclusive-mode synchronization queue node is built for the thread and appended to the tail of the synchronization queue. The thread then either tries to acquire the resource or blocks, and each time it is woken up or interrupted by another thread it repeats this acquire-or-block step until it succeeds.
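To make the division of labour concrete, here is a minimal sketch of a non-reentrant mutex built directly on AQS. It is not the ReentrantLock implementation: the names SimpleMutex, Sync and countWithMutex are made up for this illustration, and the reentrancy branch is deliberately left out. Only tryAcquire and tryRelease are written by hand; the queueing, parking and wake-up described above are inherited from acquire() and release().

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // Same first step as the lock path above: CAS the state from 0 to 1.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false; // no reentrancy branch in this sketch
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (Thread.currentThread() != getExclusiveOwnerThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // volatile write publishes the release
            return true;
        }
    }

    private final Sync sync = new Sync();

    void lock() { sync.acquire(1); }       // queues and parks on failure
    boolean tryLock() { return sync.tryAcquire(1); }
    void unlock() { sync.release(1); }     // wakes the successor, if any

    // Hypothetical helper: several threads increment a plain int under the mutex.
    static int countWithMutex(int threads, int incrementsPerThread) {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(countWithMutex(4, 1000));
    }
}
```

Because this sketch has no reentrancy branch, a thread that already holds the mutex gets false from tryLock(), which is exactly the case that the extra else-if in nonfairTryAcquire handles for ReentrantLock.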
5.3 ReentrantLock Releasing the lock
ReentrantLock releases the lock with the unlock() method. java.util.concurrent.locks.ReentrantLock#unlock is implemented as follows:
public void unlock() {
    sync.release(1);
}
It calls the synchronizer's release(int arg) method. java.util.concurrent.locks.AbstractQueuedSynchronizer#release is implemented as follows:
/**
 * Releases in exclusive mode. Implemented by unblocking one or
 * more threads if {@link #tryRelease} returns true.
 * This method can be used to implement method {@link Lock#unlock}.
 *
 * @param arg the release argument. This value is conveyed to
 *        {@link #tryRelease} but is otherwise uninterpreted and
 *        can represent anything you like.
 * @return the value returned from {@link #tryRelease}
 */
public final boolean release(int arg) {
    // Try to release the resource
    if (tryRelease(arg)) {
        // Released successfully: get the head node of the CLH wait queue
        Node h = head;
        // Check that it is not null and that its wait status is not 0
        if (h != null && h.waitStatus != 0) {
            // Wake up the successor node
            unparkSuccessor(h);
        }
        return true;
    }
    return false;
}
The logic of releasing the resource is also similar to releaseShared(int arg):
- First call tryRelease(arg) to try to release the resource;
- If the release fails, return false directly;
- If the release succeeds, get the head node of the synchronization queue. If it is not null and its wait status is not 0, call unparkSuccessor(h) to wake up the waiting successor node, then return true.
The tryRelease(arg) method is implemented in Sync, the synchronizer of ReentrantLock, as java.util.concurrent.locks.ReentrantLock.Sync#tryRelease:
protected final boolean tryRelease(int releases) {
    // Compute the new state: current state minus the amount to release
    int c = getState() - releases;
    // Check that the current thread is the exclusive owner thread
    if (Thread.currentThread() != getExclusiveOwnerThread()) {
        throw new IllegalMonitorStateException();
    }
    // Whether the lock is now completely free
    boolean free = false;
    // The state after the release is 0
    if (c == 0) {
        // The resource is free
        free = true;
        // Clear the exclusive owner thread
        setExclusiveOwnerThread(null);
    }
    // Write the new state
    setState(c);
    return free;
}
The logic is:
- Read the synchronization state and subtract the release amount to get the new value;
- Check whether the current thread is the exclusive owner. If not, throw an exception, because a thread that does not hold the lock is trying to release it;
- Check whether the new state value is 0. If it is, the resource is free and the exclusive owner thread is cleared. This is why every lock() call on a ReentrantLock must be matched by an unlock() call: until the counts match, the resource is not released;
- Finally, write the new synchronization state value. The method returns true only if the lock is now completely free.
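The matching rule in the third point can be checked with a short sketch that locks twice and unlocks twice on a single thread. The class name HoldCountDemo and its methods are invented for this illustration; getHoldCount() and isLocked() are public ReentrantLock methods intended for monitoring.

```java
import java.util.concurrent.locks.ReentrantLock;

class HoldCountDemo {
    // Records hold count and locked status after each step, on a single thread.
    static String trace() {
        ReentrantLock lock = new ReentrantLock();
        StringBuilder sb = new StringBuilder();
        lock.lock();
        lock.lock(); // reentrant acquire: the state goes from 1 to 2
        sb.append(lock.getHoldCount()).append(',');
        lock.unlock(); // state back to 1: the lock is still held
        sb.append(lock.getHoldCount()).append(',').append(lock.isLocked()).append(',');
        lock.unlock(); // state 0: the lock is free, the owner is cleared
        sb.append(lock.getHoldCount()).append(',').append(lock.isLocked());
        return sb.toString();
    }

    // Returns true if unlocking a lock that is not held is rejected.
    static boolean unlockWithoutHoldingThrows() {
        try {
            new ReentrantLock().unlock();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(trace()); // prints 2,1,true,0,false
        System.out.println(unlockWithoutHoldingThrows());
    }
}
```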
5.4 ReentrantLock Summary
We have now seen how ReentrantLock acquires and releases the lock. It acquires the resource exclusively: while one thread holds the lock, other threads cannot get it. It also supports locking several times from the same thread, which of course requires the same number of unlock operations. When a thread wants to acquire the lock (in the default unfair case), it first tries to get the resource directly. If that fails, an exclusive-mode synchronization queue node is created for the current thread and appended to the tail of the synchronization queue. The thread then either tries to acquire the resource or blocks, until another thread releases the lock or interrupts it, at which point it tries again to acquire the lock.
6. ConditionObject (condition object)
So far we have covered these features of AQS: fair / unfair, exclusive / shared, and the synchronization queue. There is one more: the condition queue.
The condition queue is special. It is also a queue and uses the same Node class as the synchronization queue, but it is a singly linked list that uses the nextWaiter field to point to the next waiting node.
Its nodes are created by ConditionObject, an inner class of AQS, so the nodes of the condition queue share the properties of the AQS instance (synchronization resource, threads).
The condition queue works like this: a node in the condition queue is transferred to the synchronization queue, and from there it acquires the resource like any other synchronization queue node.
The class diagram of ConditionObject is:
It mainly uses await(), signal() and related methods to block and wake up threads.
6.1 ConditionObject Example
public static void main(String[] args) throws InterruptedException {
    // Create a lock
    ReentrantLock lock = new ReentrantLock();
    // Create a condition object from the lock
    Condition c1 = lock.newCondition();
    // Thread t1 waits on the condition
    new Thread(() -> {
        // Lock
        lock.lock();
        System.out.println(Thread.currentThread().getName() + ": Lock!");
        try {
            System.out.println(Thread.currentThread().getName() + ": Too tired, I'll go to bed first, wake me up later!");
            // Wait on the condition
            c1.await();
            System.out.println(Thread.currentThread().getName() + ": I'm awake!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + ": Unlock!");
            // Unlock
            lock.unlock();
        }
    }, "t1").start();
    Thread.sleep(1000);
    // Thread t2 signals the condition
    new Thread(() -> {
        // Lock
        lock.lock();
        System.out.println(Thread.currentThread().getName() + ": Lock!");
        try {
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + ": Time to eat, get up!");
            // Signal the condition
            c1.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + ": Unlock!");
            // Unlock
            lock.unlock();
        }
    }, "t2").start();
}
The code above creates a reentrant lock and creates a condition object from it. Thread t1 takes the lock, calls the wait operation of the condition object, and finally releases the lock. Thread t2 takes the lock, calls the wake-up operation of the condition object, and finally releases the lock.
The output is:
t1: Lock!
t1: Too tired, I'll go to bed first, wake me up later!
t2: Lock!
t2: Time to eat, get up!
t2: Unlock!
t1: I'm awake!
t1: Unlock!
In other words, t1 acquires the lock first and then calls await() on the condition object, which releases the lock and blocks the thread. Thread t2 can then acquire the lock and call signal() on the condition object. Only after t2 releases the lock does t1 wake up, re-acquire the lock, and carry on with the rest of its work.
6.2 Condition object wait
Let's first look at how the condition object is created. The lock.newCondition() method is java.util.concurrent.locks.ReentrantLock#newCondition:
public Condition newCondition() {
    return sync.newCondition();
}
It in turn calls the synchronizer's method for creating a condition, java.util.concurrent.locks.ReentrantLock.Sync#newCondition:
final ConditionObject newCondition() {
    return new ConditionObject();
}
Then look at its await() method, java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await():
/**
 * Implements interruptible condition wait.
 * <ol>
 * <li> If current thread is interrupted, throw InterruptedException.
 * <li> Save lock state returned by {@link #getState}.
 * <li> Invoke {@link #release} with saved state as argument,
 *      throwing IllegalMonitorStateException if it fails.
 * <li> Block until signalled or interrupted.
 * <li> Reacquire by invoking specialized version of
 *      {@link #acquire} with saved state as argument.
 * <li> If interrupted while blocked in step 4, throw InterruptedException.
 * </ol>
 */
public final void await() throws InterruptedException {
    // Check whether the thread has been interrupted
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }
    // Add a node for this thread to the condition wait queue
    Node node = addConditionWaiter();
    // Fully release the resource held by the thread and remember the released state
    int savedState = fullyRelease(node);
    // Interrupt mode
    int interruptMode = 0;
    // Loop until the node has been moved to the CLH synchronization queue
    while (!isOnSyncQueue(node)) {
        // Not in the CLH synchronization queue yet: block the current thread
        LockSupport.park(this);
        // The thread has been woken up: check whether it was interrupted while waiting, and leave the loop if so
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // At this point the thread has been woken up (signalled or interrupted)
    // Re-acquire the resource in exclusive mode, restoring the saved state
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
        // The thread was interrupted while re-acquiring, and the interrupt mode is not THROW_IE:
        // set the interrupt mode to REINTERRUPT
        interruptMode = REINTERRUPT;
    }
    // Check the node again and clean up cancelled condition wait nodes
    if (node.nextWaiter != null) // clean up if cancelled
    {
        // Unlink the condition wait nodes that have been cancelled
        unlinkCancelledWaiters();
    }
    // If the interrupt mode is not 0, report the interrupt
    if (interruptMode != 0) {
        // Throw InterruptedException or re-interrupt the thread, depending on the mode
        reportInterruptAfterWait(interruptMode);
    }
}
/**
 * Adds a new condition waiter to the condition wait queue.
 *
 * Adds a new waiter to wait queue.
 * @return its new wait node
 */
private Node addConditionWaiter() {
    // Get the last condition wait node
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    // If the last waiter has been cancelled, clean it out
    if (t != null && t.waitStatus != Node.CONDITION) {
        // Unlink the waiters in cancelled status
        unlinkCancelledWaiters();
        // Re-read the last wait node
        t = lastWaiter;
    }
    // Create a condition queue node whose wait status is Node.CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null) {
        // The queue is empty: the new node becomes the first waiter
        firstWaiter = node;
    } else {
        // Append the new node after the last waiter
        t.nextWaiter = node;
    }
    // The new node becomes the last waiter
    lastWaiter = node;
    return node;
}
/**
 * Calls release with the current state value and returns the saved state. Cancels the node and throws an exception on failure.
 *
 * Invokes release with current state value; returns saved state.
 * Cancels node and throws exception on failure.
 * @param node the condition node for this wait
 * @return previous sync state
 */
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // Read the synchronizer's current state
        int savedState = getState();
        // Release the whole state in one go
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        // If the release failed, mark the node as cancelled (Node.CANCELLED)
        if (failed) {
            node.waitStatus = Node.CANCELLED;
        }
    }
}
The main logic here is:
- First check whether the thread has been interrupted, and throw an exception if it has;
- Call addConditionWaiter() to create a condition queue node, new Node(Thread.currentThread(), Node.CONDITION), and append it to the tail of the condition queue;
- Call fullyRelease() to release the resource. The whole synchronization state is released, including all reentrant holds, and release() wakes up the next waiting node in the synchronization queue;
- Call isOnSyncQueue(node) to check whether the node has been moved to the synchronization queue. As long as it has not, the thread blocks and waits to be woken up or interrupted by another thread;
- Once the thread has been woken up, call acquireQueued(node, savedState) to re-acquire the resource with the saved state;
- Check the node again and clean up condition wait nodes that have been cancelled;
- Handle the interrupt information if necessary.
The logic can be understood like this: the current thread holds the synchronization state. It creates a condition queue node and puts it at the tail of the condition queue, gives up the synchronization state completely, and blocks. When it is woken up, after its node has been moved to the synchronization queue, it tries to acquire the synchronization resource again.
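One visible consequence of fullyRelease() and the later acquireQueued(node, savedState) is that a thread holding the lock twice gives up both holds while it waits and gets both back afterwards. The sketch below checks this through the public API only. The class name AwaitHoldCountDemo and the method holdCountAfterAwait() are made up for this illustration, and awaitUninterruptibly() is used instead of await() purely to keep exception handling out of the way.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class AwaitHoldCountDemo {
    // Returns the hold count the waiting thread sees right after its wait returns.
    static int holdCountAfterAwait() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        AtomicInteger holdsAfterAwait = new AtomicInteger(-1);

        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock(); // hold count is now 2
            try {
                cond.awaitUninterruptibly(); // gives up both holds while waiting
                holdsAfterAwait.set(lock.getHoldCount());
            } finally {
                lock.unlock();
                lock.unlock();
            }
        }, "waiter");
        waiter.start();

        // The signalling thread can only take the lock if the waiter released it fully.
        boolean signalled = false;
        while (!signalled) {
            lock.lock();
            try {
                if (lock.hasWaiters(cond)) {
                    cond.signal();
                    signalled = true;
                }
            } finally {
                lock.unlock();
            }
            Thread.yield();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return holdsAfterAwait.get();
    }

    public static void main(String[] args) {
        System.out.println("hold count after await: " + holdCountAfterAwait());
    }
}
```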
6.3 Condition object wake-up
Now let's look at the signal() wake-up method, java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal:
/**
 * Moves the thread that has waited the longest (if there is one) from the wait queue of this condition to the wait queue of the owning lock.
 *
 * Moves the longest-waiting thread, if one exists, from the
 * wait queue for this condition to the wait queue for the
 * owning lock.
 *
 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
 *         returns {@code false}
 */
public final void signal() {
    if (!isHeldExclusively()) {
        throw new IllegalMonitorStateException();
    }
    // Get the first condition wait node
    Node first = firstWaiter;
    if (first != null) {
        // Signal the node at the head of the condition queue
        doSignal(first);
    }
}
private void doSignal(Node first) {
    // Loop until one node has been transferred or the condition queue is empty
    do {
        // Make the node after first the new first waiter
        if ( (firstWaiter = first.nextWaiter) == null) {
            // If it is null, the condition queue is now empty, so clear lastWaiter as well
            lastWaiter = null;
        }
        // Detach first from the condition queue
        first.nextWaiter = null;
        // first is the former head of the condition wait queue
        // Convert it into a CLH synchronization queue node and add it to the CLH synchronization queue
        // If the transfer fails (the node was cancelled), continue with the new firstWaiter
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
/**
 * Transfers a node from a condition queue onto sync queue.
 * Returns true if successful.
 * @param node the node
 * @return true if successfully transferred (else the node was
 * cancelled before signal)
 */
final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    // Change the wait status of the node from Node.CONDITION to 0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        return false;
    }
    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    // Append the node to the tail of the CLH synchronization queue; enq returns its predecessor
    Node p = enq(node);
    // Get the wait status of the predecessor
    int ws = p.waitStatus;
    // If the predecessor is cancelled (status greater than 0), or the CAS that sets its status to SIGNAL fails, wake up the node's thread
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) {
        // Only in these two cases is the thread woken up right here; otherwise it is woken later, when its predecessor releases the lock
        LockSupport.unpark(node.thread);
    }
    return true;
}
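The main logic here is: take the head node off the condition queue, turn it into a synchronization queue node, and append it to the tail of the synchronization queue. The node's thread is woken up immediately only if its predecessor in the synchronization queue has been cancelled or the predecessor's status cannot be set to SIGNAL. Otherwise it stays parked until the predecessor releases the lock.
Once woken up, the thread starts acquiring the synchronization resource again.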
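The move from the condition queue to the synchronization queue can be observed through the monitoring methods of ReentrantLock. In the sketch below, the class name SignalTransferDemo and the method observe() are invented for this illustration; hasWaiters(Condition) and hasQueuedThread(Thread) are public ReentrantLock methods. The signalling thread keeps holding the lock while it takes its readings, so the waiter cannot leave the synchronization queue in between.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SignalTransferDemo {
    // Returns {in sync queue before signal, still on condition after signal, in sync queue after signal}.
    static boolean[] observe() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                cond.awaitUninterruptibly();
            } finally {
                lock.unlock();
            }
        }, "waiter");
        waiter.start();

        boolean[] result = null;
        while (result == null) {
            lock.lock();
            try {
                if (lock.hasWaiters(cond)) {
                    boolean queuedBefore = lock.hasQueuedThread(waiter);
                    cond.signal(); // moves the node; we still hold the lock
                    boolean onConditionAfter = lock.hasWaiters(cond);
                    boolean queuedAfter = lock.hasQueuedThread(waiter);
                    result = new boolean[] {queuedBefore, onConditionAfter, queuedAfter};
                }
            } finally {
                lock.unlock(); // only now can the waiter re-acquire the lock
            }
            Thread.yield();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        boolean[] r = observe();
        System.out.println("queued before signal: " + r[0]);
        System.out.println("on condition after signal: " + r[1]);
        System.out.println("queued after signal: " + r[2]);
    }
}
```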
The workflow of the condition object is as follows:
Condition objects are used in many places in the JDK, for example in blocking queues such as ArrayBlockingQueue, DelayQueue and LinkedBlockingDeque.
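To show how such a blocking queue can be put together from one lock and two conditions, here is a minimal bounded buffer sketch. It is not the source of any JDK queue: the class BoundedBuffer, its fields, and the helper transferAndSum() are made up for this illustration, and the structure follows the classic pattern of one condition per waiting reason (buffer full, buffer empty).

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // producers wait here
    private final Condition notEmpty = lock.newCondition(); // consumers wait here
    private final Object[] items;
    private int putIndex, takeIndex, count;

    BoundedBuffer(int capacity) {
        items = new Object[capacity];
    }

    void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) {
                notFull.await(); // releases the lock while waiting
            }
            items[putIndex] = item;
            putIndex = (putIndex + 1) % items.length;
            count++;
            notEmpty.signal(); // move one waiting consumer to the sync queue
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            T item = (T) items[takeIndex];
            items[takeIndex] = null;
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal(); // move one waiting producer to the sync queue
            return item;
        } finally {
            lock.unlock();
        }
    }

    // Hypothetical helper: a producer thread puts 0..n-1, the caller takes and sums them.
    static int transferAndSum(int n) {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "producer");
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < n; i++) {
                sum += buffer.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(transferAndSum(100));
    }
}
```

Each wait sits in a while loop that re-checks the condition after waking up, because by the time the awakened thread has re-acquired the lock another thread may already have changed the buffer.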
7. Summary
Many other synchronization utility classes in the JDK's JUC package also use AQS to implement their synchronization. Their core principle is the same: they operate on the synchronization state resource, and they use the synchronization queue and the condition wait queue to line up the threads that want the resource.
Only by understanding AQS can we gain a deeper understanding of the concurrency components built on it.
8. Reference material
- The Art of Java Concurrent Programming
- JDK 1.8 source code
The code above has been uploaded to my GitHub.