1、配置文件新增每个线程池的基本参数配置(我这里使用了nacos配置)
thread-pool.user-thread.corePoolSize=1 thread-pool.user-thread.maxPoolSize=1 thread-pool.user-thread.keepAliveSeconds=120 thread-pool.user-thread.queueCapacity=1 thread-pool.school-thread.corePoolSize=2 thread-pool.school-thread.maxPoolSize=2 thread-pool.school-thread.keepAliveSeconds=60 thread-pool.school-thread.queueCapacity=2
2、创建线程池类读取配置文件参数
新建一个线程池用到的相关常量或枚举(我这里用的是常量)
/** * 线程池常量 */ public class ThreadPoolConstants { /** * 用户线程前缀 */ public final static String USER_THREAD_PREFIX = "user-thread"; /** * 学校线程前缀 */ public final static String SCHOOL_THREAD_PREFIX = "school-thread"; /** * 线程池bean后缀名 */ public final static String THREA_BEAN_SUFFIX = "-exector-bean"; /** * 运行线程名称后缀 */ public final static String RUNNING_THREAD_SUFFIX = "-pool-task-"; /** * 线程参数配置-核线程数 */ public final static String THREAD_POOL_CORE_SIZE = "corePoolSize"; /** * 线程参数配置-最大线程数 */ public final static String THREAD_POOL_MAX_SIZE = "maxPoolSize"; /** * 线程参数配置-线程存活时长 */ public final static String THREAD_POOL_KEEP_ALIVE = "keepAliveSeconds"; /** * 线程参数配置-队列长度 */ public final static String THREAD_POOL_QUEUE_CAPACITY = "queueCapacity"; }
线程池抽象类
/** * 线程池抽象类 */ @Data public abstract class AbstractExecutorPool { private int corePoolSize; private int maxPoolSize; private int keepAliveSeconds; private int queueCapacity; }
用户线程池
/** * 用户线程池参数类 */ @Component @ConfigurationProperties(prefix = "thread-pool.user-thread") @Data public class UserThreadPool extends AbstractExecutorPool { /** * 线程池前缀名称:user-thread-pool-task- */ private final String threadNamePrefix = ThreadPoolConstants.USER_THREAD_PREFIX + ThreadPoolConstants.RUNNING_THREAD_SUFFIX; }
学校线程池
/** * 学校线程池参数类 */ @Component @ConfigurationProperties(prefix = "thread-pool.school-thread") @Data public class SchoolThreadPool extends AbstractExecutorPool { /** * 线程池前缀名称:school-thread-pool-task- */ private final String threadNamePrefix = ThreadPoolConstants.SCHOOL_THREAD_PREFIX + ThreadPoolConstants.RUNNING_THREAD_SUFFIX; }
3、创建线程池配置类
创建配置类 ThreadPoolConfig,负责创建线程池
ThreadPoolExecutor 类中提供的拒绝策略,也可以自定义策略。
(1)AbortPolicy 默认策略,队列满时抛出异常RejectedExecutionException
(2)DiscardOldestPolicy 去除队列中最早的任务,将新任务放入队列
(3)DiscardPolicy 直接丢掉任务
(4)CallerRunsPolicy 队列满时,主线程执行任务
(5)自定义处理策略
/** * 线程池配置类 */ @Configuration @EnableAsync @Slf4j public class ThreadPoolConfig { @Autowired private UserThreadPool userThreadPool; @Autowired private SchoolThreadPool schoolThreadPool; /** * 创建用户线程池 * beanName: "user-thread-exector-bean" */ @Bean(name = ThreadPoolConstants.USER_THREAD_PREFIX + ThreadPoolConstants.THREA_BEAN_SUFFIX) public ThreadPoolTaskExecutor userThreadExector() { return initExcutor(userThreadPool, userThreadPool.getThreadNamePrefix(), (r, executor) -> { log.info("userThreadExector队列已满,根据业务自行处理。。。"); }); } /** * 创建学校线程池 * beanName: "school-thread-exector-bean" */ @Bean(name = ThreadPoolConstants.SCHOOL_THREAD_PREFIX + ThreadPoolConstants.THREA_BEAN_SUFFIX) public ThreadPoolTaskExecutor schoolThreadExector() { return initExcutor(schoolThreadPool, schoolThreadPool.getThreadNamePrefix(),(r, executor) -> { log.info("schoolThreadExector队列已满,根据业务自行处理。。。"); }); } /** * 初始化线程池 * @param abstractExecutorPool * @param threadName * @param rejectedExecutionHandler * @return */ private ThreadPoolTaskExecutor initExcutor(AbstractExecutorPool abstractExecutorPool, String threadName, RejectedExecutionHandler rejectedExecutionHandler){ // 创建线程池并设置参数 ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor(); threadPool.setCorePoolSize(abstractExecutorPool.getCorePoolSize()); threadPool.setMaxPoolSize(abstractExecutorPool.getMaxPoolSize()); threadPool.setKeepAliveSeconds(abstractExecutorPool.getKeepAliveSeconds()); threadPool.setQueueCapacity(abstractExecutorPool.getQueueCapacity()); threadPool.setThreadNamePrefix(threadName); threadPool.setRejectedExecutionHandler(rejectedExecutionHandler); return threadPool; } }
4、springboot启动类加上@EnableAsync
5、使用线程池
/** * 测试线程池service */ @Service public class TestThreadPoolService { @Autowired @Qualifier(ThreadPoolConstants.USER_THREAD_PREFIX + ThreadPoolConstants.THREA_BEAN_SUFFIX) private ThreadPoolTaskExecutor userTheadExector; @Autowired @Qualifier(ThreadPoolConstants.SCHOOL_THREAD_PREFIX + ThreadPoolConstants.THREA_BEAN_SUFFIX) private ThreadPoolTaskExecutor schoolThreadExector; /** * 测试线程 */ public void testUserThread(){ userTheadExector.execute(() ->{ try { // 设置睡眠时间让线程阻塞便于观察 Thread.sleep(5000); System.out.println(Thread.currentThread().getName() + "执行testUserThread业务完毕....................."); } catch (InterruptedException e) { e.printStackTrace(); } }); } /** * 测试线程 */ public void testSchoolThread(){ schoolThreadExector.execute(() ->{ try { // 设置睡眠时间让线程阻塞便于观察 Thread.sleep(3000); System.out.println(Thread.currentThread().getName() + "执行testSchoolThread业务完毕......................."); } catch (InterruptedException e) { e.printStackTrace(); } }); } public String getThreadPoolInfo() { return "user-thread-pool:"+ "corePoolSize:" + userTheadExector.getCorePoolSize() + "maxPoolSize:"+ userTheadExector.getMaxPoolSize() + "keepAliveSeconds:"+userTheadExector.getKeepAliveSeconds()+"\n" + "school-thread-pool:"+ "corePoolSize:" + schoolThreadExector.getCorePoolSize() + "maxPoolSize:"+ schoolThreadExector.getMaxPoolSize() + "keepAliveSeconds:"+schoolThreadExector.getKeepAliveSeconds(); } }
6、接口调用查看线程池执行情况
@RestController @RequestMapping("/thread") public class TestThreadPoolController { Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired TestThreadPoolService testThreadPoolService; @RequestMapping("/test") public String get(){ testThreadPoolService.testUserThread(); testThreadPoolService.testSchoolThread(); return "success"; } }
通过控制台查看打印信息
7、把线程池配置放到nacos,实现动态修改线程池的配置
nacos新建线程池专用的配置文件,注意推荐使用properties类型的配置文件,yaml类型的会读取不全配置信息的问题
8、spingboot项目中指定nacos配置的线程池配置文件
10、新增nacos配置文件修改监听器NacosListener,监听nacos配置的线程池配置文件的变化,监听到配置修改后重新设置到对应的业务线程池参数
/** * nacos配置中心监听器 */ @Component public class NacosListener implements ApplicationRunner { @Autowired NacosConfigManager nacosConfigManager; @Autowired NacosConfigProperties nacosConfigProperties; @Autowired ApplicationContext applicationContext; @Override public void run(ApplicationArguments args) throws Exception { //配置修改监听 nacosConfigManager.getConfigService().addListener("user-service-threadpool-config.properties", nacosConfigProperties.getGroup(), new PropertiesListener(){ @Override public void innerReceive(Properties properties) { System.out.println("nacos修改线程池配置,线程池重新设置!"); // 读取线程配置 Setstrings = properties.stringPropertyNames(); // 获取对应业务线程池 ThreadPoolTaskExecutor threadPoolTaskExecutor; for (String str : strings){ // 获取业务线程池bean对象 threadPoolTaskExecutor = (ThreadPoolTaskExecutor)applicationContext.getBean(str.substring(str.indexOf(".")+1, str.lastIndexOf(".")) + ThreadPoolConstants.THREAD_EXECTOR_BEAN_SUFFIX); // 核心线程数 if (str.contains(ThreadPoolConstants.THREAD_POOL_CORE_SIZE)){ threadPoolTaskExecutor.setCorePoolSize(Integer.parseInt(properties.getProperty(str))); } // 最大线程数 if (str.contains(ThreadPoolConstants.THREAD_POOL_MAX_SIZE)){ threadPoolTaskExecutor.setMaxPoolSize(Integer.parseInt(properties.getProperty(str))); } // 线程存活时长 if (str.contains(ThreadPoolConstants.THREAD_POOL_KEEP_ALIVE)){ threadPoolTaskExecutor.setKeepAliveSeconds(Integer.parseInt(properties.getProperty(str))); } // 队列长度 if (str.contains(ThreadPoolConstants.THREAD_POOL_QUEUE_CAPACITY)){ threadPoolTaskExecutor.setQueueCapacity(Integer.parseInt(properties.getProperty(str))); threadPoolTaskExecutor.initialize(); } } } }); } }
11、测试能否动态配置各业务线程池
我这里的配置user线程池配置最多只能执行4个任务,超过则提示,school线程池最多只能执行3个任务,超过则提示,我在页面调接口创建线程执行分别执行user线程池和school线程池,连续请求4次,预期结果应该是user线程池可以正常执行,school线程池则会提示
去nacos中修改一下线程池的配置,把user线程修改成最多只有2个任务执行,school线程最多只有4个任务执行
连续请求4次,预期结果和执行结果一致,代表动态修改线程池配置已经生效了
还没有评论,来说两句吧...