什么是 Fork / Join 该框架是一个工具 , 通过分而治之的方式尝试将所有可用的处理器内核使用起来帮助加速并行处理
原理简述 :
该算法会把分解的小任务放在多个双端队列中,而线程在队列的头和尾部都可获取任务。
当有线程把当前负责队列的任务处理完之后,它还可以从那些还没有处理完的队列的尾部窃取任务来处理
Fork / Join 线程池 :
把一个大的任务划分为若干个小的任务并发执行,充分利用可用的资源,进而提高应用的执行效率
参考 @ blog.csdn.net/tyrroo/arti…
RecursiveTask 是一种 ForkJoinTask 的递归实现 , 例如可以用于计算斐波那契数列 :
class Fibonacci extends RecursiveTask<Integer> {
final int n;
Fibonacci(int n) { this.n = n; }
Integer compute() {
if (n <= 1)
return n;
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
return f2.compute() + f1.join();
}
}
RecursiveTask 继承了 ForkJoinTask 接口 ,其内部有几个主要的方法:
// Node 1 : 返回结果 , 存放最终结果
V result;
// Node 2 : 抽象方法 compute , 用于计算最终结果
protected abstract V compute();
// Node 3 : 获取最终结果
public final V getRawResult() {
return result;
}
// Node 4 : 最终执行方法 , 这里是需要调用具体实现类compute
protected final boolean exec() {
result = compute();
return true;
}
常见使用方式:
@
public class ForkJoinPoolService extends RecursiveTask<Integer> {
private static final int THRESHOLD = 2; //阀值
private int start;
private int end;
public ForkJoinPoolService(Integer start, Integer end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
boolean canCompute = (end - start) <= THRESHOLD;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
int middle = (start + end) / 2;
ForkJoinPoolService leftTask = new ForkJoinPoolService(start, middle);
ForkJoinPoolService rightTask = new ForkJoinPoolService(middle + 1, end);
//执行子任务
leftTask.fork();
rightTask.fork();
//等待子任务执行完,并得到其结果
Integer rightResult = rightTask.join();
Integer leftResult = leftTask.join();
//合并子任务
sum = leftResult + rightResult;
}
return sum;
}
}
// 前提 : 需要继承 RecursiveTask<Integer> 类 , 且实现 compute 方法
public class ForkJoinPoolReferenceService extends RecursiveTask<Integer> {
private File file;
private Integer salt;
public ForkJoinPoolReferenceService(File file, Integer salt) {
this.file = file;
this.salt = salt;
}
@Override
protected Integer compute() {
return ForkFileUtils.read(file, salt);
}
}
// 方式一 : Fork Join 方式
ForkJoinPoolReferenceService rt = new ForkJoinPoolReferenceService(files.get(0), i);
rt.fork();
result = result + rt.join();
// 方式二 : submit 方式
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(rt);
result = result + forkJoinTask.get();
ForkJoinTask:代表fork/join里面任务类型,我们一般用它的两个子类RecursiveTask、RecursiveAction。这两个区别在于RecursiveTask任务是有返回值,RecursiveAction没有返回值。任务的处理逻辑包括任务的切分都集中在compute()方法里面。
作用 : ForkJoinPool为来自非ForkJoinTask客户端的提交提供入口点,以及管理和监视操作,最原始的任务都要交给它才能处理 .
主要功能包括 :
备注 :
ForkJoinPool不同于其他类型的ExecutorService,主要是因为它使用了窃取工作:池中的所有线程都试图找到并执行提交到池中的任务和/或其他活动任务创建的任务(如果没有工作,最终会阻塞等待工作)。
ForkJoinPool 基础用法
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 这是一个继承 ForkJoinTask 的类
ForkJoinPoolService countTask = new ForkJoinPoolService(1, 200);
ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(countTask);
System.out.println(forkJoinTask.get());
当大多数任务衍生出其他子任务时,以及当许多小任务从外部客户端提交到池时,这使得高效处理成为可能。
特别是当在构造函数中将asyncMode设置为true时,ForkJoinPool s也可能适合用于从未连接的事件风格任务。
ForkJoin 前知识点 : ForkJoinWorkerThread
ForkJoinWorkerThread 为 fork/join里面真正干活的"工人",本质是一个线程 ,其里面有一个ForkJoinPool.WorkQueue的队列存放着它要干的活,接活之前它要向ForkJoinPool注册(registerWorker),拿到相应的workQueue。然后就从workQueue里面拿任务出来处理。
ForkJoinWorkerThread 依附于ForkJoinPool而存活,如果ForkJoinPool的销毁了,它也会跟着结束。
// Node 1 : 内部属性 , 一个是当前工作线程池
final ForkJoinPool pool; // the pool this thread works in
final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
// Node 2 : WorkQueue 对象 , WorkQueue 是 ForkJoinPool 内部类
// 1 支持工作窃取和外部任务提交的队列
// 2 可以避免多个工作队列实例或多个队列数组共享缓存线
// 3 双端队列
static final class WorkQueue {
// TODO : 后续有必要会深入了解该类
}
// Node 3 : run 方法
Step 1 : 判断workQueue 是否为空
Step 2 : pool.runWorker(workQueue) 将 workQueue 加入 pool 池
// Node 4 : InnocuousForkJoinWorkerThread
private static final ThreadGroup innocuousThreadGroup =createThreadGroup();
?- 正如之前猜测的 , 线程组果然用在了这里
ForkJoinPool 代码深入
// Node 1 : 内部构建线程方式
static final class DefaultForkJoinWorkerThreadFactory
M- ForkJoinWorkerThread newThread(ForkJoinPool pool)
?- 通过 new ForkJoinWorkerThread(pool) 返回新 Thread
// Node 2 : 内部属性
ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;
?- 用于创建 ForkJoinWorkerThread ,可以被重写
RuntimePermission modifyThreadPermission;
?- 该对象用于控制启用和杀掉线程的权限
static final ForkJoinPool common;
?- 公共静态池
volatile long ctl;
?- 主要用于判断状态
// Node 3 : 默认属性
private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L;
?- 线程的初始超时值(以纳秒为单位)
private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L;
?- 空闲超时时间
private static final int DEFAULT_COMMON_MAX_SPARES = 256
?- 在静态初始化期间commonMaxSpares的初始值
private static final int SPINS = 0
?- 在阻塞前自旋等待的次数
// Node 4 : 主要方法
> tryAddWorker :
主要1 :add = U.compareAndSwapLong(this, CTL, c, nc); CAS 判断状态
主要2 : createWorker(); 创建工作
> WorkQueue registerWorker(ForkJoinWorkerThread wt)
主要1 : WorkQueue w = new WorkQueue(this, wt);
主要2 : 推入 workQueues
> runWorker : 运行一个 worker
重点1 : t = scan(w, r) : 扫描并试图窃取一个顶级任务
重点2 : w.runTask(t); : 执行给定的任务和任何剩余的本地任务
> scan : 窃取逻辑
// TODO
> <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
// Node 5 :构造函数
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode)
- parallelism : 可并行级别 , 即可并行运行的线程数量
- factory : 线程工厂
- handler : 异常捕获处理器。当执行的任务中出现异常,并从任务中被抛出时,就会被handler捕获
- asyncMode : 工作模式类型 , 存在于队列中的待执行任务,即可以使用先进先出的工作模式,也可以使用后进先出的工作模式
总体来说还是不够深入 , 包括其中的性能 , invoke 实际上都还没有测试 , 实际上 ForkJoinPool 源码深入都不到一成 , 但是看源码看的有点头疼了 ,先这样了 , 后续会尽力把他完善清楚
阅读量:2014
点赞量:0
收藏量:0