我们虽然可以只使用new来创建一个线程,然后直接调用start()方法就可以实现,那为啥还要使用线程池呢?
线程池说到底就是一个线程调度系统,其中存在一个调度线程,这个调度线程用于管理布控整个线程池里的各种任务和事务。比如:创建线程、销毁线程、任务队列管理以及线程队列管理
这是线程池最大的优势,由于创建和销毁线程的开销巨大,所以线程池的出现就是为了能实现线程的复用。在很大程度上节约了机器资源
指的是使用线程池可以控制同时运行的线程数量。如果线程太少不利于处理,线程太多又会造成切换线程有上下文的开销,反而让机器吞吐量下降。
线程池原理如下图所示:
执行逻辑说明: 判断核心线程数是否已满,核心线程数大小和corePoolSize参数有关,未满则创建线程执行任务若核心线程池已满,判断队列是否满,队列是否满和workQueue参数有关,若未满则加入队列中若队列已满,判断线程池是否已满,线程池是否已满和maximumPoolSize参数有关,若未满创建线程执行任务若线程池已满,则采用拒绝策略处理无法执执行的任务,拒绝策略和handler参数有关其中涉及到几个核心概念:
核心线程: 线程池中有两类线程:核心线程和非核心线程。 核心线程默认情况下创建后,就会一直存在于线程池中。非核心线程如果长时间闲置,就会被销毁 任务队列: 等待队列,维护着等待执行的Runnable任务对象,是一个线程安全的阻塞队列 线程池满: 核心线程+非核心线程的总数达到了线程池设定的阈值 拒绝策略: 线程池满后,表示当前线程池没有能力去处理更多的任务,所以在创建线程池的时候,可以指定这个拒绝策略 如图所示的就是线程池的七个参数,前面几个比较好理解
corePoolSize:核心线程的最大值maximumPoolSize:线程总数最大值keepAliveTime:非核心线程空闲时间unit:非核心线程空闲时间的单位 任务队列,是一个线程安全阻塞队列BlockingQueue,这是一个接口,有很多实现。
任务队列也是线程池用来控制并发数量的关键。常见的阻塞队列的实现有一下几种:
LinkedBlockingQueue:链式阻塞队列,底层数据结构是链表,默认大小是Integer.MAX_VALUE,也可以指定大小。ArrayBlockingQueue:数组阻塞队列,底层数据结构是数组,需要指定队列的大小;SynchronousQueue:同步队列,内部容量为0,每个put操作必须等待一个take操作,反之亦然。DelayQueue:延迟队列,该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。一般来说用LinkedBlockingQueue和ArrayBlockingQueue的场景较多。选择哪个在于你想不想限制任务队列的数量。
创建线程的工厂,用于批量创建线程,统一在常见线程时设置一些参数,如线程名称,是否守护线程,线程的优先级等。
ThreadFactory也是一个接口,如果不指定,会使用DefaultThreadFactory新建一个默认的线程工厂
很多时候我们会自己实现一个ThreadFactory,在里面指定线程名称的前缀,这样在排查问题的时候容易定位到问题的来源。
拒绝处理策,当线程数量大于最大线程数的时候,就会采用拒绝处理策略,一共有四种拒绝处理策略:
ThreadpoolExecutor.AbortPolicy:默认拒绝处理策略,丢弃任务并抛出RejectedExecutionException异常ThreadPoolExecutor.DiscardPolicy:丢弃新来的任务,但是不抛出异常ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列头部(最旧的)的任务,然后重新尝试执行程序(如果再次失败,重复此过程)。ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。 上文中我们提到线程池的三大好处:统一管理、复用线程、控制并发线程数量。
其中统一管理体现在threadfactory,控制并发线程数量体现在workQueue。那么线程池是如何复用线程的呢?
ThreadPoolExecutor在创建线程的时候,会将线程封装成工作线程worker,并放入工作线程组,然后这个worker反复从阻塞队列中拿任务去执行。这个Worker是一个内部类,它继承了AQS,实现了Runnable。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ // 省略 } 这里说的“工作线程组”不是前面提到的任务队列workQueue而是一个HashSet:
private final HashSet<Worker> workers = new HashSet<Worker>(); 这个worker在创建后,就会去任务队列里不断拿新的任务出来,然后调用这个任务的run()方法。
我们通过线程池的execute(Runnable command)方法,扔进线程池的线程,并没有向我们平时创建线程一样,新建一个Thread,然后用start来启动,而是由一个个worker直接调用run()方法去执行的,这样就达到了复用线程的目的。
注意参数。每一个参数都需要仔细考量,尤其是核心线程数量、最大线程的数量、非核心线程存活时间。
它需要你考虑到方方面面,尤其是你的程序不只一个线程池的时候。而且这跟你的任务数量也有一定的关系,所以最好提前做好预估和调研。
核心线程不要太多,一般是CPU核心数量的2倍即可。
绝大多数时候其实是核心线程在工作,只有当任务队列满之后,才会启动非核心线程。所以任务队列是有讲究的,如果你使用基于链表的阻塞队列,那它的最大长度是Integer.MAX_VALUE,大量的任务堆积可能会导致OOM。
所以在任务数量可以大概预估的时候,尤其是执行一些自己写的task之类的程序,比较推荐用基于数组的阻塞队列,限制一下阻塞队列的长度。这样超过长度的,就可以启动一些临时线程去处理,加大系统的吞吐量。
拒绝策略也很重要,如果不是很重要的任务,可以直接丢弃掉。如果任务比较重要,会影响到应用的主要逻辑,那还是抛一下异常比较好。
JDK提供了一个创建线程池的工具类Executors,提供了一些静态方法用于方便地创建一些特殊的线程池。它其实也是调用的ThreadPoolExecutor的构造方法,只是封装了一下,看起来更语义化。
其实如果你了解了线程池的原理,可以看看这几个静态方法的源码,看看它们分别是用的什么参数,对自己以后配置线程池参数也有一些参考价值。
Executors创建返回ThreadPoolExecutor对象的方法共有三种:
CPU的数量可以根据Runtime.availableProcessors方法来获取
阻塞队列——推荐使用有界队列,有界队列有助于避免资源耗尽的情况发生拒绝策略——默认使用的是AbortPolicy拒绝策略,直接在程序中抛出RejectedExecutionException异常(由于不是运行时异常,不强制Catch)。 我们自定义一个线程池,然后通过for循环连续创建10个任务并打印线程的执行信息
public class TaskTest { public static void main(String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3,6,5L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(4)); for(int i = 0 ;i< 10 ;i++){ threadPoolExecutor.execute(()->{ System.out.println("测试线程池:"+Thread.currentThread().getName()+"==="+threadPoolExecutor.toString()); }); } } }运行结果:
因为我们设置的时候corePoolSize =3.maximumPoolSize=6,workQueue大小为4。
从运行结果中我们可以发现:总共创建了6个线程来执行完成了10个任务。其实很好理解,c=3个核心线程执行了3个任务,然后4个任务在队列中等待核心线程执行,最后额外创建了e=3个线程执行了剩下的3个任务,总创建的线程数就是 c + e = 6 <= 6(最大线程数)。