Simple analysis of eagerthreadpool in Dubbo

learncat 2021-01-21 20:41:46
simple analysis eagerthreadpool dubbo


If you look through Dubbo Code for , We found that there was a ThreadPool Interface , Abstracts various thread pools . among , There is a thread pool implementation that is quite special :EagerThreadPool.

Eager Yes, it means eager 、 Warm hearted means . This thread pool is a simple literal translation , It's a thread pool . This thread pool looks interesting , Before analyzing this thread pool , Let's first introduce JDK Native thread pool .

 

JDK Native thread pool , Can pass Executors.newXXX The way , Quickly create a routing pool ( Some manuals don't recommend this , Mainly Executors Internal implementation , Thread pool queues are infinite by default , Infinite queues in extreme cases , Easy to cause unlimited memory expansion , As a result OOM The problem of , And squeeze too long a task queue , It also makes the user experience bad ).Executors In fact, it's also used internally new ThreadPoolExecutor To create a thread pool .

 

Brief discussion JDK When you use the thread pool , There are several core concepts :corePoolSize;maxPoolSize;BlockingQueue;RejectedExecutionHandler, The simple explanation means :

  • corePoolSize: By default , The number of threads used by the thread pool
  • maxPoolSize: The maximum number of threads that the thread pool can use
  • BlockingQueue: If the number of tasks submitted is greater than the number of threads , Newly submitted tasks may be stored in a blocking queue .
  • RejectedExecutionHandler: When you can't insert it into the blocking queue , It triggers this strategy .

 

On the basis of the above meaning , Introduce JDK The working principle of the built-in thread pool , It will use 【 Foxconn's assembly line 】 Make an analogy , There's no sense of teasing the workers , I just want to make an analogy , It's convenient for friends who don't know how to understand the principle quickly .

A process pool is like a Foxconn production line , There is a ring belt on this production line ( Namely BlockingQueue, If no capacity is set , It can be thought of as a belt of unlimited capacity , Here the length is set to fit 100 A box ), The belt has stood twice N One worker (corePoolSize), Next to it M A worker on call (maxPoolSize-corePoolSize), There's a sensor on top of the belt (RejectedExecutionHandler), Responsible for controlling the operation of the belt . When the belt is running , The worker took the box off the belt , And seal it with tape , Put aside , And sealing tape takes time . that ,JDK The default working mode of is as follows :

  • When you put one or two boxes on the belt, piecemeal , Because the number is far less than N The number of workers , So it was quickly taken away , So much so that there's hardly any boxes left on the belt . Now :N Some of the workers can be lazy .
  • Keep adding boxes to the belt , Up to N Each worker had a box in his hand and was sealing tape . here : The boss is a little satisfied ,N None of the workers is lazy , But the boss is not satisfied , Keep putting boxes on the belt .
  • here ,N All the workers are sealing their boxes with tape , The extra boxes on the belt are ignored , These boxes can only be left on the belt and rotated in circles , That is, the submitted task cannot be executed immediately , Can only be stored in BlockingQueue in . If at this time , No more boxes on the belt , that N I'm a worker , The workers who are free will take the boxes off the belt , Until the boxes on the assembly line are disposed of . conversely , If you keep putting boxes on the belt , There will be more and more boxes on the belt .
  • If there are more and more boxes on the belt , It will reach a capacity limit , for example : This is the maximum 100 A box . here , There's no room for the box on the belt , here , The boss starts from the lounge M A worker on call ( Create a thread ), The production line is almost full , Get to the belt .
  • M A worker on call , Get to the belt right away , Start taking over the new boxes . There are two endings : incoming M I'm a standby worker. I'm very effective , We're packing a lot of boxes right away , As long as the belt is not full , this M A standby worker can rest , After enough rest , this M A standby worker can leave the belt , Back in the lounge ( Release this part of the temporarily created thread ). Another ending is : There are too many boxes , Plus M A standby worker is not enough , here , Add the box , I can't handle it at all , The sensors that triggered the production line (RejectedExecutionHandler Strategy ), Decide how to deal with these boxes according to the strategy .

 

As can be seen from the above analogy , By default ,JDK Our thread pool scheduling strategy is a very greedy factory owner . By default , Only the smallest number of people will be used to maintain the operation , That's what we set up corePoolSize. Even if all of a sudden there are a lot of tasks , He's not going to increase his staff immediately , Instead, put the task in the queue and wait , Let the smallest person try to handle the most tasks . If the queue is really full , He'll be quick to recruit another team ( Namely maxPoolSize-corePoolSize) To deal with it urgently . And once the queue is full , New recruits are free for a while , He immediately dismissed the temporary staff (maxPooleSize Out of the thread , After a period of leisure , Being recycled ). All in all ,JDK The boss's default strategy is : The smaller the thread overhead, the better , Once the redundant people are free , Just let you go .

 

But this strategy , It doesn't work in any scenario , especially RPC Communication framework and so on . Let's imagine the scene , hypothesis RPC In the business processing thread pool of corePoolSize yes 10,maxPoolSize yes 40, The task queue length is 100, So at this time :

If all of a sudden 30 individual RPC Please come here , And this 30 individual RPC Business is time consuming , At this time only 10 individual RPC The request is responding and executing , The rest 20 individual RPC The request is still in the task queue . Because the task queue is 100, It's not full yet , So no extra threads will be created to handle . It's time to squeeze 100 individual RPC request , To start creating new threads , To deal with this RPC Business , This is unacceptable .

So at this time , There's a solution , Is to put corePoolSize and maxPoolSize Such as 40. But in this case , It's a waste of resources , because 40 Threads are the maximum number of threads that can be backed up , Usually there is not such a large amount of . But if corePoolSize==maxPoolSize, At this point, even if all threads are idle , There's no way to clean up and recycle to free up resources , because corePoolSize It's not recycled .

 

 

that , Is there a thread pool , The following functions can be realized , stay corePoolSize yes 10,maxPoolSize yes 40, The task queue length is 100 when :

  • If there is 20 Please come here ,corePoolSize Of 10 When there are not enough threads , Immediately create 10 Three threads to , Deal with this immediately 20 A request .
  • When there is at the same time 50 Please come here , More than... Threads have been created maxPoolSize Of 40 When , I can't deal with it any more 10 One in the task queue .
  • When there are fewer requests ,maxPoolSize The one you created 30 Release the extra threads , Release resources .

This article will discuss EagerThreadPool It's the thread pool to solve this problem , stay Dubbo When used in , Can be targeted and efficient processing RPC request .

EagerThreadPool Mainly by EagerThreadPool、EagerThreadPoolExecutor、TaskQueue With the realization of this function , The specific logic analysis is as follows :

1.ThreadPoolExecutor The scheduling strategy of

2.Dubbo Self defined TaskQueue Design

3.Dubbo Of EagerThreadPoolExecutor and TaskQueue With

 

1.ThreadPoolExecutor The scheduling strategy of

With ThreadPoolExecutor Of execute Analysis of the method of submitting tasks

 public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// If the number of tasks run by the thread pool is less than corePoolSize, Directly by the core Thread running
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// If the number of tasks run by the thread pool is greater than or equal to corePoolSize, Try to stage the submitted task to the task queue
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// If the task queue refuses to store the newly submitted task , Try to create maxPoolSize The remaining threads in the , Direct execution
else if (!addWorker(command, false))
// If maxPoolSize It's all used up , go reject Refusal strategy
reject(command);
}

 

The image above JDK The scheduling logic of , The logic as like as two peas of Foxconn's pipeline is exactly the same . But this logic requirement is not satisfied Dubbo such RPC The scene of communication , have a look Dubbo How to deal with it .

 

2.Dubbo Self defined TaskQueue Design

public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {
// and dubbo Their own EagerThreadPoolExecutor Deep cooperation , The two cooperate to achieve this scheduling
private EagerThreadPoolExecutor executor;
public void setExecutor(EagerThreadPoolExecutor exec) {
executor = exec;
}
// Cover JDK default offer Method , Integrated into EagerThreadPoolExecutor Property reading for
@Override
public boolean offer(Runnable runnable) {
if (executor == null) {
throw new RejectedExecutionException("The task queue does not have executor!");
}
int currentPoolThreadSize = executor.getPoolSize();
// If the number of tasks currently running is less than the total number of threads currently in the thread pool , No matter the , One thread per task , Tube is enough , Go directly JDK The original logic of
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(runnable);
}
// Can walk here , This indicates that the number of tasks currently running is greater than the number of threads currently running in the thread pool ; There will be tasks and no threads available , Need to deal with this situation
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
// If it is found that the current number of thread pools has not reached the maximum maxPoolSize, return false; inform ThreadPoolExecutor , Failed to insert into task queue .
// This step , We need to cooperate first EagerThreadPoolExecutor.execute Let's see ,EagerThreadPoolExecutor and TaskQueue It's deep cooperation
//EagerThreadPoolExecutor.execute The main logic is super.execute(command); So back to ThreadPoolExecutor.execute The scheduling logic of
// Combined with the above ThreadPoolExecutor.execute The scheduling logic of , Let's think about when , Would call Queue Of offer Method
// Yes , When EagerThreadPoolExecutor.execute When it comes to execution , Find out corePoolSize Is already full , I'll start with the task offer Add to task queue , If the task queue is full , Refuse to add , So thread pool , Will start trying to create a new thread right away .
// Go straight back here false, That is to force the thread pool to create threads immediately . return false; } // Can walk here , Describes the current number of threads , It's already arrived maxPoolSize 了 , There are no tricks at this time , You can only call the original offer Logic , Really insert... Into the task queue . return super.offer(runnable); } } // although It looks simple , Logic is universal , But there are still some details to deal with , To be specific, see EagerThreadPoolExecutor The design of the .

 

3.Dubbo Of EagerThreadPoolExecutor and TaskQueue With

public class EagerThreadPoolExecutor extends ThreadPoolExecutor {
@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
submittedTaskCount.incrementAndGet();
try {
// Call the original thread pool execute, With custom TaskQueue, Realize if corePoolSize Full of ,offer To taskQueue return false, Force thread creation
super.execute(command);
} catch (RejectedExecutionException rx) {
// Here's to try again retryOffer, Try inserting the task into the task queue again , Taking into account TaskQueue Side effects . Combine the following ThreadPoolExecutor.execute see , Skip here
final TaskQueue queue = (TaskQueue) super.getQueue();
try {
if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.", rx);
}
} catch (InterruptedException x) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} catch (Throwable t) {
// decrease any way
submittedTaskCount.decrementAndGet();
throw t;
}
}
}
public class ThreadPoolExecutor{
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// When corePoolSize When it's not enough ,workQueue In fact, that is Dubbo Of TaskQueue, Here, forced return false, Don't take this if The branch of
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// Let's go else After the branch , Force thread creation . But once it reaches maxPoolSize after , Create failure , Will enter reject technological process
// Actually , Now , We don't want to go reject technological process . Because of the above Dubbo Of TaskQueue It's forced return false, In order to promote the creation of new threads out immediately .
// But , If it fails , Still want to join the task queue waiting , Not directly reject Refuse . That's why it's up there catch (RejectedExecutionException rx)
// above catch (RejectedExecutionException rx) Inside , Try TaskQueue.retryOffer I really can't put it in here , That means it's really time to execute the denial policy
else if (!addWorker(command, false))
reject(command);
}
}

 

Look at the big picture , Species Dubbo I'm cheating ThreadPoolExecutor The feeling of , There are side effects of this deception , I have to go back to EagerThreadPoolExecutor in catch (RejectedExecutionException rx) To alleviate this problem . But on the whole , This strategy is very effective , You can quickly expand threads , Give Way RPC The communication delay is lower , More responsive , And it doesn't consume a lot of resources when it's idle . The design is very clever , You can do it in a specific scenario of your own business , A similar scheme is adopted .

 

版权声明
本文为[learncat]所创,转载请带上原文链接,感谢
https://javamana.com/2021/01/20210121204051782o.html

  1. 【计算机网络 12(1),尚学堂马士兵Java视频教程
  2. 【程序猿历程,史上最全的Java面试题集锦在这里
  3. 【程序猿历程(1),Javaweb视频教程百度云
  4. Notes on MySQL 45 lectures (1-7)
  5. [computer network 12 (1), Shang Xuetang Ma soldier java video tutorial
  6. The most complete collection of Java interview questions in history is here
  7. [process of program ape (1), JavaWeb video tutorial, baidu cloud
  8. Notes on MySQL 45 lectures (1-7)
  9. 精进 Spring Boot 03:Spring Boot 的配置文件和配置管理,以及用三种方式读取配置文件
  10. Refined spring boot 03: spring boot configuration files and configuration management, and reading configuration files in three ways
  11. 精进 Spring Boot 03:Spring Boot 的配置文件和配置管理,以及用三种方式读取配置文件
  12. Refined spring boot 03: spring boot configuration files and configuration management, and reading configuration files in three ways
  13. 【递归,Java传智播客笔记
  14. [recursion, Java intelligence podcast notes
  15. [adhere to painting for 386 days] the beginning of spring of 24 solar terms
  16. K8S系列第八篇(Service、EndPoints以及高可用kubeadm部署)
  17. K8s Series Part 8 (service, endpoints and high availability kubeadm deployment)
  18. 【重识 HTML (3),350道Java面试真题分享
  19. 【重识 HTML (2),Java并发编程必会的多线程你竟然还不会
  20. 【重识 HTML (1),二本Java小菜鸟4面字节跳动被秒成渣渣
  21. [re recognize HTML (3) and share 350 real Java interview questions
  22. [re recognize HTML (2). Multithreading is a must for Java Concurrent Programming. How dare you not
  23. [re recognize HTML (1), two Java rookies' 4-sided bytes beat and become slag in seconds
  24. 造轮子系列之RPC 1:如何从零开始开发RPC框架
  25. RPC 1: how to develop RPC framework from scratch
  26. 造轮子系列之RPC 1:如何从零开始开发RPC框架
  27. RPC 1: how to develop RPC framework from scratch
  28. 一次性捋清楚吧,对乱糟糟的,Spring事务扩展机制
  29. 一文彻底弄懂如何选择抽象类还是接口,连续四年百度Java岗必问面试题
  30. Redis常用命令
  31. 一双拖鞋引发的血案,狂神说Java系列笔记
  32. 一、mysql基础安装
  33. 一位程序员的独白:尽管我一生坎坷,Java框架面试基础
  34. Clear it all at once. For the messy, spring transaction extension mechanism
  35. A thorough understanding of how to choose abstract classes or interfaces, baidu Java post must ask interview questions for four consecutive years
  36. Redis common commands
  37. A pair of slippers triggered the murder, crazy God said java series notes
  38. 1、 MySQL basic installation
  39. Monologue of a programmer: despite my ups and downs in my life, Java framework is the foundation of interview
  40. 【大厂面试】三面三问Spring循环依赖,请一定要把这篇看完(建议收藏)
  41. 一线互联网企业中,springboot入门项目
  42. 一篇文带你入门SSM框架Spring开发,帮你快速拿Offer
  43. 【面试资料】Java全集、微服务、大数据、数据结构与算法、机器学习知识最全总结,283页pdf
  44. 【leetcode刷题】24.数组中重复的数字——Java版
  45. 【leetcode刷题】23.对称二叉树——Java版
  46. 【leetcode刷题】22.二叉树的中序遍历——Java版
  47. 【leetcode刷题】21.三数之和——Java版
  48. 【leetcode刷题】20.最长回文子串——Java版
  49. 【leetcode刷题】19.回文链表——Java版
  50. 【leetcode刷题】18.反转链表——Java版
  51. 【leetcode刷题】17.相交链表——Java&python版
  52. 【leetcode刷题】16.环形链表——Java版
  53. 【leetcode刷题】15.汉明距离——Java版
  54. 【leetcode刷题】14.找到所有数组中消失的数字——Java版
  55. 【leetcode刷题】13.比特位计数——Java版
  56. oracle控制用户权限命令
  57. 三年Java开发,继阿里,鲁班二期Java架构师
  58. Oracle必须要启动的服务
  59. 万字长文!深入剖析HashMap,Java基础笔试题大全带答案
  60. 一问Kafka就心慌?我却凭着这份,图灵学院vip课程百度云