分支合并框架

    科技2025-08-02  6

    分支合并框架ForkJoin

    原理相关类示例

    原理

    Fork:把一个复杂任务进行分拆,大事化小 Join:把分拆任务的结果进行合并

    fork/join 框架是 Java 7 中引入的 ,它是一个工具,通过 「 分而治之 」 的方法尝试将所有可用的处理器内核使用起来帮助加速并行处理。

    在实际使用过程中,这种 「 分而治之 」的方法意味着框架首先要 fork ,递归地将任务分解为较小的独立子任务,直到它们足够简单以便异步执行。然后,join 部分开始工作,将所有子任务的结果递归地连接成单个结果,或者在返回 void 的任务的情况下,程序只是等待每个子任务执行完毕。

    为了提供有效的并行执行,fork/join 框架使用了一个名为 ForkJoinPool 的线程池,用于管理 ForkJoinWorkerThread 类型的工作线程。

    ForkJoinPool

    ForkJoinPool 是 fork/join 框架的核心,是 ExecutorService 的一个实现,用于管理工作线程,并提供了一些工具来帮助获取有关线程池状态和性能的信息。

    工作线程一次只能执行一个任务。

    ForkJoinPool 线程池并不会为每个子任务创建一个单独的线程,相反,池中的每个线程都有自己的双端队列用于存储任务 ( double-ended queue )( 或 deque,发音 deck )。

    这种架构使用了一种名为工作窃取( work-stealing )算法来平衡线程的工作负载。

    fork/join 框架是 Java 7 中引入的 ,它是一个工具,通过 「 分而治之 」 的方法尝试将所有可用的处理器内核使用起来帮助加速并行处理。

    ForkJoinTask< V >

    ForkJoinTask 是 ForkJoinPool 线程之中执行的任务的基本类型。我们日常使用时,一般不直接使用 ForkJoinTask ,而是扩展它的两个子类中的任意一个

    任务不返回结果 ( 返回 void ) 的 RecursiveAction 返回值的任务的 RecursiveTask 这两个类都有一个抽象方法 compute() ,用于定义任务的逻辑。

    我们所要做的,就是继承任意一个类,然后实现 compute() 方法。

    相关类

    ForkJoinPool: 分支合并池 类比=> 线程池

    ForkJoinTask: ForkJoinTask 类比=> FutureTask

    RecursiveTask: 递归任务:继承后可以实现递归(自己调自己)调用的任务

    示例

    package cduck.cn; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask; class MyTask extends RecursiveTask<Integer>{ private static final Integer ADJUST_VALUE=10; private int begin; private int end; private int result; public MyTask(int begin, int end) { this.begin = begin; this.end = end; } @Override protected Integer compute() { if (end-begin<=ADJUST_VALUE){ for (int i=begin;i<=end;i++){ result=result+i; } }else{ int middle=(end+begin)/2; MyTask task01=new MyTask(begin,middle); MyTask task02=new MyTask(middle+1,end); task01.fork(); task02.fork(); result =task01.join()+task02.join(); } return result; } } /** * 分支合并框架 * * ForkJoinPool * ForkJoinTask * RecursiveTask */ public class ForkJoinDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { MyTask myTask=new MyTask(0,100); ForkJoinPool forkJoinPool=new ForkJoinPool(); ForkJoinTask<Integer> submit = forkJoinPool.submit(myTask); System.out.println(submit.get()); forkJoinPool.shutdown(); } }
    Processed: 0.010, SQL: 8