「Java并发编程」从源码分析几道必问线程池的面试题?

Mr.Z 2020-11-10 23:11:59
java 编程 分析 源码 并发


这篇文章我们就来分析下上篇文章的几个小问题

  • 线程池是否区分核心线程和非核心线程?
  • 如何保证核心线程不被销毁?
  • 线程池的线程是如何做到复用的?我们先看最后一个问题一般一个线程执行完任务之后就结束了,Thread.start()只能调用一次,一旦这个调用结束,则该线程就到了stop状态,不能再次调用start。如果你对一个已经启动的线程对象再调用一次start方法的话,会产生:IllegalThreadStateException异常,但是Thread的run方法是可以重复调用的。所以这里也会有一个面试经常问到的问题:「Thread类中run()和start()方法的有什么区别?」下面我们就从jdk的源码来一起看看如何实现线程复用的:线程池执行任务的ThreadPoolExecutor#execute方法为入口
 public void execute(Runnable command) {
     if (command == null)
         throw new NullPointerException();
   
     int c = ctl.get();
     // 线程池当前线程数小于 corePoolSize 时进入if条件调用 addWorker 创建核心线程来执行任务
     if (workerCountOf(c) < corePoolSize) {
         if (addWorker(command, true))
             return;
         c = ctl.get();
     }
     // 线程池当前线程数大于或等于 corePoolSize ,就将任务添加到 workQueue 中
     if (isRunning(c) && workQueue.offer(command)) {
      // 获取到当前线程的状态,赋值给 recheck ,是为了重新检查状态
         int recheck = ctl.get();
         // 如果 isRunning 返回 false ,那就 remove 掉这个任务,然后执行拒绝策略,也就是回滚重新排队
         if (! isRunning(recheck) && remove(command))
             reject(command);
         // 线程池处于 running 状态,但是没有线程,那就创建线程执行任务
         else if (workerCountOf(recheck) == 0)
             addWorker(null, false);
     }
     // 如果任务放入 workQueue 失败,则尝试通过创建非核心线程来执行任务
     // 创建非核心线程失败,则说明线程池已经关闭或者已经饱和,会执行拒绝策略
     else if (!addWorker(command, false))
         reject(command);
 } 

「excute」方法主要业务逻辑

  • 如果当前的线程池运行线程小于「coreSize」,则创建新线程来执行任务。
  • 如果当前运行的线程等于「coreSize」或多余「coreSize」(动态修改了coreSize才会出现这种情况),把任务放到阻塞队列中。
  • 如果队列已满无法将新加入的任务放进去的话,则需要创建新的线程来执行任务。
  • 如果新创建线程已经达到了最大线程数,任务将会被拒绝。

addWorker 方法

上述方法的核心主要就是addWorker方法,

private boolean addWorker(Runnable firstTask, boolean core) {
       // 前面还有一部分就省略了。。。。
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    } 

这个方法我们先看看这个「work」类吧

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
     
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        
        public void run() {
            runWorker(this);
        } 

「work」类实现了「Runnable」接口,然后run方法里面调用了「runWorker」方法

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        // 新增创建
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
             // 判断 task 是否为空,如果不为空直接执行
         // 如果 task 为空,调用 getTask() 方法,从 workQueue 中取出新的 task 执行
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    } 

这个runwork方法中会优先取worker绑定的任务,如果创建这个worker的时候没有给worker绑定任务,worker就会从队列里面获取任务来执行,执行完之后worker并不会销毁,而是通过while循环不停的执行getTask方法从阻塞队列中获取任务调用task.run()来执行任务,这样的话就达到了线程复用的目的。while (task != null || (task = getTask()) != null) 这个循环条件只要getTask 返回获取的值不为空这个循环就不会终止, 这样线程也就会一直在运行。「那么任务执行完怎么保证核心线程不销毁?非核心线程销毁?」答案就在这个getTask()方法里面

private Runnable getTask() {
  // 超时标记,默认为false,如果调用workQueue.poll()方法超时了,会标记为true
  // 这个标记非常之重要,下面会说到
  boolean timedOut = false;
  for (;;) {
    // 获取ctl变量值
    int c = ctl.get();
    int rs = runStateOf(c);
    // 如果当前状态大于等于SHUTDOWN,并且workQueue中的任务为空或者状态大于等于STOP
    // 则操作AQS减少工作线程数量,并且返回null,线程被回收
    // 也说明假设状态为SHUTDOWN的情况下,如果workQueue不为空,那么线程池还是可以继续执行剩下的任务
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
      // 操作AQS将线程池中的线程数量减一
      decrementWorkerCount();
      return null;
    }
    // 获取线程池中的有效线程数量
    int wc = workerCountOf(c);
    // 如果主动开启allowCoreThreadTimeOut,或者获取当前工作线程大于corePoolSize,那么该线程是可以被超时回收的
    // allowCoreThreadTimeOut默认为false,即默认不允许核心线程超时回收
    // 这里也说明了在核心线程以外的线程都为“临时”线程,随时会被线程池回收
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
    // 这里说明了两点销毁线程的条件:
    // 1.原则上线程池数量不可能大于maximumPoolSize,但可能会出现并发时操作了setMaximumPoolSize方法,如果此时将最大线程数量调少了,很可能会出现当前工作线程大于最大线程的情况,这时就需要线程超时回收,以维持线程池最大线程小于maximumPoolSize,
    // 2.timed && timedOut 如果为true,表示当前操作需要进行超时控制,这里的timedOut为true,说明该线程已经从workQueue.poll()方法超时了
    // 以上两点满足其一,都可以触发线程超时回收
    if ((wc > maximumPoolSize || (timed && timedOut))
        && (wc > 1 || workQueue.isEmpty())) {
      // 尝试用AQS将线程池线程数量减一
      if (compareAndDecrementWorkerCount(c))
        // 减一成功后返回null,线程被回收
        return null;
      // 否则循环重试
      continue;
    }
    try {
      // 如果timed为true,阻塞超时获取任务,否则阻塞获取任务
      Runnable r = timed ?
        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
      workQueue.take();
      if (r != null)
        return r;
      // 如果poll超时获取任务超时了, 将timeOut设置为true
      // 继续循环执行,如果碰巧开发者开启了allowCoreThreadTimeOut,那么该线程就满足超时回收了
      timedOut = true;
    } catch (InterruptedException retry) {
      timedOut = false;
    }
  }
} 

所以保证线程不被销毁的关键代码就是这一句代码

 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); 

只要timed为false这个workQueue.take()就会一直阻塞,也就保证了线程不会被销毁。timed的值又是通过allowCoreThreadTimeOut和正在运行的线程数量是否大于coreSize控制的。

  • 只要getTask方法返回null 我们的线程就会被回收(runWorker方法会调用processWorkerExit)
  • 这个方法的源码也就解释了为什么我们在创建线程池的时候设置了allowCoreThreadTimeOut =true的话,核心线程也会进行销毁。
  • 通过这个方法我也们可以回答上面那个问题线程池是不区分核心线程和非核心线程的。

结束

  • 由于自己才疏学浅,难免会有纰漏,假如你发现了错误的地方,还望留言给我指出来,我会对其加以修正。
  • 如果你觉得文章还不错,你的转发、分享、赞赏、点赞、留言就是对我最大的鼓励。
  • 感谢您的阅读,十分欢迎并感谢您的关注。
  • 巨人的肩膀摘苹果

看完三件事️

========

如果你觉得这篇内容对你还蛮有帮助,我想邀请你帮我三个小忙:

点赞,转发,有你们的 『点赞和评论』,才是我创造的动力。

关注公众号 『 Java斗帝 』,不定期分享原创知识。

同时可以期待后续文章ing

版权声明
本文为[Mr.Z]所创,转载请带上原文链接,感谢
https://segmentfault.com/a/1190000037792131

  1. 【计算机网络 12(1),尚学堂马士兵Java视频教程
  2. 【程序猿历程,史上最全的Java面试题集锦在这里
  3. 【程序猿历程(1),Javaweb视频教程百度云
  4. Notes on MySQL 45 lectures (1-7)
  5. [computer network 12 (1), Shang Xuetang Ma soldier java video tutorial
  6. The most complete collection of Java interview questions in history is here
  7. [process of program ape (1), JavaWeb video tutorial, baidu cloud
  8. Notes on MySQL 45 lectures (1-7)
  9. 精进 Spring Boot 03:Spring Boot 的配置文件和配置管理,以及用三种方式读取配置文件
  10. Refined spring boot 03: spring boot configuration files and configuration management, and reading configuration files in three ways
  11. 精进 Spring Boot 03:Spring Boot 的配置文件和配置管理,以及用三种方式读取配置文件
  12. Refined spring boot 03: spring boot configuration files and configuration management, and reading configuration files in three ways
  13. 【递归,Java传智播客笔记
  14. [recursion, Java intelligence podcast notes
  15. [adhere to painting for 386 days] the beginning of spring of 24 solar terms
  16. K8S系列第八篇(Service、EndPoints以及高可用kubeadm部署)
  17. K8s Series Part 8 (service, endpoints and high availability kubeadm deployment)
  18. 【重识 HTML (3),350道Java面试真题分享
  19. 【重识 HTML (2),Java并发编程必会的多线程你竟然还不会
  20. 【重识 HTML (1),二本Java小菜鸟4面字节跳动被秒成渣渣
  21. [re recognize HTML (3) and share 350 real Java interview questions
  22. [re recognize HTML (2). Multithreading is a must for Java Concurrent Programming. How dare you not
  23. [re recognize HTML (1), two Java rookies' 4-sided bytes beat and become slag in seconds
  24. 造轮子系列之RPC 1:如何从零开始开发RPC框架
  25. RPC 1: how to develop RPC framework from scratch
  26. 造轮子系列之RPC 1:如何从零开始开发RPC框架
  27. RPC 1: how to develop RPC framework from scratch
  28. 一次性捋清楚吧,对乱糟糟的,Spring事务扩展机制
  29. 一文彻底弄懂如何选择抽象类还是接口,连续四年百度Java岗必问面试题
  30. Redis常用命令
  31. 一双拖鞋引发的血案,狂神说Java系列笔记
  32. 一、mysql基础安装
  33. 一位程序员的独白:尽管我一生坎坷,Java框架面试基础
  34. Clear it all at once. For the messy, spring transaction extension mechanism
  35. A thorough understanding of how to choose abstract classes or interfaces, baidu Java post must ask interview questions for four consecutive years
  36. Redis common commands
  37. A pair of slippers triggered the murder, crazy God said java series notes
  38. 1、 MySQL basic installation
  39. Monologue of a programmer: despite my ups and downs in my life, Java framework is the foundation of interview
  40. 【大厂面试】三面三问Spring循环依赖,请一定要把这篇看完(建议收藏)
  41. 一线互联网企业中,springboot入门项目
  42. 一篇文带你入门SSM框架Spring开发,帮你快速拿Offer
  43. 【面试资料】Java全集、微服务、大数据、数据结构与算法、机器学习知识最全总结,283页pdf
  44. 【leetcode刷题】24.数组中重复的数字——Java版
  45. 【leetcode刷题】23.对称二叉树——Java版
  46. 【leetcode刷题】22.二叉树的中序遍历——Java版
  47. 【leetcode刷题】21.三数之和——Java版
  48. 【leetcode刷题】20.最长回文子串——Java版
  49. 【leetcode刷题】19.回文链表——Java版
  50. 【leetcode刷题】18.反转链表——Java版
  51. 【leetcode刷题】17.相交链表——Java&python版
  52. 【leetcode刷题】16.环形链表——Java版
  53. 【leetcode刷题】15.汉明距离——Java版
  54. 【leetcode刷题】14.找到所有数组中消失的数字——Java版
  55. 【leetcode刷题】13.比特位计数——Java版
  56. oracle控制用户权限命令
  57. 三年Java开发,继阿里,鲁班二期Java架构师
  58. Oracle必须要启动的服务
  59. 万字长文!深入剖析HashMap,Java基础笔试题大全带答案
  60. 一问Kafka就心慌?我却凭着这份,图灵学院vip课程百度云