线程池原理

    科技2022-07-11  94

    线程池

    java线程池java线程池介绍线程池作用统一管理复用线程控制并发数量 线程池原理线程池的七个参数workQueuethreadFactoryhandler 复用线程使用线程池的注意事项六种线程池的使用由ThreadPoolExecutor 创建newCachedThreadPool(可缓存线程的线程池)newSingleThreadExecutor(单线程的线程池)newFixedThreadPool(固定数目的线程池) 由ScheduledThreadPoolExecutor 创建ScheduledThreadPool(定时或周期的线程池)SingleThreadScheduledExecutor(定时或周期的单线程线程池) JKD7后出现: ForkJoinPool 如何定义线程池参数实战练习最后

    java线程池

    java线程池介绍

    ​ 我们虽然可以只使用new来创建一个线程,然后直接调用start()方法就可以实现,那为啥还要使用线程池呢?

    线程池作用

    统一管理

    ​ 线程池说到底就是一个线程调度系统,其中存在一个调度线程,这个调度线程用于管理布控整个线程池里的各种任务和事务。比如:创建线程、销毁线程、任务队列管理以及线程队列管理

    复用线程

    ​ 这是线程池最大的优势,由于创建和销毁线程的开销巨大,所以线程池的出现就是为了能实现线程的复用。在很大程度上节约了机器资源

    控制并发数量

    ​ 指的是使用线程池可以控制同时运行的线程数量。如果线程太少不利于处理,线程太多又会造成切换线程有上下文的开销,反而让机器吞吐量下降。

    线程池原理

    ​ 线程池原理如下图所示:

    执行逻辑说明: 判断核心线程数是否已满,核心线程数大小和corePoolSize参数有关,未满则创建线程执行任务若核心线程池已满,判断队列是否满,队列是否满和workQueue参数有关,若未满则加入队列中若队列已满,判断线程池是否已满,线程池是否已满和maximumPoolSize参数有关,若未满创建线程执行任务若线程池已满,则采用拒绝策略处理无法执执行的任务,拒绝策略和handler参数有关

    其中涉及到几个核心概念:

    核心线程: 线程池中有两类线程:核心线程和非核心线程。 核心线程默认情况下创建后,就会一直存在于线程池中。非核心线程如果长时间闲置,就会被销毁 任务队列: 等待队列,维护着等待执行的Runnable任务对象,是一个线程安全的阻塞队列 线程池满: 核心线程+非核心线程的总数达到了线程池设定的阈值 拒绝策略: 线程池满后,表示当前线程池没有能力去处理更多的任务,所以在创建线程池的时候,可以指定这个拒绝策略

    线程池的七个参数

    // 七个参数的构造函数 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

    ​ 如图所示的就是线程池的七个参数,前面几个比较好理解

    corePoolSize:核心线程的最大值maximumPoolSize:线程总数最大值keepAliveTime:非核心线程空闲时间unit:非核心线程空闲时间的单位
    workQueue

    ​ 任务队列,是一个线程安全阻塞队列BlockingQueue,这是一个接口,有很多实现。

    ​ 任务队列也是线程池用来控制并发数量的关键。常见的阻塞队列的实现有一下几种:

    LinkedBlockingQueue:链式阻塞队列,底层数据结构是链表,默认大小是Integer.MAX_VALUE,也可以指定大小。ArrayBlockingQueue:数组阻塞队列,底层数据结构是数组,需要指定队列的大小;SynchronousQueue:同步队列,内部容量为0,每个put操作必须等待一个take操作,反之亦然。DelayQueue:延迟队列,该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。

    一般来说用LinkedBlockingQueue和ArrayBlockingQueue的场景较多。选择哪个在于你想不想限制任务队列的数量。

    threadFactory

    ​ 创建线程的工厂,用于批量创建线程,统一在常见线程时设置一些参数,如线程名称,是否守护线程,线程的优先级等。

    ​ ThreadFactory也是一个接口,如果不指定,会使用DefaultThreadFactory新建一个默认的线程工厂

    ​ 很多时候我们会自己实现一个ThreadFactory,在里面指定线程名称的前缀,这样在排查问题的时候容易定位到问题的来源。

    handler

    ​ 拒绝处理策,当线程数量大于最大线程数的时候,就会采用拒绝处理策略,一共有四种拒绝处理策略:

    ThreadpoolExecutor.AbortPolicy:默认拒绝处理策略,丢弃任务并抛出RejectedExecutionException异常ThreadPoolExecutor.DiscardPolicy:丢弃新来的任务,但是不抛出异常ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列头部(最旧的)的任务,然后重新尝试执行程序(如果再次失败,重复此过程)。ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。

    复用线程

    ​ 上文中我们提到线程池的三大好处:统一管理、复用线程、控制并发线程数量。

    ​ 其中统一管理体现在threadfactory,控制并发线程数量体现在workQueue。那么线程池是如何复用线程的呢?

    ​ ThreadPoolExecutor在创建线程的时候,会将线程封装成工作线程worker,并放入工作线程组,然后这个worker反复从阻塞队列中拿任务去执行。这个Worker是一个内部类,它继承了AQS,实现了Runnable。

    private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ // 省略 }

    ​ 这里说的“工作线程组”不是前面提到的任务队列workQueue而是一个HashSet:

    private final HashSet<Worker> workers = new HashSet<Worker>();

    ​ 这个worker在创建后,就会去任务队列里不断拿新的任务出来,然后调用这个任务的run()方法。

    ​ 我们通过线程池的execute(Runnable command)方法,扔进线程池的线程,并没有向我们平时创建线程一样,新建一个Thread,然后用start来启动,而是由一个个worker直接调用run()方法去执行的,这样就达到了复用线程的目的。

    使用线程池的注意事项

    ​ 注意参数。每一个参数都需要仔细考量,尤其是核心线程数量、最大线程的数量、非核心线程存活时间。

    ​ 它需要你考虑到方方面面,尤其是你的程序不只一个线程池的时候。而且这跟你的任务数量也有一定的关系,所以最好提前做好预估和调研。

    ​ 核心线程不要太多,一般是CPU核心数量的2倍即可。

    ​ 绝大多数时候其实是核心线程在工作,只有当任务队列满之后,才会启动非核心线程。所以任务队列是有讲究的,如果你使用基于链表的阻塞队列,那它的最大长度是Integer.MAX_VALUE,大量的任务堆积可能会导致OOM。

    ​ 所以在任务数量可以大概预估的时候,尤其是执行一些自己写的task之类的程序,比较推荐用基于数组的阻塞队列,限制一下阻塞队列的长度。这样超过长度的,就可以启动一些临时线程去处理,加大系统的吞吐量。

    ​ 拒绝策略也很重要,如果不是很重要的任务,可以直接丢弃掉。如果任务比较重要,会影响到应用的主要逻辑,那还是抛一下异常比较好。

    ​ JDK提供了一个创建线程池的工具类Executors,提供了一些静态方法用于方便地创建一些特殊的线程池。它其实也是调用的ThreadPoolExecutor的构造方法,只是封装了一下,看起来更语义化。

    ​ 其实如果你了解了线程池的原理,可以看看这几个静态方法的源码,看看它们分别是用的什么参数,对自己以后配置线程池参数也有一些参考价值。

    六种线程池的使用

    由ThreadPoolExecutor 创建

    FixedThreadPoolCachedThreadPoolSingleThreadExecutor

    Executors创建返回ThreadPoolExecutor对象的方法共有三种:

    newCachedThreadPool(可缓存线程的线程池)
    public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } 适用于耗时短的任务、任务处理速度>任务提交速度。就不会造成不断创建新线程。资源不足容易造成OOM
    newSingleThreadExecutor(单线程的线程池)
    public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } 只有一个线程适用场景:用于所有任务都需要按被提交的顺序依次执行的场景。
    newFixedThreadPool(固定数目的线程池)
    public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } 在资源有限的时候容易引起OOM异常。线程数量固定,比较适合耗时较长的任务,避免频繁回收和分配线程

    由ScheduledThreadPoolExecutor 创建

    ScheduledThreadPoolSingleThreadScheduledExecutor
    ScheduledThreadPool(定时或周期的线程池)
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } 适用场景:定时或周期性执行任务,有三个重要方法。 ScheduledExecutorService service = Executors.newScheduledThreadPool(10); // 延迟指定时间后执行一次任务(这里是 10s 后执行完任务,结束) service.schedule(new Task(), 10, TimeUnit.SECONDS); // 以固定的频率执行任务(表示第一次延时后每次延时多长时间执行一次),第二个参数是第一次延迟的时间,第三个参数是周期 service.scheduleAtFixedRate(new Task(), 10, 10, TimeUnit.SECONDS); // 类似于第二个,区别在于周期的定义。第二个方法的周期是以任务开始时间为起始时间计时,而这个是以任务结束的时间为起始时间 service.scheduleWithFixedDelay(new Task(), 10, 10, TimeUnit.SECONDS);
    SingleThreadScheduledExecutor(定时或周期的单线程线程池)
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } 只有一个线程且支持定时、周期功能。很明显是 ScheduledThreadPool 和 SingleThreadExecutor 的结合体。适用于对执行顺序有要求,且需要定时或周期执行的任务

    JKD7后出现: ForkJoinPool

    如何定义线程池参数

    CPU密集型——线程池的大小推荐为CPU+1IO密集型——CPU数量*(1+线程等待时间/线程CPU时间)混合型——将任务分成CPU密集型和IO密集型,然后后分别使用不同的线程池去处理,从而使每个线程池可以根据各自的工作负载来调整

    CPU的数量可以根据Runtime.availableProcessors方法来获取

    阻塞队列——推荐使用有界队列,有界队列有助于避免资源耗尽的情况发生拒绝策略——默认使用的是AbortPolicy拒绝策略,直接在程序中抛出RejectedExecutionException异常(由于不是运行时异常,不强制Catch)。

    实战练习

    ​ 我们自定义一个线程池,然后通过for循环连续创建10个任务并打印线程的执行信息

    public class TaskTest { public static void main(String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3,6,5L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(4)); for(int i = 0 ;i< 10 ;i++){ threadPoolExecutor.execute(()->{ System.out.println("测试线程池:"+Thread.currentThread().getName()+"==="+threadPoolExecutor.toString()); }); } } }

    运行结果:

    因为我们设置的时候corePoolSize =3.maximumPoolSize=6,workQueue大小为4。

    ​ 从运行结果中我们可以发现:总共创建了6个线程来执行完成了10个任务。其实很好理解,c=3个核心线程执行了3个任务,然后4个任务在队列中等待核心线程执行,最后额外创建了e=3个线程执行了剩下的3个任务,总创建的线程数就是 c + e = 6 <= 6(最大线程数)。 ​

    最后

    如果觉得看完有收获,希望能给我点个赞,这将会是我更新的最大动力,感谢各位的支持欢迎各位关注我的公众号【java冢狐】,专注于java和计算机基础知识,保证让你看完有所收获,不信你打我如果看完有不同的意见或者建议,欢迎多多评论一起交流。感谢各位的支持以及厚爱。
    Processed: 0.054, SQL: 8