Using the CompletionService API

    Technology  2022-07-13

    Introduction to CompletionService

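    The CompletionService interface lets you keep producing new tasks asynchronously while you consume the results of tasks that have already finished, which separates task execution from result handling. Submit tasks with submit() and retrieve completed tasks with take(); results are handed back in the order in which the tasks complete, not the order in which they were submitted.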

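    Using CompletionService to ease Future blocking

    Future<V> take() throws InterruptedException returns the Future of whichever task finishes first, which partly relieves the blocking you get from calling Future.get() on tasks in submission order. It does not remove blocking entirely: while submitted tasks are still running and none has completed, cs.take().get() still blocks the calling thread. The wait happens in take(); the Future it returns is already done.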

        import java.util.ArrayList;
        import java.util.List;
        import java.util.concurrent.*;

        public class MyCallable implements Callable<String> {
            private final String name;
            private final long sleepValue;

            public MyCallable(String name, long sleepValue) {
                this.name = name;
                this.sleepValue = sleepValue;
            }

            @Override
            public String call() throws Exception {
                System.out.println(name);
                Thread.sleep(sleepValue); // simulate work
                return "result call " + name;
            }
        }

        // main method of a separate test class
        public static void main(String[] args) {
            List<Callable<String>> list = new ArrayList<>();
            list.add(new MyCallable("task 1", 4000));
            list.add(new MyCallable("task 2", 3000));
            list.add(new MyCallable("task 3", 2000));

            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    5, 7, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
            CompletionService<String> cs = new ExecutorCompletionService<>(pool);
            list.forEach(m -> cs.submit(m));

            try {
                for (int i = 0; i < list.size(); i++) {
                    System.out.println("printing result " + i);
                    // take() blocks until the next task completes
                    System.out.println(cs.take().get());
                }
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            } finally {
                pool.shutdown(); // otherwise the core threads keep the JVM alive
            }
        }

    Test output:

        task 1
        task 3
        task 2
        printing result 0
        result call task 3
        printing result 1
        result call task 2
        printing result 2
        result call task 1
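
    To make the contrast with plain Future handling concrete, here is a minimal sketch that runs the same three tasks twice: once reading a list of Futures in submission order, once through a CompletionService. The class OrderComparison, the helper sleepy(), and the sleep times are hypothetical names and values chosen for illustration; they are not part of the original example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; names and sleep times are illustrative only.
final class OrderComparison {

    // A task that sleeps for the given time, then returns its own name.
    static Callable<String> sleepy(String name, long millis) {
        return () -> {
            Thread.sleep(millis);
            return name;
        };
    }

    static List<Callable<String>> tasks() {
        return Arrays.asList(sleepy("slow", 600), sleepy("medium", 300), sleepy("fast", 50));
    }

    // Plain futures: get() is called in submission order, so the first call
    // waits for the slowest task even though the others finish earlier.
    static List<String> inSubmissionOrder() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (Callable<String> task : tasks()) {
                futures.add(pool.submit(task));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    // CompletionService: take() hands back whichever task finished next.
    static List<String> inCompletionOrder() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            CompletionService<String> cs = new ExecutorCompletionService<>(pool);
            List<Callable<String>> tasks = tasks();
            for (Callable<String> task : tasks) {
                cs.submit(task);
            }
            List<String> results = new ArrayList<>();
            for (int i = 0; i < tasks.size(); i++) {
                results.add(cs.take().get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("submission order: " + inSubmissionOrder());
        System.out.println("completion order: " + inCompletionOrder());
    }
}
```

    With these sleep times the first list should come back as slow, medium, fast and the second as fast, medium, slow, although the second ordering ultimately depends on thread scheduling.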

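    Future<V> poll() retrieves and removes the Future of the next completed task, or returns null if no task has completed yet; it never blocks the calling thread.

    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException waits up to the given time for a completed task. If one becomes available it is returned immediately and execution continues; if the timeout expires first, the method returns null and execution continues as well.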

        public static void main(String[] args) {
            ExecutorService exec = Executors.newCachedThreadPool();
            CompletionService<String> cs = new ExecutorCompletionService<>(exec);
            cs.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    Thread.sleep(3000);
                    System.out.println("task started " + System.currentTimeMillis());
                    return "call result 100";
                }
            });
            // The task is still sleeping, so poll() returns null immediately.
            System.out.println(cs.poll() + " " + System.currentTimeMillis());
            exec.shutdown(); // the already submitted task still runs to completion
        }

    Test output:

        null 1601793073623
        task started 1601793076624
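
    The timed variant poll(long timeout, TimeUnit unit) is not exercised by the example above. The following is a minimal sketch of how it might be used, assuming a caller that wants a fallback value when nothing completes in time. PollTimeoutDemo, nextResultOrDefault(), and the timeout values are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; names and timings are illustrative only.
final class PollTimeoutDemo {

    // Waits up to timeoutMillis for the next completed task.
    // Returns the fallback if the timeout expires (poll returned null).
    static String nextResultOrDefault(CompletionService<String> cs,
                                      long timeoutMillis, String fallback) {
        try {
            Future<String> done = cs.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            return done == null ? fallback : done.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fallback;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    static String[] demo() {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            CompletionService<String> cs = new ExecutorCompletionService<>(pool);
            cs.submit(() -> {
                Thread.sleep(500);
                return "done";
            });
            // 50 ms is shorter than the task, so this poll times out.
            String early = nextResultOrDefault(cs, 50, "timed out");
            // 5 s is an upper bound; poll returns as soon as the task completes.
            String late = nextResultOrDefault(cs, 5000, "timed out");
            return new String[] {early, late};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] results = demo();
        System.out.println("first poll:  " + results[0]);
        System.out.println("second poll: " + results[1]);
    }
}
```

    The null check is the important part: a timed-out poll yields no Future, so calling get() on its return value without checking would throw a NullPointerException.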

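    Exceptions in CompletionService

    An exception thrown inside a task does not surface when you only call take() or poll(). It is thrown, wrapped in an ExecutionException, only when you then call get() on the returned Future.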

        public class MyCallableA implements Callable<String> {
            @Override
            public String call() throws Exception {
                // Note: this class logs the label "B" although it is named MyCallableA.
                System.out.println("B start " + System.currentTimeMillis());
                Thread.sleep(2000);
                int i = 1 / 0; // throws ArithmeticException
                System.out.println("B end " + System.currentTimeMillis());
                return "result call B";
            }
        }

        public class MyCallableB implements Callable<String> {
            @Override
            public String call() throws Exception {
                // Note: this class logs the label "A" although it is named MyCallableB.
                System.out.println("A start " + System.currentTimeMillis());
                Thread.sleep(1000);
                System.out.println("A end " + System.currentTimeMillis());
                return "result call A";
            }
        }

        // main method of the test class
        public static void main(String[] args) {
            MyCallableA a = new MyCallableA();
            MyCallableB b = new MyCallableB();
            ExecutorService ex = Executors.newCachedThreadPool();
            CompletionService<String> com = new ExecutorCompletionService<>(ex);
            com.submit(a);
            com.submit(b);
            try {
                System.out.println("task 1 " + com.take().get());
                System.out.println("task 2 " + com.take().get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            } finally {
                ex.shutdown();
            }
            System.out.println("main end");
        }

    Test output when only com.take() is called (without get()):

        B start 1601794183008
        A start 1601794183008
        A end 1601794184009
        task 1 java.util.concurrent.FutureTask@55f96302
        task 2 java.util.concurrent.FutureTask@3d4eac69
        main end

    Test output with com.take().get():

        B start 1601794325963
        A start 1601794325963
        A end 1601794326965
        task 1 result call A
        java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
            at java.util.concurrent.FutureTask.report(Unknown Source)
            at java.util.concurrent.FutureTask.get(Unknown Source)
            at CompletionService.ExceptionTest.main(ExceptionTest.java:20)
        Caused by: java.lang.ArithmeticException: / by zero
            at CompletionService.MyCallableA.call(MyCallableA.java:11)
            at CompletionService.MyCallableA.call(MyCallableA.java:1)
            at java.util.concurrent.FutureTask.run(Unknown Source)
            at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
            at java.util.concurrent.FutureTask.run(Unknown Source)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
            at java.lang.Thread.run(Unknown Source)
        main end
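
    Because the task's exception only appears at get(), one option is to catch ExecutionException per task so that a single failure does not stop the remaining results from being collected. The sketch below illustrates that idea under assumed names: CollectWithFailures, collect(), and demo() are hypothetical and not taken from the example above. Here the failing task finishes first, and the successful result is still gathered afterwards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; names and timings are illustrative only.
final class CollectWithFailures {

    // Runs all tasks and records one outcome line per task, in completion order.
    static List<String> collect(List<Callable<String>> tasks) {
        ExecutorService pool = Executors.newCachedThreadPool();
        List<String> outcomes = new ArrayList<>();
        try {
            CompletionService<String> cs = new ExecutorCompletionService<>(pool);
            for (Callable<String> task : tasks) {
                cs.submit(task);
            }
            for (int i = 0; i < tasks.size(); i++) {
                // take() returns the finished Future; it does not rethrow
                // an exception raised inside the task.
                Future<String> done = cs.take();
                try {
                    outcomes.add("ok: " + done.get());
                } catch (ExecutionException e) {
                    // get() wraps the task's exception; getCause() unwraps it.
                    outcomes.add("failed: " + e.getCause());
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // return what was collected so far
        } finally {
            pool.shutdown();
        }
        return outcomes;
    }

    static List<String> demo() {
        Callable<String> failing = () -> {
            Thread.sleep(50);
            int zero = 0;
            return "result A " + (1 / zero); // throws ArithmeticException
        };
        Callable<String> ok = () -> {
            Thread.sleep(300);
            return "result B";
        };
        return collect(Arrays.asList(failing, ok));
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

    Whether to keep going after a failure or abort the whole batch is a design choice; wrapping both get() calls in a single try block, as in the test above, stops at the first failed task.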
