最近公司在搞大促压测,需要根据压测情况来配置线程池的参数。来达到最佳的效果。 由于一个机器现在是多实例部署的情况,所以理想化的线程数=cpu数*2的情况已经不适用了,需要根据压测的实际情况来调整。 所以写了一个通过外置配置文件(使用了阿里的diamond,当然也可以使用springCloud的config server)来达到不重启应用的情况下,动态的调整线程池的参数。 先贴上我的配置文件:一个非常简单的json字符串
{ "coolPoolSize":"8", "maxPoolSize":"16", "maxWaitQueueSize":"1024", "keepAliveTime":"5", "rejectedHandler":"AbortPolicy" }对于动态修改线程池的问题 我这里只修改了 核心线程数,最大线程池数,任务队列长度,超时时间,拒绝策略这5个配置项。
然后对于修改线程池这块改动过一版。 原先是打算设置一个静态的线程池对象EXECTOR,然后当监听到配置文件发生变动,直接改变EXECTOR的指向 相关伪代码
//指针指向新的线程池 EXECUTOR = new ThreadPoolExecutor( corePoolSize ,maxPoolSize ,keepAliveTime ,TimeUnit.MILLISECONDS ,BLOCK_QUEUE ,FACTORY ,rejectedHandler);但是这里存在一个隐患的点。如果采用了这套方案,若此时流量太大,我们去改变线程池的配置的时候,会在内存中生成两个线程池,另一个线程池得把任务执行完毕之后才会停止。双倍机器的负荷,容易产生隐患。
ThreadPoolExecutor其实已经提供了相关set方法,可以实时修改线程池的参数
EXECUTOR.setCorePoolSize(corePoolSize); EXECUTOR.setMaximumPoolSize(maxPoolSize); EXECUTOR.setKeepAliveTime(keepAliveTime,TimeUnit.MILLISECONDS); EXECUTOR.setRejectedExecutionHandler(rejectedHandler);但是却没有调整任务队列长度的能力,但是任务队列的长度也是需要考虑到的一个点,如果上来就是无界队列,是会存在oom的隐患的。
于是我又写了一点方法,提供了调整任务队列长度的能力 从这个角度出发,我们可以先关闭之前的线程池,然后把之前的任务搬到新的线程池中来执行。
相关方法
//动态调整线程池参数 private static void dynamicAdjustThreadPool(String content) { Map<String,String> config = null; try { config = (Map<String, String>) JSON.parse(content); } catch (Exception e) { log.error("parse to hashmap error content = {} ",content,e); return; } //核心线程数 Integer corePoolSize = getCorePoolSizeFromConfig(config); //最大线程数 Integer maxPoolSize = getMaxPoolSizeFromConfig(config); //超时时间 Integer keepAliveTime = getKeepAliveTimeFromConfig(config); //任务队列长度 Integer maxWaitQueueSize = getMaxWaitQueueSizeFromConfig(config); //拒绝策略 RejectedExecutionHandler rejectedHandler = getRejectedHandlerFromConfig(config); if (EXECUTOR==null){ EXECUTOR = new ThreadPoolExecutor( corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(maxWaitQueueSize), FACTORY, rejectedHandler); }else { //队列容量发生变化的时候。需要重新new一个新的线程池,再把老的任务搬迁过去 if (CURRENT_CAPACITY!=null && !CURRENT_CAPACITY.equals(maxWaitQueueSize)){ adjustIfNewCapacity(corePoolSize, maxPoolSize, keepAliveTime, maxWaitQueueSize, rejectedHandler); }else { //当队列长度没变化时 adjustIfOldCapacity(corePoolSize,maxPoolSize,keepAliveTime,rejectedHandler); } } //更新当前容量 CURRENT_CAPACITY = maxWaitQueueSize; } /** * 当队列长度发生变化时 * EXECUTOR指向新的线程池,并且把老的线程池中的任务搬迁过来 */ private static void adjustIfNewCapacity(Integer corePoolSize, Integer maxPoolSize, Integer keepAliveTime, Integer maxWaitQueueSize , RejectedExecutionHandler rejectedHandler){ //取一个变量指向老线程池 ThreadPoolExecutor oldExecutor = EXECUTOR; //新任务队列 BlockingQueue<Runnable> newTaskQueue = new LinkedBlockingQueue<>(maxWaitQueueSize); //EXECUTOR指向新线程池 此时新的任务将进入newTaskQueue EXECUTOR = new ThreadPoolExecutor( corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, newTaskQueue, FACTORY, rejectedHandler); oldExecutor.shutdown(); //如果未关闭 if (!oldExecutor.isTerminated()) { BlockingQueue<Runnable> oldExecutorQueue = oldExecutor.getQueue(); //从老队列搬迁任务到新队列(和原线程池一起消费BLOCK_QUEUE) int loopLimited = 0;//循环上限次数 while (oldExecutorQueue.size() != 0 && loopLimited < maxWaitQueueSize) { loopLimited++; Runnable task = null; try { task = oldExecutorQueue.poll(keepAliveTime, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { continue; } if (task != null) { //任务搬迁(如果超过了BLOCK_QUEUE的容量 add方法会丢弃多余的任务) newTaskQueue.add(task); } } } } /** * 当队列长度没变化时 */ private static void adjustIfOldCapacity(Integer corePoolSize,Integer maxPoolSize,Integer keepAliveTime,RejectedExecutionHandler rejectedHandler){ EXECUTOR.setCorePoolSize(corePoolSize); EXECUTOR.setMaximumPoolSize(maxPoolSize); EXECUTOR.setKeepAliveTime(keepAliveTime,TimeUnit.MILLISECONDS); EXECUTOR.setRejectedExecutionHandler(rejectedHandler); }全部代码
@Slf4j public class ThreadPoolDiamond { private static final String GROUP_ID = "***"; private static final String DATA_ID = "*******"; /** * 默认核心线程数 */ private static final Integer DEFAULT_CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors()*2; /** * 默认最大线程数 */ private static final Integer DEFAULT_MAX_POOL_SIZE = 16; /** * 默认等待时间 */ private static final Integer DEFAULT_KEEP_ALIVE_TIME = 5; /** * 默认等待队列的最大数量 */ private static final Integer DEFAULT_MAX_WAIT_QUEUE_SIZE = 1024; /** * 默认拒绝策略 */ private static final RejectedExecutionHandler DEFAUALT_REJECTED_HANDLER = new ThreadPoolExecutor.AbortPolicy(); private static final ThreadFactory FACTORY = r -> new Thread(r,"ThreadPoolDiamond".concat("-thread-%d")); /** * diamond相关配置项 */ private static final String CORE_POOL_SIZE = "coolPoolSize"; private static final String MAX_POOL_SIZE = "maxPoolSize"; private static final String KEEP_ALIVE_TIME = "keepAliveTime"; private static final String MAX_WAIT_QUEUE_SIZE = "maxWaitQueueSize"; private static final String REJECTED_HANDLER = "rejectedHandler"; private static final Map<String,RejectedExecutionHandler> REJECTED_HANDLER_MAP = new HashMap<>(); static { //当任务添加到线程池中被拒绝时,它将抛出 RejectedExecutionException 异常。 REJECTED_HANDLER_MAP.put("AbortPolicy",new ThreadPoolExecutor.AbortPolicy()); //当任务添加到线程池中被拒绝时,会在线程池当前正在运行的Thread线程池中处理被拒绝的任务。 REJECTED_HANDLER_MAP.put("CallerRunsPolicy",new ThreadPoolExecutor.CallerRunsPolicy()); //当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中。 REJECTED_HANDLER_MAP.put("DiscardPolicy",new ThreadPoolExecutor.DiscardPolicy()); //当任务添加到线程池中被拒绝时,线程池将丢弃被拒绝的任务。 REJECTED_HANDLER_MAP.put("DiscardOldestPolicy",new ThreadPoolExecutor.DiscardOldestPolicy()); } /** * 当前阻塞队列的容量 */ private static Integer CURRENT_CAPACITY; /** * 线程池 */ private static ThreadPoolExecutor EXECUTOR ; //这里的静态代码块是监听diamond中间件的,如果配置文件变化了,就会调用dynamicAdjustThreadPool() static { try { String content = Diamond.getConfig(DATA_ID, GROUP_ID, 5000L); log.error("content={}", content); if (StringUtils.isNotEmpty(content)) { dynamicAdjustThreadPool(content); } } catch (IOException e) { log.error("diamond get config error dataId={}, groupId = {}", DATA_ID, GROUP_ID, e); } Diamond.addListener(DATA_ID, GROUP_ID, new ManagerListener() { @Override public void receiveConfigInfo(String configInfo) { if (StringUtils.isNotEmpty(configInfo)) { dynamicAdjustThreadPool(configInfo); } } @Override public Executor getExecutor() { return null; } }); } public static ExecutorService getThreadPool(){ return EXECUTOR; } //动态调整线程池参数 private static void dynamicAdjustThreadPool(String content) { Map<String,String> config = null; try { config = (Map<String, String>) JSON.parse(content); } catch (Exception e) { log.error("parse to hashmap error content = {} ",content,e); return; } //核心线程数 Integer corePoolSize = getCorePoolSizeFromConfig(config); //最大线程数 Integer maxPoolSize = getMaxPoolSizeFromConfig(config); //超时时间 Integer keepAliveTime = getKeepAliveTimeFromConfig(config); //任务队列长度 Integer maxWaitQueueSize = getMaxWaitQueueSizeFromConfig(config); //拒绝策略 RejectedExecutionHandler rejectedHandler = getRejectedHandlerFromConfig(config); if (EXECUTOR==null){ EXECUTOR = new ThreadPoolExecutor( corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(maxWaitQueueSize), FACTORY, rejectedHandler); }else { //队列容量发生变化的时候。需要重新new一个新的线程池,再把老的任务搬迁过去 if (CURRENT_CAPACITY!=null && !CURRENT_CAPACITY.equals(maxWaitQueueSize)){ adjustIfNewCapacity(corePoolSize, maxPoolSize, keepAliveTime, maxWaitQueueSize, rejectedHandler); }else { //当队列长度没变化时 adjustIfOldCapacity(corePoolSize,maxPoolSize,keepAliveTime,rejectedHandler); } } //更新当前容量 CURRENT_CAPACITY = maxWaitQueueSize; } /** * 当队列长度发生变化时 * EXECUTOR指向新的线程池,并且把老的线程池中的任务搬迁过来 */ private static void adjustIfNewCapacity(Integer corePoolSize, Integer maxPoolSize, Integer keepAliveTime, Integer maxWaitQueueSize , RejectedExecutionHandler rejectedHandler){ //取一个变量指向老线程池 ThreadPoolExecutor oldExecutor = EXECUTOR; //新任务队列 BlockingQueue<Runnable> newTaskQueue = new LinkedBlockingQueue<>(maxWaitQueueSize); //EXECUTOR指向新线程池 此时新的任务将进入newTaskQueue EXECUTOR = new ThreadPoolExecutor( corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, newTaskQueue, FACTORY, rejectedHandler); oldExecutor.shutdown(); //如果未关闭 if (!oldExecutor.isTerminated()) { BlockingQueue<Runnable> oldExecutorQueue = oldExecutor.getQueue(); //从老队列搬迁任务到新队列(和原线程池一起消费BLOCK_QUEUE) int loopLimited = 0;//循环上限次数 while (oldExecutorQueue.size() != 0 && loopLimited < maxWaitQueueSize) { loopLimited++; Runnable task = null; try { task = oldExecutorQueue.poll(keepAliveTime, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { continue; } if (task != null) { //任务搬迁(如果超过了BLOCK_QUEUE的容量 add方法会丢弃多余的任务) newTaskQueue.add(task); } } } } /** * 当队列长度没变化时 */ private static void adjustIfOldCapacity(Integer corePoolSize,Integer maxPoolSize,Integer keepAliveTime,RejectedExecutionHandler rejectedHandler){ EXECUTOR.setCorePoolSize(corePoolSize); EXECUTOR.setMaximumPoolSize(maxPoolSize); EXECUTOR.setKeepAliveTime(keepAliveTime,TimeUnit.MILLISECONDS); EXECUTOR.setRejectedExecutionHandler(rejectedHandler); } /** * 获取配置的核心线程数 */ public static Integer getCorePoolSizeFromConfig(Map<String,String> configMap){ return Optional.ofNullable(configMap) .map(config->config.get(CORE_POOL_SIZE)) .map(Integer::parseInt) .orElse(DEFAULT_CORE_POOL_SIZE); } /** * 获取配置的最大线程数 */ public static Integer getMaxPoolSizeFromConfig(Map<String,String> configMap){ return Optional.ofNullable(configMap) .map(config->config.get(MAX_POOL_SIZE)) .map(Integer::parseInt) .orElse(DEFAULT_MAX_POOL_SIZE); } /** * 获取配置的默认等待时间 */ public static Integer getKeepAliveTimeFromConfig(Map<String,String> configMap){ return Optional.ofNullable(configMap) .map(config->config.get(KEEP_ALIVE_TIME)) .map(Integer::parseInt) .orElse(DEFAULT_KEEP_ALIVE_TIME); } /** * 获取配置的最大任务队列的长度 */ public static Integer getMaxWaitQueueSizeFromConfig(Map<String,String> configMap){ return Optional.ofNullable(configMap) .map(config->config.get(MAX_WAIT_QUEUE_SIZE)) .map(Integer::parseInt) .orElse(DEFAULT_MAX_WAIT_QUEUE_SIZE); } /** * 获取配置的拒绝策略 */ public static RejectedExecutionHandler getRejectedHandlerFromConfig(Map<String,String> configMap){ return Optional.ofNullable(configMap) .map(config->config.get(REJECTED_HANDLER)) .map(REJECTED_HANDLER_MAP::get) .orElse(DEFAUALT_REJECTED_HANDLER); } }如果有错误请指出。