合理的多线程编程能够充分的利用CPU,内存,IO,网络等资源。
但是在阿里巴巴开发手册里提到不能显示的创建线程,就是new Thread。这是因为创建和销毁线程需要一定的开销。在很多情况下,创建和销毁线程加起来的时间要大于使用线程的时间。如果大量的创建和销毁线程,将会造成巨大的系统资源浪费。
线程池技术因此诞生。利用线程池,将很好的管理并复用线程,极大的节约了系统资源。但是,阿里巴巴开发手册又明确要求不能使用官方提供的四种线程池。这又是为什么呢?今天,来讲讲线程池
创建线程的7个参数在很大程度上影响线程池运作。所以需要详细知道线程池参数的含义
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }corePoolSize :核心线程数,默认不会被回收。但其实也是会的,只要把allowCoreThreadTimeOut设为true,当核心线程空闲时,也会被回收。 maximumPoolSize:最大线程数。线程池能创建的最大线程。等于核心线程加非核心线程 keepAliveTime :非核心线程的最大存活时间 unit :存活的时间单位 workQueue:存放任务的队列 threadFactory:创建线程的工厂 handler:当线程池没法接收任务,就是线程池满了,工作队列也满了的时候的拒绝策略。
1.在创建线程池后,等待提交过来的任务请求。
2.当有任务请求时,调用exeute()
2.1如果正在运行的线程数量小于线程池的核心线程数corePoolSize,马上创建一个线程运行这个任务2.2如果正在运行的线程数量等于核心线程数corePoolSize,将这个任务放入任务队列里2.3如果任务队列满了,并且正在运行的线程数量小于最大线程数MaximumPool,这时创建非核心线程来运行这个任务2.4如果任务队列满了,正在运行的线程数也已经等于最大线程数,那么线程池会启动饱和拒绝策略3.当一个线程完成任务后,它会从队列里再拿出一个任务执行
4.当非核心线程空闲时间超过KeepAliveTime,会被回收。如果allowCoreThreadTimeOut设置为true,核心线程超过KeepAliveTime,也会被回收。
这里要注意下个优先级,任务队列的优先级要大于非核心线程,是核心线程数满了,然后把任务放入任务队列里,最后才是创建非核心线程。
JDK默认提供了四种拒绝策略: AbortPolicy(默认) DiscardPolicy DiscardOldestPolicy CallerRunsPolicy
这四个其实就ThreadPoolExecutor类里的四个公开的内部静态类。这四个类都实现了RejectedExecutionHandler类。
AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。源码如下:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); }DiscardPolicy:丢弃任务,但是不抛出异常。不推荐。源码如下:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { //什么都没做,就是丢弃任务 }DiscardOldestPolicy :抛弃队列里最久的任务,然后把当前任务队列中。源码如下:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { //队列的头(最早进去的那个)先弹出来 e.getQueue().poll(); //再调用方法提交任务 e.execute(r); } }CallerRunsPolicy:直接调用run方法,绕过线程池,自己执行。源码如下:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }任务队列实际上就是阻塞队列。阻塞队列是JUI并发包下提供的特殊队列。具有以下特点: 当队列是空的时候,向队列获取元素会被阻塞。 当队列是满的时候,往队列插入元素会被阻塞。
所有阻塞队列都实现了BlockingQueue接口 常用阻塞队列如下: ArrayBlockingQueue:由数组构成的有界阻塞队列 LinkedBlockingQueue:由链表构成的有界阻塞队列,但是默认大小为Integer.MAX_VALUE SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列 DelayQueue:使用优先级队列实现的延迟无界阻塞队列 LinkedTransferQueue:有链表构成的无界阻塞队列 LinkedBlockingDeque:由链表构成的双向阻塞队列
JDK官方提供的四种线程池,指的是Executors类的四个静态方法创建的线程池: 1)FixedThreadPool 和 SingleThreadPool。这两种的线程池的阻塞队列都是LinkerBlockingQueue,长度都为Integer.MAX_VALUE,可能会堆积大量的请求,容易发生OOM。源码如下:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }2)CacherThreadPool 和 ScheduledThreadPool 这两种的线程池的最大线程数是Integer.MAX_VALUE,可能会创建大量的线程,也容易导致OOM。源码如下
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); } public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { //调用的是ScheduledThreadPoolExecutor构造方法,见下方代码 return new ScheduledThreadPoolExecutor(corePoolSize); } //ScheduledThreadPoolExecutor构造方法 public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); }这个要根据硬盘的配置的业务场景的确定。
假设是CPU密集型,那么线程数一般是 CPU核心数+1
假如是IO密集型,那么有两种计算方式 方式一:2*CPU核心数 + 1 方式二:CPU核心数/(1-阻塞系数)阻塞系数为0.8-0.9之间。比如一个8核的CPU,线程数就是8/(1-0.9),为80。