Java thread pool - work queue

Scum man junior four 2021-09-15 03:53:40


Work queue of thread pool

img.png

ArrayBlockingQueue

ArrayBlockingQueue is backed by an array and uses a reentrant lock (ReentrantLock) for concurrency control. Whether a thread is adding or removing an element, it must acquire the lock first. The source below shows that both offer and put take the same ReentrantLock. An ArrayBlockingQueue must be given a capacity when it is created.

    // Non-blocking insert: returns false immediately if the queue is full
    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

    // Blocking insert: waits on notFull until space becomes available
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
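
The difference between the two methods is easiest to see on a full queue. The following is a minimal sketch, not part of the original article; the class name ArrayBlockingQueueOfferDemo and its helper methods are made up for illustration. It fills a small queue and then tries one more insert, once with the plain offer and once with the timed offer.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows how offer() behaves on a full ArrayBlockingQueue.
class ArrayBlockingQueueOfferDemo {

    // Fills a queue of the given capacity, then returns the result of one extra offer().
    static boolean offerWhenFull(int capacity) {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.offer(i);
        }
        // The queue is full, so this call returns at once instead of blocking.
        return queue.offer(-1);
    }

    // Same as above, but the extra insert waits up to timeoutMillis for free space.
    static boolean timedOfferWhenFull(int capacity, long timeoutMillis) {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.offer(i);
        }
        try {
            // Nobody removes an element, so the wait times out.
            return queue.offer(-1, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("offer on full queue: " + offerWhenFull(2));
        System.out.println("timed offer on full queue: " + timedOfferWhenFull(2, 100));
    }
}
```

A put call in the same situation would not return at all until another thread removed an element.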

SynchronousQueue

The SynchronousQueue source code is fairly complex and relies heavily on CAS operations, so it is not reproduced here. A SynchronousQueue has no internal container, so it never holds a task. When a producer thread hands over a task and no consumer is ready to receive it, the producer blocks until a consumer arrives.
Diagram:
img_1.png
In the following code, if we comment out the consumer and run the program, the producer stays blocked in put() indefinitely.

    package thread.customthreadpool;

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.SynchronousQueue;

    /**
     * Test SynchronousQueue
     */
    public class SynchronousQueueTest {
        private static final SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>();
        private static final ExecutorService service = Executors.newCachedThreadPool();

        public static void main(String[] args) {
            // Producer: put() blocks until a consumer takes the element
            service.submit(() -> {
                try {
                    synchronousQueue.put("liu");
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("Producer finished handing off");
            });
            // Consumer: take() blocks until a producer hands over an element
            service.submit(() -> {
                try {
                    synchronousQueue.take();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("take over");
            });
            // Accept no new tasks; the pool exits once both tasks have finished
            service.shutdown();
        }
    }
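
The hand-off behaviour can also be checked without a thread pool. This is a small sketch under my own assumptions (the class SynchronousQueueHandoffDemo and its method names are hypothetical): a non-blocking offer fails when no consumer is waiting, while put completes once a consumer thread is blocked in take.

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class: a SynchronousQueue never stores elements, it only hands them over.
class SynchronousQueueHandoffDemo {

    // With no consumer waiting, offer() cannot hand the element over and returns false.
    static boolean offerWithoutConsumer() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        return queue.offer("task");
    }

    // Starts a consumer that blocks in take(), then hands the task over with put().
    static String handOff(String task) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            queue.put(task);   // returns only after the consumer has taken the task
            consumer.join();   // join() makes received[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println("offer without consumer: " + offerWithoutConsumer());
        System.out.println("handed off: " + handOff("liu"));
    }
}
```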

LinkedBlockingDeque

LinkedBlockingDeque is a double-ended queue backed by a doubly linked list, so elements can be inserted and removed at either end. When creating a LinkedBlockingDeque we can specify a capacity or leave it out; if no capacity is given, it defaults to Integer.MAX_VALUE.

Note: a Deque is a double-ended queue, while a Queue is single-ended. Double-ended means elements can be added and removed at both ends; a single-ended queue accepts elements at one end and releases them at the other (FIFO).
    public LinkedBlockingDeque() {
        this(Integer.MAX_VALUE);
    }

    package thread.customthreadpool;

    import java.util.concurrent.LinkedBlockingDeque;

    public class LinkedBlockingDequeTest {
        private static final LinkedBlockingDeque<Integer> deque = new LinkedBlockingDeque<>();

        public static void main(String[] args) throws InterruptedException {
            deque.put(1);
            deque.put(2);
            deque.put(3);
            deque.put(4);
            deque.put(5);
            System.out.println(deque);
            System.out.println("deque size " + deque.size());
            deque.take();
            deque.take();
            deque.take();
            deque.take();
            deque.take();
            System.out.println(deque);
            System.out.println("deque size " + deque.size());
        }
    }

img_2.png
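
The test above only uses put and take, which work on the tail and the head like an ordinary queue. To illustrate the double-ended part, here is a short sketch (the class DequeBothEndsDemo and its methods are hypothetical names, not from the original article) that inserts and removes at both ends.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical demo class: inserting and removing at both ends of a LinkedBlockingDeque.
class DequeBothEndsDemo {

    // Builds a deque by adding 2 and 3 at the tail and 1 at the head.
    static LinkedBlockingDeque<Integer> build() {
        LinkedBlockingDeque<Integer> deque = new LinkedBlockingDeque<>(4);
        deque.offerLast(2);
        deque.offerLast(3);
        deque.offerFirst(1);
        return deque;
    }

    // Returns the elements from head to tail.
    static List<Integer> contents() {
        return new ArrayList<>(build());
    }

    // Removes one element from the tail and returns it.
    static Integer removeFromTail() {
        return build().pollLast();
    }

    // Removes one element from the head and returns it.
    static Integer removeFromHead() {
        return build().pollFirst();
    }

    public static void main(String[] args) {
        System.out.println("contents: " + contents());
        System.out.println("head: " + removeFromHead() + ", tail: " + removeFromTail());
    }
}
```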

LinkedBlockingQueue

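LinkedBlockingQueue is backed by a singly linked list. It is a single-ended queue with first in, first out (FIFO) ordering. It uses ReentrantLock for concurrency control, with separate locks for the two ends: put acquires putLock and take acquires takeLock, so a producer and a consumer do not block each other on the same lock.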

    private final ReentrantLock takeLock = new ReentrantLock();
    private final ReentrantLock putLock = new ReentrantLock();

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            // Wait while the queue is full
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            // Still room left: wake another waiting producer
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // The queue was empty before this insert: wake a waiting consumer
        if (c == 0)
            signalNotEmpty();
    }

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            // Wait while the queue is empty
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            // Elements remain: wake another waiting consumer
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // The queue was full before this removal: wake a waiting producer
        if (c == capacity)
            signalNotFull();
        return x;
    }
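
As a rough illustration of how put and take cooperate, the sketch below runs one producer thread against a consumer on a small bounded queue. The class name LinkedBlockingQueueFifoDemo and the method produceAndConsume are my own, not from the article. The producer blocks in put whenever the queue is full, the consumer blocks in take whenever it is empty, and the elements still arrive in FIFO order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class: one producer and one consumer on a bounded LinkedBlockingQueue.
class LinkedBlockingQueueFifoDemo {

    // Sends 1..n through a queue with the given capacity and returns them in arrival order.
    static List<Integer> produceAndConsume(int n, int capacity) {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);
        List<Integer> received = new ArrayList<>();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    queue.put(i);   // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            for (int i = 0; i < n; i++) {
                received.add(queue.take());   // blocks while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(produceAndConsume(5, 2));
    }
}
```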

DelayQueue

DelayQueue is an unbounded queue. Elements added to a DelayQueue are compared through their compareTo method and ordered by expiration time: the element at the head of the queue expires earliest, and the further back an element sits, the later it expires. A DelayQueue only accepts elements that implement the Delayed interface, and take() returns an element only once its delay has expired. As the figure shows, elements do not leave the queue in first in, first out order but in order of expiration time.
img_3.png

Example
    package thread.customthreadpool.delayDeque;

    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    public class MyDelayed implements Delayed {
        private final String taskName;
        // Creation time in milliseconds
        private final long nowTime = System.currentTimeMillis();
        // Delay in milliseconds, counted from the creation time
        private final long expireTime;

        public MyDelayed(String taskName, long expireTime) {
            this.taskName = taskName;
            this.expireTime = expireTime;
        }

        // Remaining delay; zero or negative means the element has expired
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert((nowTime + expireTime) - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        // Long.compare avoids the overflow a cast of the difference to int could cause
        @Override
        public int compareTo(Delayed o) {
            return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }

        @Override
        public String toString() {
            return "MyDelayed{" +
                    "taskName='" + taskName + '\'' +
                    ", nowTime=" + nowTime +
                    ", expireTime=" + expireTime +
                    '}';
        }
    }
    package thread.customthreadpool.delayDeque;

    import java.util.concurrent.*;

    public class MyDelayQueue {
        private static final DelayQueue<MyDelayed> delayQueue = new DelayQueue<>();
        private static final ExecutorService service = Executors.newCachedThreadPool();

        public static void main(String[] args) throws InterruptedException {
            service.submit(() -> {
                delayQueue.put(new MyDelayed("A-Task", 5000));
                delayQueue.put(new MyDelayed("B-Task", 4000));
                delayQueue.put(new MyDelayed("C-Task", 3000));
                delayQueue.put(new MyDelayed("D-Task", 2000));
                delayQueue.put(new MyDelayed("E-Task", 1000));
            });
            // take() blocks until the head element has expired;
            // stop after the five tasks so the program can exit
            for (int i = 0; i < 5; i++) {
                System.out.println(delayQueue.take());
            }
            service.shutdown();
        }
    }

Result

img_4.png

Application scenarios

1. Meituan takeout orders: if an order is placed but not paid, it is automatically cancelled after 30 minutes.
2. Caching: entries that must be cleaned up at a specific time.
And so on.
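
The order-cancellation scenario could be sketched roughly as follows. Everything here is an assumption made for illustration: the class OrderTimeoutDemo, the UnpaidOrder type, the order ids and the millisecond timeouts (standing in for 30 minutes) are invented, and a real system would also need to remove an order from the queue once it is paid.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: unpaid orders become available for cancellation when they time out.
class OrderTimeoutDemo {

    // An unpaid order that expires timeoutMillis after it was created.
    static final class UnpaidOrder implements Delayed {
        final String orderId;
        final long deadlineNanos;

        UnpaidOrder(String orderId, long timeoutMillis) {
            this.orderId = orderId;
            this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Queues two orders and returns their ids in the order they expire.
    static List<String> cancelExpiredOrders() {
        DelayQueue<UnpaidOrder> queue = new DelayQueue<>();
        queue.put(new UnpaidOrder("order-2", 200));   // placed first, longer timeout
        queue.put(new UnpaidOrder("order-1", 50));    // placed second, shorter timeout
        List<String> cancelled = new ArrayList<>();
        try {
            // take() blocks until the order with the earliest deadline has expired.
            cancelled.add(queue.take().orderId);
            cancelled.add(queue.take().orderId);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return cancelled;
    }

    public static void main(String[] args) {
        System.out.println("cancelled in order: " + cancelExpiredOrders());
    }
}
```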

LinkedTransferQueue

When a consumer thread tries to take an element and the queue is empty, it adds a node whose element is null and then waits. If a producer thread then finds such a null node in the queue, it does not enqueue its element. Instead it fills the element into that node and wakes the consumer thread, which then takes the element.
LinkedTransferQueue combines the behaviour of SynchronousQueue and LinkedBlockingQueue. It performs well because it uses no locks and relies on CAS instead. A SynchronousQueue cannot store elements, whereas a LinkedTransferQueue can.
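
Both sides of that comparison can be tried out directly. The following is a minimal sketch with made-up names (TransferQueueDemo and its methods): tryTransfer behaves like a SynchronousQueue hand-off and fails when no consumer is waiting, put stores the element like a LinkedBlockingQueue, and transfer blocks until a consumer has received the element.

```java
import java.util.concurrent.LinkedTransferQueue;

// Hypothetical demo class: LinkedTransferQueue can both store elements and hand them over directly.
class TransferQueueDemo {

    // With no waiting consumer, tryTransfer() returns false and does not enqueue the element.
    static boolean tryTransferWithoutConsumer() {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        return queue.tryTransfer("task");
    }

    // put() never blocks on this unbounded queue; the element is simply stored.
    static int sizeAfterPut() {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        queue.put("task");
        return queue.size();
    }

    // transfer() blocks until a consumer has received the element.
    static String transferToConsumer(String task) {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            queue.transfer(task);
            consumer.join();   // join() makes received[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println("tryTransfer without consumer: " + tryTransferWithoutConsumer());
        System.out.println("size after put: " + sizeAfterPut());
        System.out.println("transferred: " + transferToConsumer("task"));
    }
}
```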

PriorityBlockingQueue

PriorityBlockingQueue is an unbounded blocking queue that also supports priorities. Its read and write operations are guarded by a ReentrantLock, and internally it uses a heap so that the element leaving the queue is always the one with the highest priority, that is, the smallest element according to the queue's ordering.

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }
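
To see the ordering from the caller's side, here is a small sketch (the class PriorityOrderDemo and its method are hypothetical). It inserts integers in arbitrary order and drains the queue; with the natural ordering of Integer, the smallest value counts as the highest priority.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class: elements leave a PriorityBlockingQueue in priority order.
class PriorityOrderDemo {

    // Inserts the values in the given order and returns them in removal order.
    static List<Integer> drainInPriorityOrder(int... values) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        for (int value : values) {
            queue.put(value);   // never blocks, the queue is unbounded
        }
        List<Integer> removed = new ArrayList<>();
        Integer next;
        // poll() returns null once the queue is empty, unlike take(), which would block.
        while ((next = queue.poll()) != null) {
            removed.add(next);
        }
        return removed;
    }

    public static void main(String[] args) {
        System.out.println(drainInPriorityOrder(5, 1, 4, 2, 3));
    }
}
```

A different priority rule can be supplied by passing a Comparator to the PriorityBlockingQueue constructor.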

 

Copyright notice
This article was written by [Scum man junior four]. Please include a link to the original when reposting. Thank you.
https://javamana.com/2021/09/20210909105741714W.html
