Java High Concurrency: Important BlockingQueue Implementation Classes



ArrayBlockingQueue

A bounded blocking queue backed by an array. Its capacity is fixed: it must be specified when the queue is created and cannot grow afterwards. Elements are stored first-in-first-out: new elements are added at the tail, and the element that has been in the queue the longest sits at the head and is removed first.
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** The queued elements */
    final Object[] items;

    /** Index of the next take, poll, peek or remove */
    int takeIndex;

    /** Index of the next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook: one ReentrantLock plus two Conditions.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;

    /** Create a queue with the given capacity (non-fair lock by default) */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    /**
     * Create a queue with the given capacity and access policy.
     * @param fair whether the internal exclusive lock is fair or non-fair. A non-fair
     *             lock gives higher throughput; a fair lock guarantees that the
     *             longest-waiting thread acquires the lock first.
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {}

    /**
     * Create a queue with the given capacity and access policy, initially
     * containing the elements of the given collection.
     * @param c the elements of this collection are added to the queue during construction
     */
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {}
}
  • In ArrayBlockingQueue, producers putting data and consumers taking data share a single lock object, which means they can never truly run in parallel. In principle ArrayBlockingQueue could use two separate locks so that producer and consumer operations run fully in parallel, but it does not: the enqueue and dequeue paths are already lightweight enough that introducing a separate-lock mechanism would add code complexity without gaining any performance.
  • From the constructors we can see that the fair parameter controls whether the internal lock is fair; the default is a non-fair lock. A brief construction sketch follows this list.
  • Fields such as items, takeIndex, putIndex and count are not declared volatile. All access to them happens inside methods that hold the lock (such as size()), so there is no visibility problem.
  • A single exclusive lock guards both enqueue and dequeue operations, so only one thread can be enqueuing or dequeuing at any given moment.
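As a quick illustration of the constructors above, here is a minimal sketch (the class and variable names are my own; only the ArrayBlockingQueue API itself comes from the JDK) showing a bounded queue created with and without a fair lock:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ArrayBlockingQueueConstructionDemo {
    public static void main(String[] args) throws InterruptedException {
        // Capacity 3, default (non-fair) lock: higher throughput.
        BlockingQueue<String> nonFair = new ArrayBlockingQueue<>(3);

        // Capacity 3, fair lock: the longest-waiting thread gets the lock first.
        BlockingQueue<String> fair = new ArrayBlockingQueue<>(3, true);

        nonFair.put("a");
        nonFair.put("b");
        nonFair.put("c");
        // A fourth put() would now block until a consumer takes an element.
        System.out.println(nonFair.offer("d")); // prints false: the queue is full
        System.out.println(fair.isEmpty());     // prints true
    }
}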

put() source code analysis

/** Enqueue, blocking if the queue is full */
public void put(E e) throws InterruptedException {
    // throw NullPointerException if e is null
    checkNotNull(e);
    // acquire the exclusive lock
    final ReentrantLock lock = this.lock;
    /*
     * lockInterruptibly():
     * Acquires the lock unless the current thread is interrupted.
     * If the lock is not held by another thread, it returns immediately and sets the hold count to 1.
     * If the current thread already holds the lock, the hold count is incremented by 1 and the method returns immediately.
     * If the lock is held by another thread, the current thread is disabled for
     * thread-scheduling purposes and lies dormant until it acquires the lock.
     */
    lock.lockInterruptibly();
    try {
        // the queue is full
        while (count == items.length)
            // wait on the notFull condition
            notFull.await();
        // the actual enqueue
        enqueue(e);
    } finally {
        // release the lock
        lock.unlock();
    }
}

/** The actual enqueue */
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    // local reference to the element array
    final Object[] items = this.items;
    // store the element at the next put index
    items[putIndex] = x;
    // compute the index the next element should occupy; the array is used as a circular (ring) buffer
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    // wake up a waiting consumer
    notEmpty.signal();
}
Because the lock is acquired before any shared variable is touched, there is no visibility problem here: shared variables read after acquiring the lock are read from main memory rather than from a CPU cache or register, and the modified values are flushed back to main memory when the lock is released.
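A minimal sketch of that same idiom (a hypothetical class of my own, not JDK code): a non-volatile field whose cross-thread visibility is guaranteed purely by reading and writing it under one lock, just like count and size() in ArrayBlockingQueue:

import java.util.concurrent.locks.ReentrantLock;

// 'count' is deliberately not volatile; every read and write happens while
// holding the same lock, so the lock's acquire/release semantics guarantee
// that each thread sees the latest value.
public class LockGuardedCounter {
    private final ReentrantLock lock = new ReentrantLock();
    private int count; // not volatile

    public void increment() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock(); // publishes the new value to the next lock holder
        }
    }

    public int size() {
        lock.lock();
        try {
            return count; // reads the value published by the last unlock()
        } finally {
            lock.unlock();
        }
    }
}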

In addition, the queue is implemented as a circular array, so computing the index of the next element involves wrapping back to 0 when the end of the array is reached. After the element is stored, enqueue() calls notEmpty.signal() to wake up a thread that blocked in notEmpty.await(), i.e. a consumer waiting in the notEmpty condition queue.
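To make the wrap-around concrete, here is a tiny stand-alone sketch of a circular index (my own example, not the JDK code, just the same wrap-around step):

public class RingIndexDemo {
    public static void main(String[] args) {
        int capacity = 4;
        int putIndex = 0;

        // Enqueue 6 times into a 4-slot ring buffer and watch the index wrap.
        for (int i = 0; i < 6; i++) {
            System.out.println("store element " + i + " at slot " + putIndex);
            if (++putIndex == capacity)
                putIndex = 0; // the same wrap-around step as ArrayBlockingQueue.enqueue()
        }
        // Output: slots 0, 1, 2, 3, 0, 1 - the array is reused as a circle.
    }
}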

take() source code analysis

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // the queue is empty: wait on the notEmpty condition
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    // the special part: keep any active iterators consistent with the queue
    if (itrs != null)
        itrs.elementDequeued();
    // wake up a waiting producer
    notFull.signal();
    return x;
}
The take() path mirrors the put() path very closely.

// Shared state for all iterators of this queue; changes to the queue affect every iterator
transient Itrs itrs = null; // holds all iterators created so far

/**
 * State shared between the iterators and their queue, allowing queue
 * modifications to update the iterators when elements are removed.
 */
class Itrs {
    void elementDequeued() {
        // assert lock.getHoldCount() == 1;
        if (count == 0)
            // the queue is now empty: all iterators can be cleaned up and detached
            queueIsEmpty();
        // takeIndex has wrapped to 0: the circular queue reached the end of the array and returned to the head
        else if (takeIndex == 0)
            takeIndexWrapped();
    }

    /**
     * What to do when the queue becomes empty:
     * 1. notify all iterators that the queue is empty
     * 2. clear all weak references and switch the iterators to detached mode
     */
    void queueIsEmpty() {}

    /**
     * Called whenever takeIndex wraps around to 0.
     * Notifies all iterators and removes any that have become stale.
     * In other words, whenever the blocking queue dequeues, the data held by the
     * iterators is synchronized so that they stay consistent with the queue.
     */
    void takeIndexWrapped() {}
}
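To see the effect of this bookkeeping in practice, here is a small demonstration of my own (not from the original article): the iterator of an ArrayBlockingQueue is weakly consistent, so it may still return an element that has already been dequeued, because it captured that element when it was constructed:

import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;

public class WeaklyConsistentIteratorDemo {
    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(4);
        queue.put("a");
        queue.put("b");

        // Creating the iterator registers it with the queue's shared Itrs object.
        Iterator<String> it = queue.iterator();

        // Dequeue "a" after the iterator was created...
        System.out.println(queue.take()); // prints a

        // ...yet the weakly consistent iterator may still report "a",
        // because it captured the element at takeIndex in its constructor.
        while (it.hasNext()) {
            System.out.println(it.next()); // typically prints a, then b
        }
    }
}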

When the Itrs object is created

// From this we can see that the Itrs object is only created when this method is
// called on the ArrayBlockingQueue; as long as iterator() is never called,
// the queue's itrs field stays null.
public Iterator<E> iterator() {
    return new Itr();
}

private class Itr implements Iterator<E> {
    Itr() {
        // This is where the iterator is wired up.
        // When count == 0, the new iterator is useless and can be dropped immediately: it goes straight into detached mode.
        // Otherwise the queue's current read position becomes the iterator's next element, and cursor stores the position of the element after that.
        if (count == 0) {
            // assert itrs == null;
            cursor = NONE;
            nextIndex = NONE;
            prevTakeIndex = DETACHED;
        } else {
            final int takeIndex = ArrayBlockingQueue.this.takeIndex;
            prevTakeIndex = takeIndex;
            nextItem = itemAt(nextIndex = takeIndex);
            cursor = incCursor(takeIndex);
            if (itrs == null) {
                itrs = new Itrs(this);
            } else {
                itrs.register(this); // in this order
                itrs.doSomeSweeping(false);
            }
            prevCycles = itrs.cycles;
            // assert takeIndex >= 0;
            // assert prevTakeIndex == takeIndex;
            // assert nextIndex >= 0;
            // assert nextItem != null;
        }
    }
}

Code demonstration

package com.rumenz.task;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @className: BlockingQueueExample
 * @description: producer/consumer demo based on ArrayBlockingQueue
 * @author: mac
 * @date: 2021/1/20
 **/
public class BlockingQueueExample {

    private static volatile Boolean flag = false;

    public static void main(String[] args) {
        BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(1024);
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // producer: puts two elements, then sets the stop flag
        executorService.execute(() -> {
            try {
                blockingQueue.put(1);
                Thread.sleep(2000);
                blockingQueue.put(3);
                flag = true;
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        // consumer: keeps taking until the producer signals it is done
        executorService.execute(() -> {
            try {
                while (!flag) {
                    Integer i = blockingQueue.take();
                    System.out.println(i);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        executorService.shutdown();
    }
}

LinkedBlockingQueue

A blocking queue based on a linked list. Like ArrayBlockingQueue, it maintains an internal buffer for its elements (here a linked list). When a producer puts an item into the queue, the queue accepts the item, caches it internally, and the producer returns immediately. Only when the buffer reaches its maximum capacity (which can be specified through the LinkedBlockingQueue constructor) does the producer block, until a consumer takes an item from the queue and a waiting producer is woken up. Consumers are handled symmetrically.

The reason LinkedBlockingQueue can handle concurrent traffic efficiently is that it uses separate locks for producers and consumers to control data synchronization. Under high concurrency this means producers and consumers can operate on the queue in parallel, which improves the throughput of the whole queue.

If a LinkedBlockingQueue is constructed without specifying a capacity, it defaults to a practically unbounded capacity of Integer.MAX_VALUE. In that case, if producers are faster than consumers, the system may run out of memory long before the queue ever becomes full.
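As a minimal illustration of that risk (my own sketch, not from the original article), compare the bounded constructor with the default, effectively unbounded one:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class LinkedBlockingQueueCapacityDemo {
    public static void main(String[] args) {
        // Bounded: put() blocks once 1000 elements are queued,
        // which naturally throttles a fast producer.
        BlockingQueue<byte[]> bounded = new LinkedBlockingQueue<>(1000);

        // "Unbounded": the capacity defaults to Integer.MAX_VALUE, so put() almost
        // never blocks; a producer that outruns its consumers can exhaust the heap.
        BlockingQueue<byte[]> unbounded = new LinkedBlockingQueue<>();

        System.out.println(bounded.remainingCapacity());   // 1000
        System.out.println(unbounded.remainingCapacity()); // 2147483647
    }
}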

LinkedBlockingQueue is a blocking queue whose operations are backed by a linked list. The list is singly linked, not doubly linked.

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    // Capacity of the queue: the specified size, or Integer.MAX_VALUE by default
    private final int capacity;

    // Number of elements
    private final AtomicInteger count = new AtomicInteger();

    // Head node of the queue; the invariant head.item == null always holds
    transient Node<E> head;

    // Tail node of the queue; the invariant last.next == null always holds
    private transient Node<E> last;

    /** Lock held by take, poll, etc.: the dequeue side (read operations) must acquire this lock */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes: a reader that finds the queue empty waits on the notEmpty condition */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc.: the enqueue side (write operations) must acquire this lock */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts: a writer that finds the queue full waits on the notFull condition */
    private final Condition notFull = putLock.newCondition();

    // The "unbounded" constructor
    public LinkedBlockingQueue() {}

    // The bounded constructor
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    // Unbounded, initially containing the elements of the given collection
    public LinkedBlockingQueue(Collection<? extends E> c) {}

    /**
     * Linked list node class
     */
    static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor node
         * - this node itself, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }
}
From its constructors we can see that it can be used either as an unbounded or as a bounded queue.

Two locks are used here, takeLock and putLock, each with its own Condition, notEmpty and notFull. They work together as follows:

  • To take an element, a thread must acquire takeLock; but holding the lock is not enough: if the queue is empty, the thread must also wait for the notEmpty condition.
  • To put an element, a thread must acquire putLock; again, holding the lock is not enough: if the queue is full, the thread must also wait for the notFull condition.
As the constructor above shows, an empty (dummy) head node is created up front, so when the first element is enqueued the linked list actually contains two nodes. When reading, the element returned is the one after the head node; the count value does not include the dummy head. A simplified two-lock sketch follows this paragraph.
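To make the two-lock idea more tangible, here is a heavily simplified, non-blocking two-lock linked queue of my own (hypothetical names). It omits the capacity check, the conditions, and the AtomicInteger count that the real LinkedBlockingQueue relies on (the atomic count is also what gives the real class its memory-visibility guarantees between the two lock domains), so it only illustrates the locking structure and the dummy head node:

import java.util.concurrent.locks.ReentrantLock;

// put() only takes putLock, poll() only takes takeLock; the dummy head node
// keeps the two sides from touching the same Node at the same time.
public class TwoLockQueueSketch<E> {

    private static final class Node<E> {
        E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    private Node<E> head = new Node<>(null); // dummy node: head.item is always null
    private Node<E> last = head;

    private final ReentrantLock putLock = new ReentrantLock();
    private final ReentrantLock takeLock = new ReentrantLock();

    public void put(E e) {
        Node<E> node = new Node<>(e);
        putLock.lock();
        try {
            last = last.next = node; // link at the tail, exactly like enqueue()
        } finally {
            putLock.unlock();
        }
    }

    public E poll() {
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null) return null; // queue is empty
            head = first;                   // the old first node becomes the new dummy head
            E x = first.item;
            first.item = null;
            return x;
        } finally {
            takeLock.unlock();
        }
    }

    public static void main(String[] args) {
        TwoLockQueueSketch<Integer> q = new TwoLockQueueSketch<>();
        q.put(1);
        q.put(2);
        System.out.println(q.poll()); // 1
        System.out.println(q.poll()); // 2
        System.out.println(q.poll()); // null
    }
}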

put() source code analysis

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /**
     * Inserts the specified element at the tail of this queue,
     * waiting if necessary for space to become available.
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // If you wonder why this starts at -1, look at the offer method:
        // it is simply a marker distinguishing success from failure.
        int c = -1;
        // wrap the element in a Node
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        // acquire the put lock
        putLock.lockInterruptibly();
        try {
            /* If the queue is full, wait for the notFull condition. */
            while (count.get() == capacity) {
                notFull.await();
            }
            // enqueue
            enqueue(node);
            // atomic increment; c is the count *before* this element was added
            c = count.getAndIncrement();
            // If at least one free slot remains after this element was enqueued,
            // call notFull.signal() to wake up another waiting writer.
            // (The threads waiting on the notFull condition are other producers
            // that found the queue full.)
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            // release the put lock
            putLock.unlock();
        }
        // If c == 0, the queue was empty before this element was enqueued
        // (not counting the dummy head node), so reader threads may be waiting
        // on the notEmpty condition; wake one of them up here.
        if (c == 0)
            signalNotEmpty();
    }

    /** Links the node at the tail of the queue */
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        // The enqueue itself is trivial: point the old tail's next at the new node
        // and make the new node the new tail, i.e.
        //   last.next = node;
        //   last = node;
        // There is no race here, because this only runs while holding the exclusive putLock.
        last = last.next = node;
    }

    /**
     * Signals a waiting take. Called only from put/offer.
     * In other words: after an element has been enqueued, this method is called
     * when necessary to wake up a reader thread so it can take the element.
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal(); // wake up a waiting reader
        } finally {
            takeLock.unlock();
        }
    }
}

take() source code analysis

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        // First, the take lock must be acquired in order to dequeue
        takeLock.lockInterruptibly();
        try {
            // If the queue is empty, wait until the notEmpty condition is signalled
            while (count.get() == 0) {
                notEmpty.await();
            }
            // dequeue
            x = dequeue();
            // atomic decrement; c is the count *before* this element was removed
            c = count.getAndDecrement();
            // If at least one element remains after this dequeue,
            // call notEmpty.signal() to wake up another waiting reader.
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // If the queue was full before this dequeue, writers may be waiting on notFull; wake one up.
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Dequeue: unlink the node after the dummy head and make it the new head
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }
}

Comparison with ArrayBlockingQueue

ArrayBlockingQueue uses one lock shared by both sides, so the granularity is coarse: at any moment only one of enqueue or dequeue can execute, and they can never run in parallel. LinkedBlockingQueue uses a separate lock for each side, so enqueue and dequeue can proceed in parallel. Of course, this is parallelism between the read side and the write side: two writers (or two readers) still cannot run in parallel with each other. The conclusion is that LinkedBlockingQueue can read and write at the same time.

Another obvious difference between ArrayBlockingQueue and LinkedBlockingQueue is that the former does not create or destroy any extra object instances when inserting or removing elements, while the latter allocates an additional Node object for every insertion. In a system that has to process large volumes of data concurrently over a long period, this leads to a noticeable difference in GC pressure.

LinkedBlockingQueue example: one thread adds File objects to the queue, four threads take them out

package concurrent;

import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class TestBlockingQueue {

    static long randomTime() {
        return (long) (Math.random() * 1000);
    }

    public static void main(String[] args) {
        // The queue can hold 100 files
        final BlockingQueue<File> queue = new LinkedBlockingQueue<File>(100);
        // Thread pool
        final ExecutorService exec = Executors.newFixedThreadPool(5);
        final File root = new File("F:\\JavaLib");
        // Marker object signalling that scanning is finished
        final File exitFile = new File("");
        // Number of files put into the queue
        final AtomicInteger rc = new AtomicInteger();
        // Number of files taken from the queue
        final AtomicInteger wc = new AtomicInteger();

        // Scanning thread: walks the directory tree and puts .java files into the queue
        Runnable read = new Runnable() {
            public void run() {
                scanFile(root);
                scanFile(exitFile); // enqueue the completion marker last
            }

            public void scanFile(File file) {
                if (file.isDirectory()) {
                    File[] files = file.listFiles(new FileFilter() {
                        public boolean accept(File pathname) {
                            return pathname.isDirectory()
                                    || pathname.getPath().endsWith(".java");
                        }
                    });
                    if (files != null) {
                        for (File one : files)
                            scanFile(one);
                    }
                } else {
                    try {
                        int index = rc.incrementAndGet();
                        System.out.println("Read0: " + index + " "
                                + file.getPath());
                        queue.put(file);
                    } catch (InterruptedException e) {
                    }
                }
            }
        };
        exec.submit(read);

        // Four worker threads taking files from the queue
        for (int index = 0; index < 4; index++) {
            final int NO = index;
            Runnable write = new Runnable() {
                String threadName = "Write" + NO;

                public void run() {
                    while (true) {
                        try {
                            Thread.sleep(randomTime());
                            int index = wc.incrementAndGet();
                            File file = queue.take();
                            // The completion marker means there is nothing left to process
                            if (file == exitFile) {
                                // Put the marker back so the other worker threads can also exit
                                queue.put(exitFile);
                                break;
                            }
                            System.out.println(threadName + ": " + index + " "
                                    + file.getPath());
                        } catch (InterruptedException e) {
                        }
                    }
                }
            };
            exec.submit(write);
        }
        exec.shutdown();
    }
}


