Dubbo中的EagerThreadPool的简单分析

learncat 2021-01-21 20:41:11
简单 Dubbo 分析 eagerthreadpool


如果翻阅Dubbo的代码,发现其内部有一个ThreadPool接口,抽象了各种线程池。其中,有一个线程池实现比较特殊:EagerThreadPool。

Eager是的英文意思是渴望的、热心的意思。这个线程池简单直译一下,就是热心的线程池。这个线程池看起来比较有趣,在分析这个线程池之前,先介绍JDK自带的线程池。

 

JDK自带的线程池,可以通过Executors.newXXX的方式,快速创建出线程池(一些手册会不建议这么使用,主要是Executors内部的实现中,线程池队列默认都是无限长的,无限长的队列在极端情况下,容易造成内存无限扩充,从而造成OOM的问题,而且挤压太长时间的任务队列,也会让用户体验不好)。Executors内部其实也是采用new ThreadPoolExecutor的方式创建线程池的。

 

简单讨论JDK的线程池的时候,核心概念有几个:corePoolSize;maxPoolSize;BlockingQueue;RejectedExecutionHandler,简单解释含义就是:

  • corePoolSize:默认情况下,线程池使用的线程数
  • maxPoolSize:线程池所能使用的最大线程数
  • BlockingQueue:如果提交的任务大于线程数,新提交的任务将可能被存储在一个阻塞队列里。
  • RejectedExecutionHandler:无法向阻塞队列里插入的时候,会触发这个策略。

 

在以上含义的基础上,介绍JDK自带的线程池的工作原理,这里会用【富士康的流水线】做一个类比的解释,这里没有任何调侃工人的意思,只是想以此做一个类比,方便还不了解的朋友快速理解原理。

一个线程池就像是富士康的一条生产线,这个生产线上有一条环形皮带(就是BlockingQueue,如果没有设置容量,可以认为是无限容量的皮带,这里长度设置为可以放100个箱子),皮带两遍站了N个工人(corePoolSize),旁边还有M个待命的工人(maxPoolSize-corePoolSize),在皮带的上方有一个传感器(RejectedExecutionHandler),负责控制皮带的运行。当皮带运行起来的时候,工人从皮带上取下箱子,并用胶带封好,放到一边,而封胶带是需要时间的。那么,JDK的默认工作模式如下:

  • 皮带上零零星星的放上一两个箱子的时候,因为数量远小于N个工人的数量,所以很快就被人拿走,以至于皮带上几乎看不到滞留的箱子。这时候:N个工人里可以有些人偷懒。
  • 继续向皮带上增加箱子,一直增加到N个工人的每人手上都拿着一个箱子在封胶带。此时:老板稍微满意了,N个工人没一个偷懒的,但是老板还不满足,继续向皮带上放箱子。
  • 此时,N个工人都在用胶带封各自的箱子,皮带上再增加的箱子没人理会了,这些箱子只能留在皮带上转圈圈,也就是提交的任务无法立刻执行,只能存储在BlockingQueue里。如果此时,不再继续向皮带上放箱子,那么N个工人里,空出手来的工人会取下皮带上的箱子,直至流水线上的箱子被处理完毕。反之,如果继续向皮带上放箱子,皮带上挤压的箱子会越来越多。
  • 如果皮带上的箱子越来越多,就会达到一个容量极限,例如:这里达到了最大值100个箱子。此时,皮带上已经没有空间放箱子了,此时,老板就从休息室发动那M个待命的工人(创建线程出来),生产线快满了,你们快到皮带旁边。
  • M个待命的工人,马上赶到皮带旁边,开始接手新放的箱子。此时有两个结局:新来的M个待命工人战斗力很强,马上就包装完很多箱子,只要皮带上不满,这M个待命工人就可以休息了,在休息够一定的时间后,这M个待命工人就可以离开皮带,回到休息室(释放这部分临时创建的线程)。另外一个结局是:箱子太多了,再加了M个待命工人还是不够,此时,再加箱子,完全处理不了了,触发了生产线的传感器(RejectedExecutionHandler策略),根据策略决定如何处理这些放不下的箱子。

 

由上面的类比可以看出,在默认情况下,JDK的线程池调度策略是一个非常贪婪的工厂老板。他默认情况下,只会使用最小的人手来维持运转,也就是我们设定的corePoolSize。即使突然来了大量的任务,他也不会立刻增加人手,而是先把任务放在队列里等待,让这最小的人手想办法处理完最多的任务。如果真的是队列都要满了,他才会赶快另外招募一批人手(就是maxPoolSize-corePoolSize)来紧急处理。而一旦队列不满了,新招的人手空闲一阵,他立刻就辞退这些临时人手(maxPooleSize出来的线程,会在空闲一段时间后,被回收)。总之,JDK这个老板的默认策略就是:越小的线程开销越好,多余的人一旦闲了,就让你滚蛋

 

但是这种策略,不是任何场景下都适用了,特别是RPC通信框架之类的。我们想象一下场景,假设RPC的业务处理线程池里corePoolSize是10,maxPoolSize是40,任务队列长度是100,那么此时:

如果突然有30个RPC请求过来,而且这30个RPC业务比较耗时,此时只有10个RPC请求在响应并执行,剩下的20个RPC请求还在任务队列里。因为任务队列是100,还没有满呢,所以不会创建出额外的线程来处理。需要挤压了100个RPC请求,才会开始创建新的线程,来处理这些RPC业务,这是不可接受的。

那么此时,有一个办法解决,就是把corePoolSize和maxPoolSize都改成40。但是这种情况下,就会浪费资源,因为40个线程是后备的最大线程数量,平时是不会有这么大的量的。但是如果corePoolSize==maxPoolSize,那么此时即使所有的线程都是空闲的,也无法清理回收来释放资源,因为corePoolSize的是不回收的。

 

 

那么,有没有一种线程池,可以实现如下功能,在corePoolSize是10,maxPoolSize是40,任务队列长度是100时:

  • 如果有20个请求过来,corePoolSize的10个线程不够的时候,立刻再创建出10个线程来,立刻处理这20个请求。
  • 当同时有50个请求过来,创建的线程已经超过maxPoolSize的40的时候,再把处理不了的10个放在任务队列里。
  • 当请求变少的时候,maxPoolSize创建出来的那30个额外线程再释放掉,释放资源。

本文要探讨的EagerThreadPool就是解决这种问题的线程池,在Dubbo中使用的时候,可以针对性的高效地处理RPC请求。

EagerThreadPool主要由EagerThreadPool、EagerThreadPoolExecutor、TaskQueue 配合实现这种功能,具体的逻辑分析如下:

1.ThreadPoolExecutor的调度策略

2.Dubbo自定义的TaskQueue设计

3.Dubbo的EagerThreadPoolExecutor和TaskQueue的配合

 

1.ThreadPoolExecutor的调度策略

以ThreadPoolExecutor的execute提交任务的方法分析

 public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//如果线程池运行的任务数量小于corePoolSize,直接由core线程运行
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果线程池运行的任务数量大于等于corePoolSize,尝试向任务队列暂存提交的任务
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果任务队列拒绝存储新提交的任务,尝试创建maxPoolSize里剩余的线程,直接执行
else if (!addWorker(command, false))
//如果maxPoolSize也用光了,走reject拒绝策略
reject(command);
}

 

上图的JDK的调度逻辑,跟我们类比的富士康的流水线的逻辑是一模一样的。但是这种逻辑需求不满足Dubbo这种RPC通信的场景,看看Dubbo如何处理。

 

2.Dubbo自定义的TaskQueue设计

public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {
//和dubbo自己的EagerThreadPoolExecutor 深度配合,两者合作实现这种调度
private EagerThreadPoolExecutor executor;
public void setExecutor(EagerThreadPoolExecutor exec) {
executor = exec;
}
//覆盖JDK默认的offer方法,融入了 EagerThreadPoolExecutor 的属性读取
@Override
public boolean offer(Runnable runnable) {
if (executor == null) {
throw new RejectedExecutionException("The task queue does not have executor!");
}
int currentPoolThreadSize = executor.getPoolSize();
//如果当前运行中的任务数比线程池中当前的线程总数还小,就不管了,每个任务一个线程,管够,直接走JDK的原来逻辑
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(runnable);
}
//能走到这里,说明当前运行的任务数是大于线程池当前的线程数的;说明会有任务没有线程可用,需要处理这种情况
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
//如果发现当前线程池的数量还没有到最大的maxPoolSize,返回false; 告知 ThreadPoolExecutor ,插入到任务队列失败。
//这一步,需要先配合EagerThreadPoolExecutor.execute 一块看,EagerThreadPoolExecutor和TaskQueue是深度配合的
//EagerThreadPoolExecutor.execute的主逻辑是super.execute(command); 那么又回到 ThreadPoolExecutor.execute的调度逻辑
//结合上面ThreadPoolExecutor.execute的调度逻辑,我们想一想什么时候,会调用Queue的offer方法
//是的,当EagerThreadPoolExecutor.execute执行的时候,发现corePoolSize已经满了,会先把任务offer添加到任务队列里,如果任务队列满了,拒绝添加,那么线程池,会马上开始尝试创建新的线程。
//这里直接返回false,就是强制线程池立刻马上创建线程。 return false; } // 能走到这里,说明当前的线程数,已经到到了maxPoolSize了,这时候也没有什么花招了,只能调用原始的offer逻辑,真的向任务队列插入。 return super.offer(runnable); } } //虽然 简单看起来,逻辑是通的,但是还是有些细节要处理,具体看 EagerThreadPoolExecutor的设计。

 

3.Dubbo的EagerThreadPoolExecutor和TaskQueue的配合

public class EagerThreadPoolExecutor extends ThreadPoolExecutor {
@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
submittedTaskCount.incrementAndGet();
try {
//调用线程池的原始execute,配合自定义的TaskQueue,实现如果corePoolSize满了,offer到taskQueue返回false,强制创建线程
super.execute(command);
} catch (RejectedExecutionException rx) {
//这里要再次尝试retryOffer,再次尝试把任务插入到任务队列里,是考虑到 TaskQueue带来的副作用。结合下面的 ThreadPoolExecutor.execute看,这里跳过
final TaskQueue queue = (TaskQueue) super.getQueue();
try {
if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.", rx);
}
} catch (InterruptedException x) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} catch (Throwable t) {
// decrease any way
submittedTaskCount.decrementAndGet();
throw t;
}
}
}
public class ThreadPoolExecutor{
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//当corePoolSize不够的时候,workQueue其实就是Dubbo的TaskQueue,这里强制返回了false,不走这个if的分支
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//走了这个else分支后,强制创建线程。但是一旦达到maxPoolSize后,创建失败,会进入reject流程
//其实,这时候,我们不想走reject流程。因为上面的Dubbo的TaskQueue是强制返回false,以此来推动立刻创建新线程出来。
//可是,如果真的失败了,还是要再加入任务队列等待,而不是直接reject拒绝。这就是为什么上面要 catch (RejectedExecutionException rx)
//上面的catch (RejectedExecutionException rx)里面,尝试TaskQueue.retryOffer 这里真的放不进去了,说明真的该执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
}

 

整体看下来,有种Dubbo在欺骗ThreadPoolExecutor的感觉,这种欺骗又产生了副作用,只好又在EagerThreadPoolExecutor里 catch (RejectedExecutionException rx)来缓解这种问题。但是整体上来看,这种策略非常有效,可以快速扩容线程,让RPC的通信延迟更低,响应更快,而且空闲时又不消耗大量的资源。设计的非常巧妙,可以在自己业务的特定场景下,也采用类似的方案。

 

版权声明
本文为[learncat]所创,转载请带上原文链接,感谢
https://www.cnblogs.com/learncat/p/14310339.html

  1. 【计算机网络 12(1),尚学堂马士兵Java视频教程
  2. 【程序猿历程,史上最全的Java面试题集锦在这里
  3. 【程序猿历程(1),Javaweb视频教程百度云
  4. Notes on MySQL 45 lectures (1-7)
  5. [computer network 12 (1), Shang Xuetang Ma soldier java video tutorial
  6. The most complete collection of Java interview questions in history is here
  7. [process of program ape (1), JavaWeb video tutorial, baidu cloud
  8. Notes on MySQL 45 lectures (1-7)
  9. 精进 Spring Boot 03:Spring Boot 的配置文件和配置管理,以及用三种方式读取配置文件
  10. Refined spring boot 03: spring boot configuration files and configuration management, and reading configuration files in three ways
  11. 精进 Spring Boot 03:Spring Boot 的配置文件和配置管理,以及用三种方式读取配置文件
  12. Refined spring boot 03: spring boot configuration files and configuration management, and reading configuration files in three ways
  13. 【递归,Java传智播客笔记
  14. [recursion, Java intelligence podcast notes
  15. [adhere to painting for 386 days] the beginning of spring of 24 solar terms
  16. K8S系列第八篇(Service、EndPoints以及高可用kubeadm部署)
  17. K8s Series Part 8 (service, endpoints and high availability kubeadm deployment)
  18. 【重识 HTML (3),350道Java面试真题分享
  19. 【重识 HTML (2),Java并发编程必会的多线程你竟然还不会
  20. 【重识 HTML (1),二本Java小菜鸟4面字节跳动被秒成渣渣
  21. [re recognize HTML (3) and share 350 real Java interview questions
  22. [re recognize HTML (2). Multithreading is a must for Java Concurrent Programming. How dare you not
  23. [re recognize HTML (1), two Java rookies' 4-sided bytes beat and become slag in seconds
  24. 造轮子系列之RPC 1:如何从零开始开发RPC框架
  25. RPC 1: how to develop RPC framework from scratch
  26. 造轮子系列之RPC 1:如何从零开始开发RPC框架
  27. RPC 1: how to develop RPC framework from scratch
  28. 一次性捋清楚吧,对乱糟糟的,Spring事务扩展机制
  29. 一文彻底弄懂如何选择抽象类还是接口,连续四年百度Java岗必问面试题
  30. Redis常用命令
  31. 一双拖鞋引发的血案,狂神说Java系列笔记
  32. 一、mysql基础安装
  33. 一位程序员的独白:尽管我一生坎坷,Java框架面试基础
  34. Clear it all at once. For the messy, spring transaction extension mechanism
  35. A thorough understanding of how to choose abstract classes or interfaces, baidu Java post must ask interview questions for four consecutive years
  36. Redis common commands
  37. A pair of slippers triggered the murder, crazy God said java series notes
  38. 1、 MySQL basic installation
  39. Monologue of a programmer: despite my ups and downs in my life, Java framework is the foundation of interview
  40. 【大厂面试】三面三问Spring循环依赖,请一定要把这篇看完(建议收藏)
  41. 一线互联网企业中,springboot入门项目
  42. 一篇文带你入门SSM框架Spring开发,帮你快速拿Offer
  43. 【面试资料】Java全集、微服务、大数据、数据结构与算法、机器学习知识最全总结,283页pdf
  44. 【leetcode刷题】24.数组中重复的数字——Java版
  45. 【leetcode刷题】23.对称二叉树——Java版
  46. 【leetcode刷题】22.二叉树的中序遍历——Java版
  47. 【leetcode刷题】21.三数之和——Java版
  48. 【leetcode刷题】20.最长回文子串——Java版
  49. 【leetcode刷题】19.回文链表——Java版
  50. 【leetcode刷题】18.反转链表——Java版
  51. 【leetcode刷题】17.相交链表——Java&python版
  52. 【leetcode刷题】16.环形链表——Java版
  53. 【leetcode刷题】15.汉明距离——Java版
  54. 【leetcode刷题】14.找到所有数组中消失的数字——Java版
  55. 【leetcode刷题】13.比特位计数——Java版
  56. oracle控制用户权限命令
  57. 三年Java开发,继阿里,鲁班二期Java架构师
  58. Oracle必须要启动的服务
  59. 万字长文!深入剖析HashMap,Java基础笔试题大全带答案
  60. 一问Kafka就心慌?我却凭着这份,图灵学院vip课程百度云