Thread pool in Java

Java old K 2020-11-10 08:26:33
thread pool java


Thread pools are familiar to everyone. They are used in everyday business development as well as in frameworks and middleware. Most implementations wrap the JDK's ThreadPoolExecutor, such as Tomcat's thread pool, although some are developed independently. Either way, they all involve the same core parameters: core pool size, work queue, maximum pool size, rejection policy, and so on.

These are the pitfalls we ran into when we first started using thread pools:

  1. Thread pool parameters must be set according to the specific business scenario, distinguishing I/O-intensive from CPU-intensive workloads. For an I/O-intensive business, unreasonable settings for the core pool size, the work queue (workQueue), and the maximum pool size not only prevent the thread pool from doing its job but also harm the existing business.
  2. Once the work queue (workQueue) is full, a newly created thread first handles the newly submitted task rather than a task already in the queue. Queued tasks are executed only when a busy thread finishes its current task, so they may wait a long time and the queue backs up, especially in I/O-intensive scenarios.
  3. If you need the result of a task executed in the thread pool and obtain it through a Future, the rejection policy must not be DiscardPolicy. This policy silently drops the task without executing it, but submit() still returns a Future object (even though in this case the result is no longer wanted). A subsequent future != null check is therefore useless: the code still reaches future.get(), and if get() is called without a timeout, it blocks forever!

The pseudocode is as follows:

// If the thread pool is saturated, the new task goes straight to the rejection policy
Future<String> future = executor.submit(() -> {
    // Business logic, e.g. a time-consuming call to a third-party interface, runs in the thread pool
    return result;
});
// Main flow
if (future != null) { // With an unsuitable rejection policy (DiscardPolicy) this is still true
    future.get(timeout, TimeUnit.MILLISECONDS); // The caller blocks here until the result arrives or the timeout expires
}
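The DiscardPolicy pitfall described in point 3 can be reproduced with a minimal sketch. The pool sizes (one thread, queue capacity of one), the class name DiscardPolicyDemo, and the 200 ms timeout are assumptions chosen only to force a rejection quickly; they are not the article's production settings.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class DiscardPolicyDemo {

    /** Returns what happened to the task that DiscardPolicy silently dropped. */
    static String run() {
        // Hypothetical tiny pool: 1 thread and a queue of 1, so the third task is rejected.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.DiscardPolicy());
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Occupies the only worker thread until the latch is released.
            executor.submit(() -> { release.await(); return "first"; });
            // Fills the queue.
            executor.submit(() -> "second");
            // Rejected and discarded, yet submit() still returns a non-null Future.
            Future<String> dropped = executor.submit(() -> "third");
            if (dropped == null) {
                return "null future";
            }
            try {
                // Without the timeout this call would block forever.
                return dropped.get(200, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                return "timed out";
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The dropped task's Future is never completed by anyone, so only the timeout lets the caller continue.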

The following sections analyze these points one by one against our actual business situation.

Some of these problems stem from an insufficient understanding of thread pools, and some from the behavior of the thread pool itself.

One. Background

One of the company's interfaces uses a thread pool. The function in question is not a core dependency of the interface but takes a noticeable amount of time, so it is submitted to a thread pool and runs in parallel with the main thread. Once the pooled task finishes, its result is obtained through future.get, merged into the result of the main flow, and returned to the front end. The business scenario is simple, and the general flow is as follows:

image

The original intent was to avoid affecting the performance of the main flow and to avoid increasing the overall response time.

The thread pool used previously was the JDK's newCachedThreadPool. Because a Sonar scan flagged a risk of memory overflow (its maximum number of threads is Integer.MAX_VALUE), we switched to a native ThreadPoolExecutor and specified the core pool size and the maximum pool size to resolve the Sonar issue.

However, the new thread pool did not suit our I/O-intensive business scenario (most of the business is implemented by calling other interfaces). At the time, the core pool size was set to the number of CPU cores (the online machines have 4 cores), the work queue capacity to 2048, and the maximum pool size to the number of CPU cores * 2. This led to a series of problems...
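Why this configuration hurts an I/O-intensive workload can be seen in a small sketch. It assumes a bounded LinkedBlockingQueue (the article does not say which queue implementation was used) and 100 hypothetical tasks that block the way a slow downstream call would; the class name is illustrative.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class LargeQueueDemo {

    /** Submits blocked tasks and returns {pool size, queue size} while they are all stuck. */
    static int[] run(int tasks) {
        int cores = 4; // the online machines in the article have 4 cores
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                cores, cores * 2, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(2048)); // queue type is an assumption
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                executor.execute(() -> {
                    try {
                        release.await(); // stands in for a slow call to another service
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] {executor.getPoolSize(), executor.getQueue().size()};
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] state = run(100);
        System.out.println("threads=" + state[0] + ", queued=" + state[1]);
    }
}
```

With 100 blocked tasks the pool stays at 4 threads and 96 tasks sit in the queue: the extra 4 threads allowed by the maximum pool size are not created until all 2048 queue slots are taken.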

Two. Troubleshooting process

After the release, the overall response time of the interface that used the thread pool became longer, with some requests taking as long as 10 seconds to return data. Thread dump analysis showed that a large number of threads were blocked in the future.get method, as follows:

image

The future.get method blocks the current main flow and waits for the child thread to return a result within the timeout. If no result arrives before the timeout, the wait ends and the subsequent code continues. The timeout was set to the default interface timeout of 10 seconds (later changed to 200 ms). At this point we could confirm that the total interface time was long because the flow was stuck at the future.get step.
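A bounded wait with a fallback might look like the sketch below. The method name fetchWithTimeout, the "fallback" value, and the simulated sleep are illustrative assumptions, not the article's code.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutFallbackDemo {

    /** Waits at most timeoutMillis for a task that takes taskMillis, otherwise returns "fallback". */
    static String fetchWithTimeout(long taskMillis, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executor.submit(() -> {
                Thread.sleep(taskMillis); // simulated slow third-party call
                return "result";
            });
            try {
                return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                future.cancel(true); // interrupt the worker so it does not keep running
                return "fallback";
            } catch (ExecutionException e) {
                return "fallback"; // the task itself failed
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "fallback";
            }
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchWithTimeout(10, 2000));
        System.out.println(fetchWithTimeout(2000, 200));
    }
}
```

The timeout bounds how long the main flow can be held up, but it does not make the pooled task run any sooner.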

But that is not the root cause. The future is returned by the thread pool; the pseudocode is as follows:

Future<String> future = executor.submit(() -> {
    // Business logic, e.g. a time-consuming call to a third-party interface, runs in the thread pool
    return result;
});

From the code above, we can see that the future has no result because the task submitted to the thread pool has not been executed yet.

Then why had it not been executed? Further analysis of the thread pool in the dump file showed that the number of threads in the pool had reached the maximum and the pool was running at full load, as pictured:

image

SubThread is the thread name used in our custom thread pool. All 8 threads are in the RUNNABLE state. Because threads beyond the core pool size are created only after the queue fills up, this indicates that the work queue was full of tasks. The queue length had been set to 2048, meaning that 2048 tasks were waiting to be executed, which undoubtedly increased the time consumed by the whole interface.

The thread pool handles a newly submitted task in this order: core threads -> work queue -> maximum threads -> rejection policy
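This order can be observed directly with a deliberately tiny pool. The sizes below (core 1, maximum 2, queue capacity 1) and the class name are assumptions made only to keep the output short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SubmissionOrderDemo {

    /** Submits four blocked tasks and records the pool state after each submission. */
    static List<String> run() {
        // No handler is passed, so the default AbortPolicy applies.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        List<String> steps = new ArrayList<>();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    executor.execute(() -> {
                        try {
                            release.await(); // keep every worker busy
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                    steps.add("task" + i + ": threads=" + executor.getPoolSize()
                            + ", queued=" + executor.getQueue().size());
                } catch (RejectedExecutionException e) {
                    steps.add("task" + i + ": rejected");
                }
            }
            return steps;
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Task 1 starts the core thread, task 2 is queued, task 3 finds the queue full and gets a new thread, and task 4 is rejected. Task 3 starts running while task 2 is still waiting in the queue, which is the queue-jumping behavior described at the beginning of the article.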

If you are not familiar with thread dumps, see the previous article: How to do thread dump analysis in a Windows environment. Although the environment is different, the principle is similar.

At this point it was basically confirmed that the main reason for the slow interface was the unreasonable thread pool configuration.

There was also an occasional problem. The online logs showed that although the thread pool task had been executed, it had not written its run log. The task in the thread pool calls the interface of another service. The owner of that service confirmed that their interface had been called, yet our own logs had no record of the call. A closer look at the code revealed that the rejection policy had also been changed: instead of the default AbortPolicy, which throws an exception and does not execute the task, it was set to CallerRunsPolicy, which hands the task over to the caller for execution!

image

image

In other words, when the thread pool reaches its maximum load, the rejection policy makes the main flow execute the task that was submitted to the pool. Besides further increasing the time consumed by the whole interface, this can also cause the main flow to hang. Most importantly, it is impossible to tell at which step the submitted task is actually executed.
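A minimal sketch of this effect, assuming a saturated pool of one thread with a queue capacity of one (sizes and class name are illustrative): the rejected task reports the name of the thread that actually ran it.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CallerRunsDemo {

    /** Returns the name of the thread that executed the rejected task. */
    static String run() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Occupies the only worker thread.
            executor.submit(() -> { release.await(); return "busy"; });
            // Fills the queue.
            executor.submit(() -> "queued");
            // Rejected: CallerRunsPolicy runs it inside submit(), on the submitting thread.
            Future<String> rejected = executor.submit(() -> Thread.currentThread().getName());
            return rejected.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected task ran on: " + run());
    }
}
```

When started from main, the printed name is that of the main thread rather than a pool thread, so a slow task blocks the caller for its full duration.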

From the logging points we inferred that the task ran after the logging method had already been called: the pooled task was executed only when the result was about to be returned to the front end. By then the logging method had already run, so no log was printed. Moreover, the result returned by the subtask could not be merged into the main flow result, because the method that merges the main flow result with the thread pool task result had also already been called and would not be called again. The general flow is as follows:

image

In fact, this rejection policy does not suit our current business scenario: the tasks in the thread pool are not core tasks and should not affect the execution of the main flow.

Three. Improvements

  1. Adjust the thread pool parameters. The core pool size is calculated from the QPS of the online interface, and the maximum pool size follows the maximum thread configuration of the online Tomcat so that it can withstand peak traffic. Keep the queue as small as possible to avoid a task backlog. How to set the number of threads will be explained separately in a later article.
  2. Extend the thread pool by wrapping the native JDK ThreadPoolExecutor and adding monitoring of thread pool metrics, including the pool's running state, core pool size, maximum pool size, number of waiting tasks, number of completed tasks, abnormal shutdown of the pool, and other information, which makes it easy to monitor and locate problems in real time.
  3. Override the rejection policy, mainly to record the metrics at the moment the thread pool is overloaded together with the stack information of the calling thread, which facilitates investigation and analysis. It interrupts execution by throwing an exception, which avoids the problem of the returned future being non-null.
  4. Set a reasonable timeout for future.get to avoid blocking the main thread for too long.
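Points 3 and 4 can be combined on the caller's side roughly as follows. This is a sketch under assumptions: the helper callOptional, the "skipped" placeholder, the inline handler, and the pool sizes are all hypothetical and stand in for the real business code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

class RejectAndDegradeDemo {

    /** Runs a non-core task; returns "skipped" if it is rejected, fails, or takes too long. */
    static String callOptional(ThreadPoolExecutor executor, Callable<String> task, long timeoutMillis) {
        Future<String> future;
        try {
            future = executor.submit(task);
        } catch (RejectedExecutionException e) {
            return "skipped"; // overloaded pool: no Future exists, so nothing can block on it
        }
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            future.cancel(true);
            return "skipped";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "skipped";
        }
    }

    static String run() {
        AtomicInteger rejections = new AtomicInteger();
        // Hypothetical handler: count the rejection, then throw so that submit() fails fast.
        RejectedExecutionHandler handler = (task, pool) -> {
            rejections.incrementAndGet();
            throw new RejectedExecutionException("pool overloaded, queued=" + pool.getQueue().size());
        };
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        try {
            executor.submit(() -> { release.await(); return "busy"; }); // occupies the worker
            executor.submit(() -> "queued");                            // fills the queue
            String result = callOptional(executor, () -> "optional", 200);
            return result + ", rejections=" + rejections.get();
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the handler throws, the main flow learns about the overload immediately and can return without the optional data instead of waiting on a Future that will never complete.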

Thread pool internal flow:

image

The code for thread pool monitoring and the custom rejection policy is as follows. You can adapt it to your own business scenario:

package com.javakk;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.MessageFormat;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;

/**
 * Custom thread pool <p>
 * 1. Monitors the thread pool state and abnormal shutdown <p>
 * 2. Monitors runtime metrics of the thread pool, such as the number of waiting tasks, number of completed tasks,
 *    task exception information, core pool size, maximum pool size, etc. <p>
 * author: The old K
 */
public class ThreadPoolExt extends ThreadPoolExecutor {

    private static final Logger log = LoggerFactory.getLogger(ThreadPoolExt.class);

    private final TimeUnit timeUnit;

    public ThreadPoolExt(int corePoolSize,
                         int maximumPoolSize,
                         long keepAliveTime,
                         TimeUnit unit,
                         BlockingQueue<Runnable> workQueue,
                         ThreadFactory threadFactory,
                         RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.timeUnit = unit;
    }

    @Override
    public void shutdown() {
        // The pool is about to shut down. Running tasks and tasks waiting in the queue are still executed before it closes
        monitor("ThreadPool will be shutdown:");
        super.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        // The pool is shutting down immediately. Tasks still waiting in the queue are returned instead of executed
        monitor("ThreadPool going to immediately be shutdown:");
        // Record the discarded tasks. For now they are only logged; handle them further if the business requires it
        List<Runnable> dropTasks = Collections.emptyList(); // never return null to the caller
        try {
            dropTasks = super.shutdownNow();
            log.error("ThreadPool discard task count:{}", dropTasks.size());
        } catch (Exception e) {
            log.error("ThreadPool shutdownNow error", e);
        }
        return dropTasks;
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        // Record the runtime metrics of the pool before each task runs
        monitor("ThreadPool monitor data:");
    }

    @Override
    protected void afterExecute(Runnable r, Throwable ex) {
        super.afterExecute(r, ex);
        if (ex != null) { // The task terminated abruptly with an exception
            log.error("unknown exception caught in ThreadPool afterExecute:", ex);
        }
    }

    /**
     * Logs runtime metrics of the thread pool, such as the number of waiting tasks, number of completed tasks,
     * core pool size, maximum pool size, etc. <p>
     */
    private void monitor(String title) {
        try {
            // Thread pool monitoring record. Pay attention to when this is written to ES,
            // especially if the logs of several threads are merged into the main flow
            String threadPoolMonitor = MessageFormat.format(
                    "{0}{1}core pool size:{2}, current pool size:{3}, queue wait size:{4}, active count:{5}, completed task count:{6}, " +
                    "task count:{7}, largest pool size:{8}, max pool size:{9}, keep alive time:{10}, is shutdown:{11}, is terminated:{12}, " +
                    "thread name:{13}{14}",
                    System.lineSeparator(), title, this.getCorePoolSize(), this.getPoolSize(),
                    this.getQueue().size(), this.getActiveCount(), this.getCompletedTaskCount(), this.getTaskCount(), this.getLargestPoolSize(),
                    this.getMaximumPoolSize(), this.getKeepAliveTime(timeUnit != null ? timeUnit : TimeUnit.SECONDS), this.isShutdown(),
                    this.isTerminated(), Thread.currentThread().getName(), System.lineSeparator());
            log.info(threadPoolMonitor);
        } catch (Exception e) {
            log.error("ThreadPool monitor error", e);
        }
    }
}
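One caveat about the afterExecute hook above: for tasks passed to submit(), the exception is captured inside the returned Future, so the Throwable argument is null and only tasks passed to execute() reach the ex != null branch. The sketch below follows the pattern shown in the JDK documentation for ThreadPoolExecutor.afterExecute; the class name and the two counters are hypothetical and stand in for logging.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AfterExecuteDemo extends ThreadPoolExecutor {

    final AtomicInteger rawFailures = new AtomicInteger();       // failures visible through the argument alone
    final AtomicInteger unwrappedFailures = new AtomicInteger(); // failures found after inspecting the Future

    AfterExecuteDemo() {
        super(1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t != null) {
            rawFailures.incrementAndGet();
        }
        // submit() wraps the task in a FutureTask, which stores the exception instead of throwing it.
        if (t == null && r instanceof Future<?> && ((Future<?>) r).isDone()) {
            try {
                ((Future<?>) r).get();
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            unwrappedFailures.incrementAndGet();
        }
    }

    /** Submits one failing task and returns {rawFailures, unwrappedFailures}. */
    static int[] run() {
        AfterExecuteDemo pool = new AfterExecuteDemo();
        Callable<String> failing = () -> { throw new IllegalStateException("boom"); };
        pool.submit(failing);
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS); // ensures afterExecute has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {pool.rawFailures.get(), pool.unwrappedFailures.get()};
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println("raw=" + counts[0] + ", unwrapped=" + counts[1]);
    }
}
```

The raw counter stays at 0 while the unwrapped counter reaches 1, which is why a pool used through submit() needs the extra Future check if task exceptions should be monitored.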

Custom rejection policy code:

package com.javakk;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.management.*;
import java.text.MessageFormat;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Custom thread pool rejection policy:<p>
 * 1. Records the pool's core pool size, active count, completed count and other information,
 *    together with the stack information of the calling thread, to make troubleshooting easier <p>
 * 2. Throws an exception to interrupt execution <p>
 * author: The old K
 */
public class RejectedPolicyWithReport implements RejectedExecutionHandler {

    private static final Logger log = LoggerFactory.getLogger(RejectedPolicyWithReport.class);

    private static volatile long lastPrintTime = 0;

    private static final long TEN_MINUTES_MILLS = 10 * 60 * 1000;

    private static final Semaphore guard = new Semaphore(1);

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        try {
            String title = "thread pool execute reject policy!!";
            String msg = MessageFormat.format(
                    "{0}{1}core pool size:{2}, current pool size:{3}, queue wait size:{4}, active count:{5}, completed task count:{6}, " +
                    "task count:{7}, largest pool size:{8}, max pool size:{9}, keep alive time:{10}, is shutdown:{11}, is terminated:{12}, " +
                    "thread name:{13}{14}",
                    System.lineSeparator(), title, e.getCorePoolSize(), e.getPoolSize(), e.getQueue().size(), e.getActiveCount(),
                    e.getCompletedTaskCount(), e.getTaskCount(), e.getLargestPoolSize(), e.getMaximumPoolSize(), e.getKeepAliveTime(TimeUnit.SECONDS),
                    e.isShutdown(), e.isTerminated(), Thread.currentThread().getName(), System.lineSeparator());
            log.info(msg);
            threadDump(); // Record thread stack information, including lock contention information
        } catch (Exception ex) {
            log.error("RejectedPolicyWithReport rejectedExecution error", ex);
        }
        // Always throw, so that submit() fails instead of returning a Future that never completes
        throw new RejectedExecutionException("thread pool execute reject policy!!");
    }

    /**
     * Logs a dump of all threads.<p>
     * Note: by default this records every thread together with its lock information. That is convenient for
     * debugging, but it is best guarded by a switch and a minimum interval, otherwise it may increase latency.<p>
     * 1. Basic information of each thread: id, name, state<p>
     * 2. Stack information<p>
     * 3. Lock-related information (can be turned off)<p>
     * The dump is written to the log.
     */
    private void threadDump() {
        long now = System.currentTimeMillis();
        // Dump at most once every 10 minutes
        if (now - lastPrintTime < TEN_MINUTES_MILLS) {
            return;
        }
        // Allow only one dump to run at a time
        if (!guard.tryAcquire()) {
            return;
        }
        // Dump the thread information asynchronously
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.execute(() -> {
            try {
                ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean();
                StringBuilder sb = new StringBuilder();
                for (ThreadInfo threadInfo : threadMxBean.dumpAllThreads(true, true)) {
                    sb.append(getThreadDumpString(threadInfo));
                }
                // The {} placeholders are required, otherwise SLF4J ignores the extra arguments
                log.error("thread dump info:{}{}", System.lineSeparator(), sb);
            } catch (Exception e) {
                log.error("thread dump error", e);
            } finally {
                // Record the time before releasing the guard so that a concurrent caller cannot start a second dump
                lastPrintTime = System.currentTimeMillis();
                guard.release();
            }
        });
        pool.shutdown();
    }

    /** Formats one thread in the style of ThreadInfo.toString(), printing up to 32 stack frames. */
    @SuppressWarnings("all")
    private String getThreadDumpString(ThreadInfo threadInfo) {
        StringBuilder sb = new StringBuilder("\"" + threadInfo.getThreadName() + "\"" +
                " Id=" + threadInfo.getThreadId() + " " +
                threadInfo.getThreadState());
        if (threadInfo.getLockName() != null) {
            sb.append(" on " + threadInfo.getLockName());
        }
        if (threadInfo.getLockOwnerName() != null) {
            sb.append(" owned by \"" + threadInfo.getLockOwnerName() +
                    "\" Id=" + threadInfo.getLockOwnerId());
        }
        if (threadInfo.isSuspended()) {
            sb.append(" (suspended)");
        }
        if (threadInfo.isInNative()) {
            sb.append(" (in native)");
        }
        sb.append('\n');
        int i = 0;
        StackTraceElement[] stackTrace = threadInfo.getStackTrace();
        MonitorInfo[] lockedMonitors = threadInfo.getLockedMonitors();
        for (; i < stackTrace.length && i < 32; i++) {
            StackTraceElement ste = stackTrace[i];
            sb.append("\tat " + ste.toString());
            sb.append('\n');
            // The lock the thread is blocked or waiting on belongs to the top frame
            if (i == 0 && threadInfo.getLockInfo() != null) {
                Thread.State ts = threadInfo.getThreadState();
                switch (ts) {
                    case BLOCKED:
                        sb.append("\t- blocked on " + threadInfo.getLockInfo());
                        sb.append('\n');
                        break;
                    case WAITING:
                        sb.append("\t- waiting on " + threadInfo.getLockInfo());
                        sb.append('\n');
                        break;
                    case TIMED_WAITING:
                        sb.append("\t- waiting on " + threadInfo.getLockInfo());
                        sb.append('\n');
                        break;
                    default:
                }
            }
            // Monitors that were locked in this stack frame
            for (MonitorInfo mi : lockedMonitors) {
                if (mi.getLockedStackDepth() == i) {
                    sb.append("\t- locked " + mi);
                    sb.append('\n');
                }
            }
        }
        if (i < stackTrace.length) {
            sb.append("\t...");
            sb.append('\n');
        }
        LockInfo[] locks = threadInfo.getLockedSynchronizers();
        if (locks.length > 0) {
            sb.append("\n\tNumber of locked synchronizers = " + locks.length);
            sb.append('\n');
            for (LockInfo li : locks) {
                sb.append("\t- " + li);
                sb.append('\n');
            }
        }
        sb.append('\n');
        return sb.toString();
    }
}

Source of the article: http://javakk.com/188.html

Copyright notice
This article was written by [Java old K]. Please include a link to the original when reposting. Thank you.
