Java中J.U.C扩展组件之ForkJoinTask和ForkJoinPool

入门小站 2021-01-20 21:42:35
java 组件 扩展 j.u.c forkjointask


Fork/Join框架中两个核心类 ForkJoinTaskForkJoinPool,声明 ForkJoinTask后,将其加入 ForkJoinPool中,并返回一个 Future对象。
  • ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行,任务分割的子任务会添加到当前工作维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其它工作线程的队列尾部获取一个任务。
  • ForkJoinTask:我们需要使用ForkJoin框架,首先要创建一个ForkJoin任务。它提供在任务中执行Fork()Join()操作的机制,通常情况下不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供以下两个子类。
  • RecursiveAction:用于没有返回值的任务。
  • RecursizeTask:用于有返回值的任务。

image-20210120194025938

Exception

ForkJoinTask在执行的时候可能会抛出异常,但是我们没有办法直接在主线程里捕获异常,所以 ForkJoinTask提供了 isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过 ForkJoinTaskgetException方法捕获异常。
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
/** ForkJoinTask运行状态 */
volatile int status; // 直接被ForkJoin池和工作线程访问
static final int DONE_MASK = 0xf0000000; // mask out non-completion bits
static final int NORMAL = 0xf0000000; // must be negative
static final int CANCELLED = 0xc0000000; // must be < NORMAL
static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED
static final int SIGNAL = 0x00010000; // must be >= 1 << 16
static final int SMASK = 0x0000ffff; // short bits for tags
/**
* @Ruturn 任务是否扔出异常或被取消
*/
public final boolean isCompletedAbnormally() {
return status < NORMAL;
}
/**
* 如果计算扔出异常,则返回异常
* 如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回null
*/
public final Throwable getException() {
int s = status & DONE_MASK;
return ((s >= NORMAL) ? null :
(s == CANCELLED) ? new CancellationException() :
getThrowableException());
}
}

ForkJoinPool源码

public class ForkJoinPool extends AbstractExecutorService {
/**
* ForkJoinPool,它同ThreadPoolExecutor一样,也实现了Executor和ExecutorService接口。它使用了
* 一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入希
* 望的线程数量,那么当前计算机可用的CPU数量会被设置为线程数量作为默认值。
*/
public ForkJoinPool() {
this(Math.min(MAX_CAP,Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism) {
this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
//有多个构造器,这里省略
volatile WorkQueue[] workQueues; // main registry
static final class WorkQueue {
final ForkJoinWorkerThread owner; // 工作线程
ForkJoinTask<?>[] array; // 任务
//传入的是ForkJoinPool与指定的一个工作线程
WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
this.pool = pool;
this.owner = owner;
// Place indices in the center of array (that is not yet allocated)
base = top = INITIAL_QUEUE_CAPACITY >>> 1;
}
}
}

FrokJoinPool work stealing算法

38f0c9a9a9b072b589aba560a849c18bad5.jpg

ForkJoinPool维护了一组 WorkQueue,也就是工作队列,工作队列中又维护了一个工作线程 ForkJoinWorkerThread与一组工作任务 ForkJoinTask

WorkQueue是一个双端队列Deque(Double Ended Queue),Deque是一种具有队列和栈性质的数据结构,双端队列中的元素可以从两端弹出,其限定插入和删除操作在表的两端进行。

每个工作线程在运行中产生新的任务(通常因为调用了fork())时,会放在工作队列的对尾,并且工作线程在处理自己的工作队列时,使用的是LIFO,也就是说每次从队列尾部取任务来执行。

每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其它工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。

在遇到Join()时,如果需要Join的任务尚未完成,则会优先处理其它任务,并等待其完成。

在没有自己的任务时,也没有任何可以窃取时,则进入休眠。

public class ForkJoinPool extends AbstractExecutorService {
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {}
public <T> ForkJoinTask<T> submit(Callable<T> task) {}
public <T> ForkJoinTask<T> submit(Runnable task, T result) {}
public ForkJoinTask<?> submit(Runnable task) {}
}
ForkJoinPool自身也拥有工作队列,这些工作队列的作用是用来接收由外部线程(非 ForkJoinThread线程)提交过来的任务,而这些工作队列被称为 submitting queue

ForkJoinTask

任务的操作,重要的是 fork()join()
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
/**
* 在当前任务正在运行的池中异步执行此任务(如果适用)
* 或使用ForkJoinPool.commonPool()(如果不是ForkJoinWorkerThread实例)进行异步执行
*/
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
}
fork()做的工作只有一件事,就是把当前任务推入当前线程的工作队列里。

join()的工作就比较复杂,也是join()可以使的线程免于被阻塞的原因。

  • 检查调用join()的线程是否是ForkJoinThread线程。如果不是(例如main线程),则阻塞当前线程,等待任务完成。如果是,则不阻塞。
  • 检查任务的完成状态,如果已经完成,则直接返回结果。
  • 如果任务尚未完成,但是处理自己的工作队列,则完成它。
  • 如果任务已经被其它线程偷走,则这个小偷工作队列的任务以先进先出的方式执行,帮助小偷线程尽快完成join
  • 如果偷走任务的小偷也已经把自己的任务全部做完,正在等待需要join的任务时,则找到小偷的小偷(递归执行),帮助它完成它的任务。

bc8ab44bbad490edddfe5b6e8febaab2742.jpg

ForkJoinPool.submit方法

public static void main(String[] args) throws ExecutionException, InterruptedException {
//生成一个池
ForkJoinPool forkJoinPool=new ForkJoinPool();
ForkJoinTask task=new ForkJoinExample(1, 100000);
ForkJoinTask<Integer> submit = forkJoinPool.submit(task);
Integer sum = submit.get();
System.out.println("最后的结果是:"+sum);
}
每个工作线程自己拥有的工作队列以外, ForkJoinPool自身也拥有工作队列,这些工作队列的作用是用来接收有外部线程(非 ForkJoinPool)提交过来的任务,而这些工作队列被称为 submitting queue

submit()fork()没有本质区别,只是提交对象变成了submitting queue(还有一些初始化,同步操作)。submitting queue和其它work queue一样,是工作线程窃取的对象,因此当其中的任务被一个工作线程成功窃取时,也就意味着提交的任务真正开始进入执行阶段。

关注微信公众号:【入门小站】,解锁更多知识点。

版权声明
本文为[入门小站]所创,转载请带上原文链接,感谢
https://segmentfault.com/a/1190000039042536

  1. 【计算机网络 12(1),尚学堂马士兵Java视频教程
  2. 【程序猿历程,史上最全的Java面试题集锦在这里
  3. 【程序猿历程(1),Javaweb视频教程百度云
  4. Notes on MySQL 45 lectures (1-7)
  5. [computer network 12 (1), Shang Xuetang Ma soldier java video tutorial
  6. The most complete collection of Java interview questions in history is here
  7. [process of program ape (1), JavaWeb video tutorial, baidu cloud
  8. Notes on MySQL 45 lectures (1-7)
  9. 精进 Spring Boot 03:Spring Boot 的配置文件和配置管理,以及用三种方式读取配置文件
  10. Refined spring boot 03: spring boot configuration files and configuration management, and reading configuration files in three ways
  11. 精进 Spring Boot 03:Spring Boot 的配置文件和配置管理,以及用三种方式读取配置文件
  12. Refined spring boot 03: spring boot configuration files and configuration management, and reading configuration files in three ways
  13. 【递归,Java传智播客笔记
  14. [recursion, Java intelligence podcast notes
  15. [adhere to painting for 386 days] the beginning of spring of 24 solar terms
  16. K8S系列第八篇(Service、EndPoints以及高可用kubeadm部署)
  17. K8s Series Part 8 (service, endpoints and high availability kubeadm deployment)
  18. 【重识 HTML (3),350道Java面试真题分享
  19. 【重识 HTML (2),Java并发编程必会的多线程你竟然还不会
  20. 【重识 HTML (1),二本Java小菜鸟4面字节跳动被秒成渣渣
  21. [re recognize HTML (3) and share 350 real Java interview questions
  22. [re recognize HTML (2). Multithreading is a must for Java Concurrent Programming. How dare you not
  23. [re recognize HTML (1), two Java rookies' 4-sided bytes beat and become slag in seconds
  24. 造轮子系列之RPC 1:如何从零开始开发RPC框架
  25. RPC 1: how to develop RPC framework from scratch
  26. 造轮子系列之RPC 1:如何从零开始开发RPC框架
  27. RPC 1: how to develop RPC framework from scratch
  28. 一次性捋清楚吧,对乱糟糟的,Spring事务扩展机制
  29. 一文彻底弄懂如何选择抽象类还是接口,连续四年百度Java岗必问面试题
  30. Redis常用命令
  31. 一双拖鞋引发的血案,狂神说Java系列笔记
  32. 一、mysql基础安装
  33. 一位程序员的独白:尽管我一生坎坷,Java框架面试基础
  34. Clear it all at once. For the messy, spring transaction extension mechanism
  35. A thorough understanding of how to choose abstract classes or interfaces, baidu Java post must ask interview questions for four consecutive years
  36. Redis common commands
  37. A pair of slippers triggered the murder, crazy God said java series notes
  38. 1、 MySQL basic installation
  39. Monologue of a programmer: despite my ups and downs in my life, Java framework is the foundation of interview
  40. 【大厂面试】三面三问Spring循环依赖,请一定要把这篇看完(建议收藏)
  41. 一线互联网企业中,springboot入门项目
  42. 一篇文带你入门SSM框架Spring开发,帮你快速拿Offer
  43. 【面试资料】Java全集、微服务、大数据、数据结构与算法、机器学习知识最全总结,283页pdf
  44. 【leetcode刷题】24.数组中重复的数字——Java版
  45. 【leetcode刷题】23.对称二叉树——Java版
  46. 【leetcode刷题】22.二叉树的中序遍历——Java版
  47. 【leetcode刷题】21.三数之和——Java版
  48. 【leetcode刷题】20.最长回文子串——Java版
  49. 【leetcode刷题】19.回文链表——Java版
  50. 【leetcode刷题】18.反转链表——Java版
  51. 【leetcode刷题】17.相交链表——Java&python版
  52. 【leetcode刷题】16.环形链表——Java版
  53. 【leetcode刷题】15.汉明距离——Java版
  54. 【leetcode刷题】14.找到所有数组中消失的数字——Java版
  55. 【leetcode刷题】13.比特位计数——Java版
  56. oracle控制用户权限命令
  57. 三年Java开发,继阿里,鲁班二期Java架构师
  58. Oracle必须要启动的服务
  59. 万字长文!深入剖析HashMap,Java基础笔试题大全带答案
  60. 一问Kafka就心慌?我却凭着这份,图灵学院vip课程百度云