分支合并框架ForkJoin
原理相关类示例
原理
Fork:把一个复杂任务进行分拆,大事化小 Join:把分拆任务的结果进行合并
fork/join 框架是 Java 7 中引入的 ,它是一个工具,通过 「 分而治之 」 的方法尝试将所有可用的处理器内核使用起来帮助加速并行处理。
在实际使用过程中,这种 「 分而治之 」的方法意味着框架首先要 fork ,递归地将任务分解为较小的独立子任务,直到它们足够简单以便异步执行。然后,join 部分开始工作,将所有子任务的结果递归地连接成单个结果,或者在返回 void 的任务的情况下,程序只是等待每个子任务执行完毕。
为了提供有效的并行执行,fork/join 框架使用了一个名为 ForkJoinPool 的线程池,用于管理 ForkJoinWorkerThread 类型的工作线程。
ForkJoinPool
ForkJoinPool 是 fork/join 框架的核心,是 ExecutorService 的一个实现,用于管理工作线程,并提供了一些工具来帮助获取有关线程池状态和性能的信息。
工作线程一次只能执行一个任务。
ForkJoinPool 线程池并不会为每个子任务创建一个单独的线程,相反,池中的每个线程都有自己的双端队列用于存储任务 ( double-ended queue )( 或 deque,发音 deck )。
这种架构使用了一种名为工作窃取( work-stealing )算法来平衡线程的工作负载。
fork/join 框架是 Java 7 中引入的 ,它是一个工具,通过 「 分而治之 」 的方法尝试将所有可用的处理器内核使用起来帮助加速并行处理。
ForkJoinTask< V >
ForkJoinTask 是 ForkJoinPool 线程之中执行的任务的基本类型。我们日常使用时,一般不直接使用 ForkJoinTask ,而是扩展它的两个子类中的任意一个
任务不返回结果 ( 返回 void ) 的 RecursiveAction 返回值的任务的 RecursiveTask 这两个类都有一个抽象方法 compute() ,用于定义任务的逻辑。
我们所要做的,就是继承任意一个类,然后实现 compute() 方法。
相关类
ForkJoinPool: 分支合并池 类比=> 线程池
ForkJoinTask: ForkJoinTask 类比=> FutureTask
RecursiveTask: 递归任务:继承后可以实现递归(自己调自己)调用的任务
示例
package cduck
.cn
;
import java
.util
.concurrent
.ExecutionException
;
import java
.util
.concurrent
.ForkJoinPool
;
import java
.util
.concurrent
.ForkJoinTask
;
import java
.util
.concurrent
.RecursiveTask
;
class MyTask extends RecursiveTask<Integer>{
private static final Integer ADJUST_VALUE
=10;
private int begin
;
private int end
;
private int result
;
public MyTask(int begin
, int end
) {
this.begin
= begin
;
this.end
= end
;
}
@Override
protected Integer
compute() {
if (end
-begin
<=ADJUST_VALUE
){
for (int i
=begin
;i
<=end
;i
++){
result
=result
+i
;
}
}else{
int middle
=(end
+begin
)/2;
MyTask task01
=new MyTask(begin
,middle
);
MyTask task02
=new MyTask(middle
+1,end
);
task01
.fork();
task02
.fork();
result
=task01
.join()+task02
.join();
}
return result
;
}
}
public class ForkJoinDemo {
public static void main(String
[] args
) throws ExecutionException
, InterruptedException
{
MyTask myTask
=new MyTask(0,100);
ForkJoinPool forkJoinPool
=new ForkJoinPool();
ForkJoinTask
<Integer> submit
= forkJoinPool
.submit(myTask
);
System
.out
.println(submit
.get());
forkJoinPool
.shutdown();
}
}