Fork-Join API

    科技2022-08-17  103

    RecursiveAction

    RecursiveAction 执行任务没有返回值,只执行一次。

    分解任务

    public class MyRecursiveAction extends RecursiveAction { private int beg; private int end; public MyRecursiveAction(int beg, int end) { super(); this.beg = beg; this.end = end; } @Override protected void compute() { if(end-beg>2) { int middle = (beg+end)/2; MyRecursiveAction left = new MyRecursiveAction(beg, middle); MyRecursiveAction right = new MyRecursiveAction(middle+1,end); this.invokeAll(left,right); }else { System.out.println("分解组合 "+beg+" "+end); } } } public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(); pool.submit(new MyRecursiveAction(1,10)); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }

    测试结果 分解组合 4 5 分解组合 1 3 分解组合 9 10 分解组合 6 8

    RecursiveTask

    RecursiveTask执行的任务具有返回值;使用get()或者join()获取返回结果,在执行多任务时任务之间的运行是异步的。 get()方法执行获取结果时,子任务中出现异常,主线程中是可以进行捕获到处理的,join()方法时直接抛出异常。

    public class MyRecursiveTask extends RecursiveTask<Integer>{ @Override protected Integer compute() { System.out.println(Thread.currentThread().getName()+"执行 MyRecursiveTask"); String s = null; s.toString(); return 100; } } public static void main(String[] args) { MyRecursiveTask task = new MyRecursiveTask(); ForkJoinPool pool = new ForkJoinPool(); ForkJoinTask<Integer> task1 = pool.submit(task); try { System.out.println("result "+task1.join()); TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); System.out.println("InterruptedException 子线程异常处理"); } System.out.println("main end"); }

    join 异常抛出 ForkJoinPool-1-worker-1执行 MyRecursiveTask Exception in thread “main” java.lang.NullPointerException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) at java.lang.reflect.Constructor.newInstance(Unknown Source) at java.util.concurrent.ForkJoinTask.getThrowableException(Unknown Source) at java.util.concurrent.ForkJoinTask.reportException(Unknown Source) at java.util.concurrent.ForkJoinTask.join(Unknown Source) at fork.join.MyRecursiveTaskTest.main(MyRecursiveTaskTest.java:18) Caused by: java.lang.NullPointerException at fork.join.MyRecursiveTask.compute(MyRecursiveTask.java:11) at fork.join.MyRecursiveTask.compute(MyRecursiveTask.java:1) at java.util.concurrent.RecursiveTask.exec(Unknown Source) at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(Unknown Source) at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)

    get()异常捕获处理 ForkJoinPool-1-worker-1执行 MyRecursiveTask java.util.concurrent.ExecutionException: java.lang.NullPointerException at java.util.concurrent.ForkJoinTask.get(Unknown Source) at fork.join.MyRecursiveTaskTest.main(MyRecursiveTaskTest.java:18) Caused by: java.lang.NullPointerException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) at java.lang.reflect.Constructor.newInstance(Unknown Source) at java.util.concurrent.ForkJoinTask.getThrowableException(Unknown Source) … 2 more Caused by: java.lang.NullPointerException at fork.join.MyRecursiveTask.compute(MyRecursiveTask.java:11) at fork.join.MyRecursiveTask.compute(MyRecursiveTask.java:1) at java.util.concurrent.RecursiveTask.exec(Unknown Source) at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(Unknown Source) at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) ExecutionException 子线程异常处理 main end

    RecursiveTask 实现字符串累加

    public class MyRecusiveTaskString extends RecursiveTask<String> { private int beg; private int end; public MyRecusiveTaskString(int beg, int end) { super(); this.beg = beg; this.end = end; } @Override protected String compute() { if (end - beg > 2) { int middle = (beg + end) / 2; MyRecusiveTaskString left = new MyRecusiveTaskString(beg, middle); MyRecusiveTaskString right = new MyRecusiveTaskString(middle + 1, end); this.invokeAll(left, right); return left.join() + right.join(); } else { System.out.println("分解组合 " + beg + " " + end); String s = ""; for (int i = beg; i <= end; i++) { s = s + "" + i; } return s; } } } public static void main(String[] args) { MyRecusiveTaskString task = new MyRecusiveTaskString(1,14); ForkJoinPool pool = new ForkJoinPool(); ForkJoinTask<String> task1 = pool.submit(task); try { System.out.println("result "+task1.get()); TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); System.out.println("InterruptedException 子线程异常处理"); } catch (ExecutionException e) { e.printStackTrace(); System.out.println("ExecutionException 子线程异常处理"); } System.out.println("main end"); }

    测试结果 分解组合 1 2 分解组合 12 14 分解组合 3 4 分解组合 8 9 分解组合 5 7 分解组合 10 11 result 1234567891011121314 main end

    execute() 方法没有返回值,但是可以通过RecusiveTask获取返回值。 void execute(Runnable task); void execute(ForkJoinTask<?> task);

    submit()方法有返回值,通过ForkJoinTask获取返回值,也可以通过RecusiveTask获取返回值。 ForkJoinTask submit(ForkJoinTask task); ForkJoinTask submit(Callable task) ForkJoinTask<?> submit(Runnable task) ForkJoinTask submit(Runnable task, T result);

    List<Future> invokeAll(Collection<? extends Callable> tasks)方法具有阻塞特性,会将所有任务都执行完成后再返回结果。

    void shutdown(); 方法不具有线程中断效果,提交任务后再执行shutdown方法,任务依然会被执行,但是执行了shutdown后再次提交任务进程会抛出异常(RejectedExecutionException)并且销毁,正在运行的任务线程也会被销毁。

    List shutdownNow();方法会中断线程,需要结合Thread.currentTherad.isInterrupted()==true判断线程是否中断,但是执行了shutdownNow后再次提交任务进程会抛出异常(RejectedExecutionException)并且销毁,正在运行的任务线程也会被销毁。

    boolean isTerminated();任务池是否被终止; boolean isTerminating();任务池是否正在终止;shutdown方法关闭pool池之前方法返回值一直是false;

    boolean isShutdown();任务池是否终止; boolean awaitTermination(long timeout, TimeUnit unit);等待任务池被销毁的最长时间,具有阻塞特性,需要结合shutdown方法使用。 T invoke(ForkJoinTask task);执行任务,方法具有阻塞特性,直接将值返回,不需要再次get; ForkJoinTask API boolean isCompletedAbnormally(); 判断任务是否出现异常; boolean isCompletedNormally(); 判断任务是否正常执行完毕; Throwable getException();返回报错异常;

    监控pool任务池的相关API int getParallelism(); 获得并行数量,与CPU内核数相关。 int getPoolSize();获得任务池大小。 int getQueuedSubmissionCount();获得已经提交但没有执行的任务数量。 long getQueuedTaskCount() ; 获得任务总个数。 boolean hasQueuedSubmissions();判断队列中是否还有未执行的任务。 int getActiveThreadCount();获取活动线程总数。 long getStealCount();获取偷窃的任务个数。 int getRunningThreadCount();获取正在执行,不在阻塞状态的线程个数。 boolean isQuiescent(); 判断任务池是否是静止未执行任务的状态。

    Processed: 0.011, SQL: 9