为什么需要线程池: 线程可以复用,线程任务执行完毕以后,cpu 调度 以后可以复用,不用频繁创建开启线程
代码1:
public class Main2 { public static void main(String[] args) throws InterruptedException { /* 参数1:核心线程数,默认启动线程数 * 参数2: 最大启动线程数 * 参数3,参数4, 线程不够的时候,启动的线程,如果60s 内没有在次执行Runnable,那么 被回收 * 参数5: 如果任务超过最大线程数,没有对应的线程来执行,那么放入队列中 */ ExecutorService executorService = new ThreadPoolExecutor(1, 2,60, TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>(3)); for (int i = 0; i < 5; i++) { executorService.execute(new Runnable() { @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("线程:"+ Thread.currentThread().getName()); } }); } } }运行结果:正确执行
线程:pool-1-thread-1 线程:pool-1-thread-2 线程:pool-1-thread-1 线程:pool-1-thread-2 线程:pool-1-thread-1
如何理解上面代码: 核心线程是1,有5个任务 任务1交给核心线程执行,那么还有4个任务, 阻塞队列长度是3,3个任务放入阻塞队列中, 最大线程是2,那么还可以启动一个线程, 剩下的一个任务交给这个线程A执行, 等核心线程 和 线程A执行完成任务,从队列中 取出 任务交替 执行
代码2 :
public class Main2 { public static void main(String[] args) throws InterruptedException { /* 参数1:核心线程数,默认启动线程数 * 参数2: 最大启动线程数 * 参数3,参数4, 线程不够的时候,启动的线程,如果60s 内没有在次执行Runnable,那么 被回收 * 参数5: 如果任务超过最大线程数,没有对应的线程来执行,那么放入队列中 */ ExecutorService executorService = new ThreadPoolExecutor(1, 2,60, TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>(3)); for (int i = 0; i < 6; i++) { executorService.execute(new Runnable() { @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("线程:"+ Thread.currentThread().getName()); } }); } } }运行结果: 报错
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.denganzhi.bb.Main2$1@70dea4e rejected from java.util.concurrent.ThreadPoolExecutor@5c647e05[Running, pool size = 2, active threads = 2, queued tasks = 3, completed tasks = 0] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) at com.denganzhi.bb.Main2.main(Main2.java:22) 线程:pool-1-thread-1 线程:pool-1-thread-2 线程:pool-1-thread-1 线程:pool-1-thread-2 线程:pool-1-thread-1
如何理解上面代码: 核心线程是1,有6个任务 任务1交给核心线程执行,那么还有5个任务, 阻塞队列长度是3,3个任务放入阻塞队列中, 最大线程是2,那么还可以启动一个线程, 剩下的一个任务交给这个线程A执行, 那么此时还有一个任务B,无法处理,报错..... 前面5个任务,由核心线程 和 线程A执行完成任务,从队列中 取出 任务交替 执行
callable的基本使用:
public class TestThread implements Callable<String> { @Override public String call() throws Exception { Thread.sleep(5000); return "ni hao"; } public static void main(String[] args) throws InterruptedException, ExecutionException { TestThread tt= new TestThread(); FutureTask<String> ft=new FutureTask<>(tt); Thread t= new Thread(ft); // 启动线程 t.start(); System.out.println(System.currentTimeMillis()); // 阻塞 String str= ft.get(); System.out.println(str); } }线程池中使用 Callable 通过 submit 阻塞,逐一执行,输出
public class TestThread{ public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService service= Executors.newFixedThreadPool(5); for (int i = 0; i <10; i++) { Future<String> f= service.submit(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(2000); return Thread.currentThread().getName(); } }); // 阻塞,逐一执行,输出 String str= f.get(); System.out.println(str); } service.shutdown(); } }把执行结果装入List ,一次输出5个线程执行结果:
public class TestThread{ public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService service= Executors.newFixedThreadPool(5); List<Future> list=new ArrayList<Future>(); for (int i = 0; i <10; i++) { Future<String> f= service.submit(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(2000); return Thread.currentThread().getName(); } }); // 阻塞,逐一执行,输出 list.add( f); } for (int i = 0; i < list.size(); i++) { System.out.println(list.get(i).get()); } } }源码分析:
public class TestThread implements Callable<String> { @Override public String call() throws Exception { Thread.sleep(5000); return "ni hao"; } public static void main(String[] args) throws InterruptedException, ExecutionException { TestThread tt= new TestThread(); FutureTask<String> ft=new FutureTask<>(tt); Thread t= new Thread(ft); // 启动线程 t.start(); System.out.println(System.currentTimeMillis()); // 阻塞 String str= ft.get(); System.out.println(str); } }源码分析: FutureTask<String> ft=new FutureTask<>(tt); Thread t= new Thread(ft); // 启动线程 t.start(); 内部调用 FutureTask 的 run 方法 , 此时 state = NEW 就是 0
public void run() { // 此时 state == NEW if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { //这里 回调 线程中 重写的 call 方法 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } }set(result);
protected void set(V v) { private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; // 这里 state 从 0 - 1 -2 也就 state = COMPLETING -> NORMAL if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }// 阻塞 内部死循环 String str= ft.get();
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) // 这里死循环,返回 NORMAL s = awaitDone(false, 0L); return report(s); } // 返回 s , 就是 f.get()的 返回值 @SuppressWarnings("unchecked") private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }阻塞内部回调原理, 值得状态变化 :
public class TestThread{ public static void main(String[] args) throws InterruptedException, ExecutionException { showA showa=new showA(); showa.showGradle(new gradleA() { @Override public void showA() { System.out.println("xxx"); } }); } } class showA{ interface gradleA{ void showA(); } gradleA gradlaa; public void showGradle( gradleA gradla) throws InterruptedException{ for ( ; ; ) { Thread.sleep(3000); gradla.showA(); return; } } }