Springboot实现多个线程池动态配置

Springboot实现多个线程池动态配置

码农世界 2024-05-22 后端 58 次浏览 0个评论

1、配置文件新增每个线程池的基本参数配置(我这里使用了nacos配置)

thread-pool.user-thread.corePoolSize=1
thread-pool.user-thread.maxPoolSize=1
thread-pool.user-thread.keepAliveSeconds=120
thread-pool.user-thread.queueCapacity=1
thread-pool.school-thread.corePoolSize=2
thread-pool.school-thread.maxPoolSize=2
thread-pool.school-thread.keepAliveSeconds=60
thread-pool.school-thread.queueCapacity=2

2、创建线程池类读取配置文件参数

新建一个线程池用到的相关常量或枚举(我这里用的是常量)

/**
 * 线程池常量
 */
public class ThreadPoolConstants {
    /**
     * 用户线程前缀
     */
    public final static String USER_THREAD_PREFIX = "user-thread";
    /**
     * 学校线程前缀
     */
    public final static String SCHOOL_THREAD_PREFIX = "school-thread";
    /**
     * 线程池bean后缀名
     */
    public final static String THREA_BEAN_SUFFIX = "-exector-bean";
    /**
     * 运行线程名称后缀
     */
    public final static String RUNNING_THREAD_SUFFIX = "-pool-task-";
    /**
     * 线程参数配置-核线程数
     */
    public final static String THREAD_POOL_CORE_SIZE = "corePoolSize";
    /**
     * 线程参数配置-最大线程数
     */
    public final static String THREAD_POOL_MAX_SIZE = "maxPoolSize";
    /**
     * 线程参数配置-线程存活时长
     */
    public final static String THREAD_POOL_KEEP_ALIVE = "keepAliveSeconds";
    /**
     * 线程参数配置-队列长度
     */
    public final static String THREAD_POOL_QUEUE_CAPACITY = "queueCapacity";
}

线程池抽象类

/**
 * 线程池抽象类
 */
@Data
public abstract class AbstractExecutorPool {
    private int corePoolSize;
    private int maxPoolSize;
    private int keepAliveSeconds;
    private int queueCapacity;
}

用户线程池

/**
 * 用户线程池参数类
 */
@Component
@ConfigurationProperties(prefix = "thread-pool.user-thread")
@Data
public class UserThreadPool extends AbstractExecutorPool {
    /**
     * 线程池前缀名称:user-thread-pool-task-
     */
    private final String threadNamePrefix = ThreadPoolConstants.USER_THREAD_PREFIX + ThreadPoolConstants.RUNNING_THREAD_SUFFIX;
}

学校线程池

/**
 * 学校线程池参数类
 */
@Component
@ConfigurationProperties(prefix = "thread-pool.school-thread")
@Data
public class SchoolThreadPool extends AbstractExecutorPool {
    /**
     * 线程池前缀名称:school-thread-pool-task-
     */
    private final String threadNamePrefix = ThreadPoolConstants.SCHOOL_THREAD_PREFIX + ThreadPoolConstants.RUNNING_THREAD_SUFFIX;
}

3、创建线程池配置类

创建配置类 ThreadPoolConfig,负责创建线程池

ThreadPoolExecutor 类中提供的拒绝策略,也可以自定义策略。

(1)AbortPolicy 默认策略,队列满时抛出异常RejectedExecutionException

(2)DiscardOldestPolicy 去除队列中最早的任务,将新任务放入队列

(3)DiscardPolicy 直接丢掉任务

(4)CallerRunsPolicy 队列满时,主线程执行任务

(5)自定义处理策略

/**
 * 线程池配置类
 */
@Configuration
@EnableAsync
@Slf4j
public class ThreadPoolConfig {
    @Autowired
    private UserThreadPool userThreadPool;
    @Autowired
    private SchoolThreadPool schoolThreadPool;
    /**
     * 创建用户线程池
     * beanName: "user-thread-exector-bean"
     */
    @Bean(name = ThreadPoolConstants.USER_THREAD_PREFIX + ThreadPoolConstants.THREA_BEAN_SUFFIX)
    public ThreadPoolTaskExecutor userThreadExector() {
        return initExcutor(userThreadPool, userThreadPool.getThreadNamePrefix(), (r, executor) -> {
            log.info("userThreadExector队列已满,根据业务自行处理。。。");
        });
    }
    /**
     * 创建学校线程池
     * beanName: "school-thread-exector-bean"
     */
    @Bean(name = ThreadPoolConstants.SCHOOL_THREAD_PREFIX + ThreadPoolConstants.THREA_BEAN_SUFFIX)
    public ThreadPoolTaskExecutor schoolThreadExector() {
        return initExcutor(schoolThreadPool, schoolThreadPool.getThreadNamePrefix(),(r, executor) -> {
            log.info("schoolThreadExector队列已满,根据业务自行处理。。。");
        });
    }
    /**
     * 初始化线程池
     * @param abstractExecutorPool
     * @param threadName
     * @param rejectedExecutionHandler
     * @return
     */
    private ThreadPoolTaskExecutor initExcutor(AbstractExecutorPool abstractExecutorPool, String threadName, RejectedExecutionHandler rejectedExecutionHandler){
        // 创建线程池并设置参数
        ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
        threadPool.setCorePoolSize(abstractExecutorPool.getCorePoolSize());
        threadPool.setMaxPoolSize(abstractExecutorPool.getMaxPoolSize());
        threadPool.setKeepAliveSeconds(abstractExecutorPool.getKeepAliveSeconds());
        threadPool.setQueueCapacity(abstractExecutorPool.getQueueCapacity());
        threadPool.setThreadNamePrefix(threadName);
        threadPool.setRejectedExecutionHandler(rejectedExecutionHandler);
        return threadPool;
    }
}

4、springboot启动类加上@EnableAsync

5、使用线程池

/**
 * 测试线程池service
 */
@Service
public class TestThreadPoolService {
    @Autowired
    @Qualifier(ThreadPoolConstants.USER_THREAD_PREFIX + ThreadPoolConstants.THREA_BEAN_SUFFIX)
    private ThreadPoolTaskExecutor userTheadExector;
    @Autowired
    @Qualifier(ThreadPoolConstants.SCHOOL_THREAD_PREFIX + ThreadPoolConstants.THREA_BEAN_SUFFIX)
    private ThreadPoolTaskExecutor schoolThreadExector;
    /**
     * 测试线程
     */
    public void testUserThread(){
        userTheadExector.execute(() ->{
            try {
              	// 设置睡眠时间让线程阻塞便于观察
                Thread.sleep(5000);
                System.out.println(Thread.currentThread().getName() + "执行testUserThread业务完毕.....................");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
    /**
     * 测试线程
     */
    public void testSchoolThread(){
        schoolThreadExector.execute(() ->{
            try {
              	// 设置睡眠时间让线程阻塞便于观察
                Thread.sleep(3000);
                System.out.println(Thread.currentThread().getName() + "执行testSchoolThread业务完毕.......................");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
    public String getThreadPoolInfo()
    {
        return "user-thread-pool:"+ "corePoolSize:" + userTheadExector.getCorePoolSize() + "maxPoolSize:"+ userTheadExector.getMaxPoolSize() + "keepAliveSeconds:"+userTheadExector.getKeepAliveSeconds()+"\n" +
               "school-thread-pool:"+ "corePoolSize:" + schoolThreadExector.getCorePoolSize() + "maxPoolSize:"+ schoolThreadExector.getMaxPoolSize() + "keepAliveSeconds:"+schoolThreadExector.getKeepAliveSeconds();
    }
}

6、接口调用查看线程池执行情况

@RestController
@RequestMapping("/thread")
public class TestThreadPoolController {
    Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired
    TestThreadPoolService testThreadPoolService;
    @RequestMapping("/test")
    public String get(){
        testThreadPoolService.testUserThread();
        testThreadPoolService.testSchoolThread();
        return "success";
    }
}

通过控制台查看打印信息

Springboot实现多个线程池动态配置

7、把线程池配置放到nacos,实现动态修改线程池的配置

nacos新建线程池专用的配置文件,注意推荐使用properties类型的配置文件,yaml类型的会读取不全配置信息的问题

Springboot实现多个线程池动态配置

8、spingboot项目中指定nacos配置的线程池配置文件

Springboot实现多个线程池动态配置

10、新增nacos配置文件修改监听器NacosListener,监听nacos配置的线程池配置文件的变化,监听到配置修改后重新设置到对应的业务线程池参数

/**
 * nacos配置中心监听器
 */
@Component
public class NacosListener implements ApplicationRunner {
    @Autowired
    NacosConfigManager nacosConfigManager;
    @Autowired
    NacosConfigProperties nacosConfigProperties;
    @Autowired
    ApplicationContext applicationContext;
    @Override
    public void run(ApplicationArguments args) throws Exception {
        //配置修改监听
        nacosConfigManager.getConfigService().addListener("user-service-threadpool-config.properties", nacosConfigProperties.getGroup(),
            new PropertiesListener(){
                @Override
                public void innerReceive(Properties properties) {
                    System.out.println("nacos修改线程池配置,线程池重新设置!");
                    // 读取线程配置
                    Set strings = properties.stringPropertyNames();
                    // 获取对应业务线程池
                    ThreadPoolTaskExecutor threadPoolTaskExecutor;
                    for (String str : strings){
                        // 获取业务线程池bean对象
                        threadPoolTaskExecutor = (ThreadPoolTaskExecutor)applicationContext.getBean(str.substring(str.indexOf(".")+1, str.lastIndexOf(".")) + ThreadPoolConstants.THREAD_EXECTOR_BEAN_SUFFIX);
                        // 核心线程数
                        if (str.contains(ThreadPoolConstants.THREAD_POOL_CORE_SIZE)){
                            threadPoolTaskExecutor.setCorePoolSize(Integer.parseInt(properties.getProperty(str)));
                        }
                        // 最大线程数
                        if (str.contains(ThreadPoolConstants.THREAD_POOL_MAX_SIZE)){
                            threadPoolTaskExecutor.setMaxPoolSize(Integer.parseInt(properties.getProperty(str)));
                        }
                        // 线程存活时长
                        if (str.contains(ThreadPoolConstants.THREAD_POOL_KEEP_ALIVE)){
                            threadPoolTaskExecutor.setKeepAliveSeconds(Integer.parseInt(properties.getProperty(str)));
                        }
                        // 队列长度
                        if (str.contains(ThreadPoolConstants.THREAD_POOL_QUEUE_CAPACITY)){
                            threadPoolTaskExecutor.setQueueCapacity(Integer.parseInt(properties.getProperty(str)));
                            threadPoolTaskExecutor.initialize();
                        }
                    }
                }
        });
    }
}

11、测试能否动态配置各业务线程池

Springboot实现多个线程池动态配置

我这里的配置user线程池配置最多只能执行4个任务,超过则提示,school线程池最多只能执行3个任务,超过则提示,我在页面调接口创建线程执行分别执行user线程池和school线程池,连续请求4次,预期结果应该是user线程池可以正常执行,school线程池则会提示

Springboot实现多个线程池动态配置

去nacos中修改一下线程池的配置,把user线程修改成最多只有2个任务执行,school线程最多只有4个任务执行

Springboot实现多个线程池动态配置

连续请求4次,预期结果和执行结果一致,代表动态修改线程池配置已经生效了

Springboot实现多个线程池动态配置

转载请注明来自码农世界,本文标题:《Springboot实现多个线程池动态配置》

百度分享代码,如果开启HTTPS请参考李洋个人博客
每一天,每一秒,你所做的决定都会改变你的人生!

发表评论

快捷回复:

评论列表 (暂无评论,58人围观)参与讨论

还没有评论,来说两句吧...

Top