ExecutorService API的使用

    科技2022-07-15  121

    T invokeAny(Collection<? extends Callable> tasks) throws InterruptedException, ExecutionException; 方法取得第一个完成任务的返回结果,具有线程阻塞特性,当第一个任务完成后会调用interrupt()方法中断其他任务,所有其他任务可以根据 if(Thread.currentThread().isTnterrupt()==true) 来判断任务是否继续执行。 1 无 if(Thread.currentThread().isTnterrupt()==true)判断,获得第一个运行结果后其他任务继续执行。 2 有 if(Thread.currentThread().isTnterrupt()==true)判断,获得第一个值后,调用线程的任务就会中断执行,虽然抛出 IntrerruptException()异常,但是主线程是无法捕获的,需要在Callable中进行异常的显式捕获。

    public class MycallableInterruptA implements Callable<String> { @Override public String call() throws Exception { System.out.println("A start "+System.currentTimeMillis()); for(int i=0;i<10;i++) { Math.random(); System.out.println("A 中 "+(i+1)+" 在运行"); } System.out.println("A end "+System.currentTimeMillis()); return "call result A"; } } public class MycallableInterruptB implements Callable<String> { @Override public String call() throws Exception { System.out.println("B start "+System.currentTimeMillis()); for(int i=0;i<200;i++) { Math.random(); System.out.println("B 中 "+(i+1)+" 在运行"); } System.out.println("B end "+System.currentTimeMillis()); return "call result B"; } } public static void main(String[] args) { List list = new ArrayList(); list.add(new MycallableInterruptA()); list.add(new MycallableInterruptB()); ExecutorService ex = Executors.newCachedThreadPool(); try { String str = (String) ex.invokeAny(list); System.out.println("取得返回值 "+str); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } }

    测试结果 没有判断 A运行完毕返回结果,B任务还是在执行 A 中 9 在运行 A 中 10 在运行 A end 1601800731341 B start 1601800731341 B 中 1 在运行 B 中 2 在运行 取得返回值 call result A B 中 3 在运行 B 中 4 在运行

    public class MycallableInterruptB2 implements Callable<String> { @Override public String call() throws Exception { System.out.println("B2 start "+System.currentTimeMillis()); for(int i=0;i<200;i++) { if(Thread.currentThread().isInterrupted()==false) { Math.random(); System.out.println("B2 中 "+(i+1)+" 在运行"); }else { System.out.println("B2 线程中断"); throw new InterruptedException(); } } System.out.println("B2 end "+System.currentTimeMillis()); return "call result B2"; } }

    测试结果 有判断,A任务执行完后,B任务中断执行 A start 1601800865998 B2 start 1601800865998 B2 中 1 在运行 A 中 1 在运行 B2 中 2 在运行 A 中 2 在运行 A end 1601800866000 B2 中 20 在运行 B2 中 21 在运行 B2 中 22 在运行 B2 线程中断 取得返回值 call result A

    invokeAny() 与执行慢的任务异常 能获取到执行快任务的结果,执行慢的任务线程异常时线程会中断,不显式捕获异常控制台什么都没有。

    public class MycallableA implements Callable<String> { @Override public String call() throws Exception { System.out.println("A start "+System.currentTimeMillis()); for(int i=0;i<5;i++) { Thread.sleep(100); System.out.println("A 中 "+(i+1)+" 在运行"); } System.out.println("A end "+System.currentTimeMillis()); return "call result A"; } } public class MycallableB implements Callable<String> { @Override public String call() throws Exception { try { System.out.println("B start "+System.currentTimeMillis()); for(int i=0;i<20;i++) { Thread.sleep(100); System.out.println("B 中 "+(i+1)+" 在运行"); } int i =1/0; System.out.println("B end "+System.currentTimeMillis()); }catch (Exception e) { System.out.println("A 显示捕获异常 "); e.printStackTrace(); throw e; } return "call result B"; } } public static void main(String[] args) { List list = new ArrayList(); list.add(new MycallableA()); list.add(new MycallableB()); ExecutorService ex = Executors.newCachedThreadPool(); try { String str = (String) ex.invokeAny(list); System.out.println("取得返回值 "+str); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } }

    显式捕获异常并抛出 测试结果 A start 1601802307343 B start 1601802307343 A 中 1 在运行 A 中 5 在运行 A end 1601802307846 取得返回值 call result A A 显示捕获异常 java.lang.InterruptedException: sleep interrupted at java.lang.Thread.sleep(Native Method) at executorService.MycallableB.call(MycallableB.java:12) at executorService.MycallableB.call(MycallableB.java:1) at java.util.concurrent.FutureTask.run(Unknown Source) at java.util.concurrent.Executors R u n n a b l e A d a p t e r . c a l l ( U n k n o w n S o u r c e ) a t j a v a . u t i l . c o n c u r r e n t . F u t u r e T a s k . r u n ( U n k n o w n S o u r c e ) a t j a v a . u t i l . c o n c u r r e n t . T h r e a d P o o l E x e c u t o r . r u n W o r k e r ( U n k n o w n S o u r c e ) a t j a v a . u t i l . c o n c u r r e n t . T h r e a d P o o l E x e c u t o r RunnableAdapter.call(Unknown Source) at java.util.concurrent.FutureTask.run(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor RunnableAdapter.call(UnknownSource)atjava.util.concurrent.FutureTask.run(UnknownSource)atjava.util.concurrent.ThreadPoolExecutor.runWorker(UnknownSource)atjava.util.concurrent.ThreadPoolExecutorWorker.run(Unknown Source) at java.lang.Thread.run(Unknown Source)

    不显式捕获异常并抛出 测试结果 A start 1601802475582 B start 1601802475583 A 中 5 在运行 A end 1601802476086 B 中 5 在运行 取得返回值 call result A

    invokeAny() 与执行快的任务异常 1 执行快的任务没有显式捕获异常,会返回执行慢的任务结果; 2 有显式吧捕获但没有抛出, 会返回执行快的任务结果(主线程没有收到线程的异常信息,以为是正确结果)。 3 有显式捕获并且抛出异常,会返回任务执行慢的结果。

    public class MycallableA implements Callable<String> { @Override public String call() throws Exception { try { System.out.println("A start "+System.currentTimeMillis()); for(int i=0;i<5;i++) { Thread.sleep(100); System.out.println("A 中 "+(i+1)+" 在运行"); } int i = 1/0; System.out.println("A end "+System.currentTimeMillis()); }catch (Exception e) { System.out.println("A 显示捕获异常 "); } return "call result A"; } } public class MycallableB implements Callable<String> { @Override public String call() throws Exception { System.out.println("B start "+System.currentTimeMillis()); for(int i=0;i<20;i++) { Thread.sleep(100); System.out.println("B 中 "+(i+1)+" 在运行"); } System.out.println("B end "+System.currentTimeMillis()); return "call result B"; } } public static void main(String[] args) { List list = new ArrayList(); list.add(new MycallableA()); list.add(new MycallableB()); ExecutorService ex = Executors.newCachedThreadPool(); try { String str = (String) ex.invokeAny(list); System.out.println("取得返回值 "+str); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } }

    捕获没有抛出 测试结果 A start 1601802779224 B start 1601802779224 A 中 1 在运行 A 显示捕获异常 取得返回值 call result A

    捕获并且抛出 测试结果 A start 1601803057506 B start 1601803057506 B 中 5 在运行 A 中 5 在运行 A 显示捕获异常 B 中 20 在运行 B end 1601803059513 取得返回值 call result B

    invokeAny() 与全部任务异常 返回最后一个任务执行的异常信息并输出。

    T invokeAny(Collection<? extends Callable> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; 在指定时间内获取到第一个执行完的任务结果,如果出现超时则会抛出超时异常,任务会继续执行,并在执行完返回结果,所有这里应该结合 if(Thread.currentThread().isTnterrupt()==true)判断是否中断线程。 如果任务超时又异常则会抛出超时异常,和运行中的异常并中断线程。超时异常都是在主线程中捕获的。

    List<Future> invokeAll(Collection<? extends Callable> tasks) throws InterruptedException; 返回所有的执行结果,具有线程阻塞特性,所有任务都返回才会向下执行。

    invokeAll() 快的正常慢的异常 能取到没有异常的任务结果,有异常的会抛出异常信息。说明invokeAll()对Callable的异常是可以处理的。

    public class MycallableA implements Callable<String> { @Override public String call() throws Exception { System.out.println("A start "+System.currentTimeMillis()); for(int i=0;i<5;i++) { Thread.sleep(100); System.out.println("A 中 "+(i+1)+" 在运行"); } System.out.println("A end "+System.currentTimeMillis()); return "call result A"; } } public class MycallableB implements Callable<String> { @Override public String call() throws Exception { System.out.println("B start "+System.currentTimeMillis()); for(int i=0;i<20;i++) { Thread.sleep(100); System.out.println("B 中 "+(i+1)+" 在运行"); } int i =1/0; System.out.println("B end "+System.currentTimeMillis()); return "call result B"; } } public static void main(String[] args) { List list = new ArrayList(); list.add(new MycallableA()); list.add(new MycallableB()); ExecutorService ex = Executors.newCachedThreadPool(); try { List<Future<String>> str = ex.invokeAll(list); for(Future<String> m:str) { System.out.println("取得返回值 "+m.get()); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } }

    测试结果 A start 1601803851477 B start 1601803851477 A 中 1 在运行 B 中 5 在运行 A end 1601803851980 B 中 6 在运行 B 中 20 在运行 取得返回值 call result A java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero at java.util.concurrent.FutureTask.report(Unknown Source) at java.util.concurrent.FutureTask.get(Unknown Source) at executorService.AnyTest.main(AnyTest.java:22) Caused by: java.lang.ArithmeticException: / by zero at executorService.MycallableB.call(MycallableB.java:15) at executorService.MycallableB.call(MycallableB.java:1) at java.util.concurrent.FutureTask.run(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.lang.Thread.run(Unknown Source)

    invokeAll() 快的异常慢的正常 会一个结果也拿不到,抛出异常信息,因为快的任务完成后第一次for循环遍历结果的时候就会抛出异常,而不再进入后面的循环,所以一个结果也拿不到。如果invokeAll()全部出现异常输出也是一样的。 测试结果‘ A start 1601804150270 B start 1601804150271 B end 1601804152290 java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero at java.util.concurrent.FutureTask.report(Unknown Source) at java.util.concurrent.FutureTask.get(Unknown Source) at executorService.AnyTest.main(AnyTest.java:22) Caused by: java.lang.ArithmeticException: / by zero at executorService.MycallableA.call(MycallableA.java:15) at executorService.MycallableA.call(MycallableA.java:1) at java.util.concurrent.FutureTask.run(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.lang.Thread.run(Unknown Source)

    List<Future> invokeAll(Collection<? extends Callable> tasks, long timeout, TimeUnit unit) throws InterruptedException; 全部任务在指定时间内没有完成,则抛出异常。

    invokeAll(Collection<? extends Callable> tasks, long timeout, TimeUnit unit)先慢后快 ’一个任务结果也获取不到,在第一次循环是就会抛出异常CancellationException,不进行后面的循环,而不是invokeAll()的超时异常。要是全部超时输出结果也跟这个一样。

    public class MycallableA implements Callable<String> { @Override public String call() throws Exception { System.out.println("A start "+System.currentTimeMillis()); for(int i=0;i<5;i++) { Thread.sleep(100); System.out.println("A 中 "+(i+1)+" 在运行"); } System.out.println("A end "+System.currentTimeMillis()); return "call result A"; } } public class MycallableB implements Callable<String> { @Override public String call() throws Exception { System.out.println("B start "+System.currentTimeMillis()); for(int i=0;i<20;i++) { Thread.sleep(100); System.out.println("B 中 "+(i+1)+" 在运行"); } System.out.println("B end "+System.currentTimeMillis()); return "call result B"; } } public static void main(String[] args) { List list = new ArrayList(); list.add(new MycallableA()); list.add(new MycallableB()); ExecutorService ex = Executors.newCachedThreadPool(); try { List<Future<String>> str = ex.invokeAll(list,1,TimeUnit.SECONDS); for(int i=0;i<str.size();i++) { System.out.println("第"+ (i+1) + " 次循环"); System.out.println("取得返回值 "+str.get(i).get()); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } }

    测试结果 B start 1601804819858 A start 1601804819858 A 中 5 在运行 A end 1601804820361 B 中 6 在运行 B 中 7 在运行 B 中 8 在运行 B 中 9 在运行 第1 次循环 Exception in thread “main” java.util.concurrent.CancellationException at java.util.concurrent.FutureTask.report(Unknown Source) at java.util.concurrent.FutureTask.get(Unknown Source) at executorService.AnyTest.main(AnyTest.java:24)

    invokeAll(Collection<? extends Callable> tasks, long timeout, TimeUnit unit)先快后慢 能获取到快的任务结果,后面慢的因为超时获取不到抛出CancellationException异常。 测试结果 A start 1601805029905 B start 1601805029905 A end 1601805030408 B 中 9 在运行 第1 次循环 取得返回值 call result A 第2 次循环 Exception in thread “main” java.util.concurrent.CancellationException at java.util.concurrent.FutureTask.report(Unknown Source) at java.util.concurrent.FutureTask.get(Unknown Source) at executorService.AnyTest.main(AnyTest.java:24)

    Processed: 0.016, SQL: 8