SpringBoot异步线程,父子线程数据传递的5种姿势

姿势1:ThreadLocal+TaskDecorator用户工具类
/** *使用ThreadLocal存储共享的数据变量,如登录的用户信息 */public class UserUtils {private staticfinalThreadLocal<String> userLocal=new ThreadLocal<>();public staticString getUserId(){return userLocal.get();}public static void setUserId(String userId){userLocal.set(userId);}public static void clear(){userLocal.remove();}}复制代码自定义CustomTaskDecorator
/** * 线程池修饰类 */public class CustomTaskDecorator implements TaskDecorator {@Overridepublic Runnable decorate(Runnable runnable) {// 获取主线程中的请求信息(我们的用户信息也放在里面)String robotId = UserUtils.getUserId();System.out.println(robotId);return () -> {try {// 将主线程的请求信息,设置到子线程中UserUtils.setUserId(robotId);// 执行子线程,这一步不要忘了runnable.run();} finally {// 线程结束,清空这些信息,否则可能造成内存泄漏UserUtils.clear();}};}}复制代码ExecutorConfig在原来的基础上增加 executor.setTaskDecorator(new CustomTaskDecorator());
@Bean(name = "asyncServiceExecutor")public Executor asyncServiceExecutor() {log.info("start asyncServiceExecutor----------------");//ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();//使用可视化运行状态的线程池ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();//配置核心线程数executor.setCorePoolSize(corePoolSize);//配置最大线程数executor.setMaxPoolSize(maxPoolSize);//配置队列大小executor.setQueueCapacity(queueCapacity);//配置线程池中的线程的名称前缀executor.setThreadNamePrefix(namePrefix);// rejection-policy:当pool已经达到max size的时候,如何处理新任务// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//增加线程池修饰类executor.setTaskDecorator(new CustomTaskDecorator());//增加MDC的线程池修饰类//executor.setTaskDecorator(new MDCTaskDecorator());//执行初始化executor.initialize();log.info("end asyncServiceExecutor------------");return executor;}复制代码AsyncServiceImpl/*** 使用ThreadLocal方式传递* 带有返回值* @throws InterruptedException*/@Async("asyncServiceExecutor")public CompletableFuture<String> executeValueAsync2() throws InterruptedException {log.info("start executeValueAsync");System.out.println("异步线程执行返回结果......+");log.info("end executeValueAsync");return CompletableFuture.completedFuture(UserUtils.getUserId());}复制代码Test2Controller/*** 使用ThreadLocal+TaskDecorator的方式* @return* @throws InterruptedException* @throws ExecutionException*/@GetMApping("/test2")public String test2() throws InterruptedException, ExecutionException {UserUtils.setUserId("123456");CompletableFuture<String> completableFuture = asyncService.executeValueAsync2();String s = completableFuture.get();return s;}复制代码姿势2:RequestContextHolder+TaskDecorator自定义CustomTaskDecorator
/** * 线程池修饰类 */public class CustomTaskDecorator implements TaskDecorator {@Overridepublic Runnable decorate(Runnable runnable) {// 获取主线程中的请求信息(我们的用户信息也放在里面)RequestAttributes attributes = RequestContextHolder.getRequestAttributes();return () -> {try {// 将主线程的请求信息,设置到子线程中RequestContextHolder.setRequestAttributes(attributes);// 执行子线程,这一步不要忘了runnable.run();} finally {// 线程结束,清空这些信息,否则可能造成内存泄漏RequestContextHolder.resetRequestAttributes();}};}}复制代码ExecutorConfig在原来的基础上增加 executor.setTaskDecorator(new CustomTaskDecorator());
@Bean(name = "asyncServiceExecutor")public Executor asyncServiceExecutor() {log.info("start asyncServiceExecutor----------------");//ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();//使用可视化运行状态的线程池ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();//配置核心线程数executor.setCorePoolSize(corePoolSize);//配置最大线程数executor.setMaxPoolSize(maxPoolSize);//配置队列大小executor.setQueueCapacity(queueCapacity);//配置线程池中的线程的名称前缀executor.setThreadNamePrefix(namePrefix);// rejection-policy:当pool已经达到max size的时候,如何处理新任务// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//增加线程池修饰类executor.setTaskDecorator(new CustomTaskDecorator());//增加MDC的线程池修饰类//executor.setTaskDecorator(new MDCTaskDecorator());//执行初始化executor.initialize();log.info("end asyncServiceExecutor------------");return executor;}复制代码


推荐阅读