ThreadPoolExecuto 线程池 最深刻理解

    科技2022-08-10  106

    1. 线程池的理解:

    为什么需要线程池: 线程可以复用,线程任务执行完毕以后,cpu 调度 以后可以复用,不用频繁创建开启线程

    代码1:

    public class Main2 { public static void main(String[] args) throws InterruptedException { /* 参数1:核心线程数,默认启动线程数 * 参数2: 最大启动线程数 * 参数3,参数4, 线程不够的时候,启动的线程,如果60s 内没有在次执行Runnable,那么 被回收 * 参数5: 如果任务超过最大线程数,没有对应的线程来执行,那么放入队列中 */ ExecutorService executorService = new ThreadPoolExecutor(1, 2,60, TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>(3)); for (int i = 0; i < 5; i++) { executorService.execute(new Runnable() { @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("线程:"+ Thread.currentThread().getName()); } }); } } }

    运行结果:正确执行

    线程:pool-1-thread-1 线程:pool-1-thread-2 线程:pool-1-thread-1 线程:pool-1-thread-2 线程:pool-1-thread-1

    如何理解上面代码:   核心线程是1,有5个任务  任务1交给核心线程执行,那么还有4个任务,  阻塞队列长度是3,3个任务放入阻塞队列中,  最大线程是2,那么还可以启动一个线程, 剩下的一个任务交给这个线程A执行,  等核心线程  和 线程A执行完成任务,从队列中 取出 任务交替 执行

      代码2 :

    public class Main2 { public static void main(String[] args) throws InterruptedException { /* 参数1:核心线程数,默认启动线程数 * 参数2: 最大启动线程数 * 参数3,参数4, 线程不够的时候,启动的线程,如果60s 内没有在次执行Runnable,那么 被回收 * 参数5: 如果任务超过最大线程数,没有对应的线程来执行,那么放入队列中 */ ExecutorService executorService = new ThreadPoolExecutor(1, 2,60, TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>(3)); for (int i = 0; i < 6; i++) { executorService.execute(new Runnable() { @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("线程:"+ Thread.currentThread().getName()); } }); } } }

    运行结果: 报错

    Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.denganzhi.bb.Main2$1@70dea4e rejected from java.util.concurrent.ThreadPoolExecutor@5c647e05[Running, pool size = 2, active threads = 2, queued tasks = 3, completed tasks = 0]     at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)     at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)     at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)     at com.denganzhi.bb.Main2.main(Main2.java:22) 线程:pool-1-thread-1 线程:pool-1-thread-2 线程:pool-1-thread-1 线程:pool-1-thread-2 线程:pool-1-thread-1

    如何理解上面代码:   核心线程是1,有6个任务  任务1交给核心线程执行,那么还有5个任务,  阻塞队列长度是3,3个任务放入阻塞队列中,  最大线程是2,那么还可以启动一个线程, 剩下的一个任务交给这个线程A执行,  那么此时还有一个任务B,无法处理,报错.....  前面5个任务,由核心线程  和 线程A执行完成任务,从队列中 取出 任务交替 执行

    2.   系统API 提供线程池

    // return new ThreadPoolExecutor(0, Integer.MAX_VALUE, // 60L, TimeUnit.SECONDS, // new SynchronousQueue<Runnable>()); Executors.newCachedThreadPool(); // 核心线程数是0 , 最大线程数 Integer.MAX_VALUE // 核心线程数是 6 ,最大线程数是 6 // return new ThreadPoolExecutor(6, 6, // 0L, TimeUnit.MILLISECONDS, // new LinkedBlockingQueue<Runnable>()); Executors.newFixedThreadPool(6);

      3. Callable  任务,返回值线程

    callable的基本使用:

    public class TestThread implements Callable<String> { @Override public String call() throws Exception { Thread.sleep(5000); return "ni hao"; } public static void main(String[] args) throws InterruptedException, ExecutionException { TestThread tt= new TestThread(); FutureTask<String> ft=new FutureTask<>(tt); Thread t= new Thread(ft); // 启动线程 t.start(); System.out.println(System.currentTimeMillis()); // 阻塞 String str= ft.get(); System.out.println(str); } }

    线程池中使用 Callable 通过 submit  阻塞,逐一执行,输出

    public class TestThread{ public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService service= Executors.newFixedThreadPool(5); for (int i = 0; i <10; i++) { Future<String> f= service.submit(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(2000); return Thread.currentThread().getName(); } }); // 阻塞,逐一执行,输出 String str= f.get(); System.out.println(str); } service.shutdown(); } }

    把执行结果装入List ,一次输出5个线程执行结果:

    public class TestThread{ public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService service= Executors.newFixedThreadPool(5); List<Future> list=new ArrayList<Future>(); for (int i = 0; i <10; i++) { Future<String> f= service.submit(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(2000); return Thread.currentThread().getName(); } }); // 阻塞,逐一执行,输出 list.add( f); } for (int i = 0; i < list.size(); i++) { System.out.println(list.get(i).get()); } } }

     4.  Callable  源码分析

    源码分析:

    public class TestThread implements Callable<String> { @Override public String call() throws Exception { Thread.sleep(5000); return "ni hao"; } public static void main(String[] args) throws InterruptedException, ExecutionException { TestThread tt= new TestThread(); FutureTask<String> ft=new FutureTask<>(tt); Thread t= new Thread(ft); // 启动线程 t.start(); System.out.println(System.currentTimeMillis()); // 阻塞 String str= ft.get(); System.out.println(str); } }

     源码分析:      FutureTask<String> ft=new FutureTask<>(tt);         Thread t= new Thread(ft);          // 启动线程         t.start();   内部调用  FutureTask 的 run 方法 , 此时 state = NEW  就是 0 

    public void run() { // 此时 state == NEW if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { //这里 回调 线程中 重写的 call 方法 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } }

       set(result);

    protected void set(V v) { private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; // 这里 state 从 0 - 1 -2 也就 state = COMPLETING -> NORMAL if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }

        // 阻塞  内部死循环          String str= ft.get();

    public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) // 这里死循环,返回 NORMAL s = awaitDone(false, 0L); return report(s); } // 返回 s , 就是 f.get()的 返回值 @SuppressWarnings("unchecked") private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }

    阻塞内部回调原理, 值得状态变化 : 

    public class TestThread{ public static void main(String[] args) throws InterruptedException, ExecutionException { showA showa=new showA(); showa.showGradle(new gradleA() { @Override public void showA() { System.out.println("xxx"); } }); } } class showA{ interface gradleA{ void showA(); } gradleA gradlaa; public void showGradle( gradleA gradla) throws InterruptedException{ for ( ; ; ) { Thread.sleep(3000); gradla.showA(); return; } } }

     

    Processed: 0.030, SQL: 8