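1. Background
In day-to-day projects, when a call to someone else's API is slow or a task is time-consuming (I/O and the like), we do not want the program to block on it; we want the work to run in parallel. We can handle the tasks in parallel with our own threads, or use the asynchronous processing that Spring provides.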
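For comparison, this is roughly what the hand-rolled multithreading route looks like without Spring. It is only a minimal sketch on top of the JDK's ExecutorService; the class name, task names, and sleep durations are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class: runs three slow tasks in parallel on a plain JDK thread pool.
class ParallelTasksSketch {

    // Simulates a slow call (for example a remote API) by sleeping, then returns a result.
    static Callable<String> slowTask(String name, long millis) {
        return () -> {
            Thread.sleep(millis);
            return name + " done";
        };
    }

    // Submits all tasks first and only then collects the results, so the tasks overlap in time.
    static List<String> runAll() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<Future<String>> futures = new ArrayList<>();
            futures.add(pool.submit(slowTask("task one", 300)));
            futures.add(pool.submit(slowTask("task two", 200)));
            futures.add(pool.submit(slowTask("task three", 100)));

            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get()); // blocks until this particular task has finished
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("task failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        List<String> results = runAll();
        long elapsed = System.currentTimeMillis() - start;
        System.out.println(results + " in " + elapsed + " ms");
    }
}
```

Because the three tasks overlap, the elapsed time should be close to the slowest task rather than the sum of all three. Spring's @Async takes over the pool management and submission that this sketch does by hand.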
2. Create an AsyncTask class and add three tasks annotated with @Async:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;

import java.util.Random;
import java.util.concurrent.Future;

/**
 * Created by Jiacheng on 2017/8/28.
 */
@Component
public class AsyncTask {

    private static Random random = new Random();

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Async
    public Future<String> doTaskOne() throws Exception {
        System.out.println("Task one started");
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000)); // simulate a slow call of up to 10 seconds
        long end = System.currentTimeMillis();
        System.out.println("Task one finished, elapsed: " + (end - start) + " ms");
        return new AsyncResult<>("Task one done");
    }

    @Async
    public Future<String> doTaskTwo() throws Exception {
        System.out.println("Task two started");
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        System.out.println("Task two finished, elapsed: " + (end - start) + " ms");
        return new AsyncResult<>("Task two done");
    }

    @Async
    public Future<String> doTaskThree() throws Exception {
        System.out.println("Task three started");
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        System.out.println("Task three finished, elapsed: " + (end - start) + " ms");
        return new AsyncResult<>("Task three done");
    }
}
3. Test it
import ncu.jerry.orangeplus.common.base.Result;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;

import java.util.concurrent.Future;

/**
 * Created by Jiacheng on 2017/8/28.
 *
 * Test controller for the async operations
 */
@Controller
@RequestMapping("/async")
public class AsyncController {

    @Autowired
    private AsyncTask task;

    @RequestMapping(value = "/task", method = RequestMethod.GET)
    @ResponseBody
    public Result<Long> task() throws Exception {
        long start = System.currentTimeMillis();

        // Each call returns immediately; the work runs on another thread.
        Future<String> task1 = task.doTaskOne();
        Future<String> task2 = task.doTaskTwo();
        Future<String> task3 = task.doTaskThree();

        while (true) {
            if (task1.isDone() && task2.isDone() && task3.isDone()) {
                // All three tasks are done; stop waiting
                break;
            }
            Thread.sleep(1000); // poll once per second
        }

        long end = System.currentTimeMillis();
        System.out.println("All tasks finished, total elapsed: " + (end - start) + " ms");
        return new Result<>(end - start);
    }
}
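Test results
Task two started
Task one started
Task three started
Task two finished, elapsed: 3355 ms
Task one finished, elapsed: 3879 ms
Task three finished, elapsed: 4246 ms
All tasks finished, total elapsed: 5008 ms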
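No custom Executor is configured for these tasks, so they run on the default TaskExecutor. (Keep in mind that @Async only takes effect once @EnableAsync is declared on a configuration class.)
That was the simplest way to use it. To use a custom Executor instead, follow these steps: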
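The total of 5008 ms in the result above is larger than the slowest task (4246 ms) because the controller only checks isDone() once per second. A blocking wait avoids that gap. Below is a minimal plain-JDK sketch using CompletableFuture, which is also accepted as a return type for @Async methods; the class name, method names, and durations are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class: waits for several async results without a polling loop.
class WaitWithoutPollingSketch {

    // Simulates a slow call by sleeping, then returns a result.
    static String slowCall(String name, long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name + " done";
    }

    static String runAndJoin() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            CompletableFuture<String> one = CompletableFuture.supplyAsync(() -> slowCall("one", 150), pool);
            CompletableFuture<String> two = CompletableFuture.supplyAsync(() -> slowCall("two", 100), pool);
            CompletableFuture<String> three = CompletableFuture.supplyAsync(() -> slowCall("three", 50), pool);

            // Blocks until all three have completed; returns as soon as the slowest one is done.
            CompletableFuture.allOf(one, two, three).join();

            // All futures are complete here, so these join() calls return immediately.
            return one.join() + ", " + two.join() + ", " + three.join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndJoin());
    }
}
```

With the Future-based methods from the article, calling get() on each Future in turn would give the same effect as the allOf(...).join() call here.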
1) Create an Executor configuration class and add the @EnableAsync annotation:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Created by Jiacheng on 2017/8/28.
 *
 * Executor configuration class
 */
@Configuration
@EnableAsync
public class ExecutorConfig {

    /** Set the ThreadPoolExecutor's core pool size. */
    private int corePoolSize = 10;

    /** Set the ThreadPoolExecutor's maximum pool size. */
    private int maxPoolSize = 200;

    /** Set the capacity for the ThreadPoolExecutor's BlockingQueue. */
    private int queueCapacity = 10;

    @Bean
    public Executor mySimpleAsync() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setThreadNamePrefix("MySimpleExecutor-");
        executor.initialize();
        return executor;
    }

    @Bean
    public Executor myAsync() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setThreadNamePrefix("MyExecutor-");

        // Rejection policy: how to handle a new task when the queue is full
        // and the pool has already reached its max size.
        // CALLER_RUNS: do not run the task on a new thread; run it on the caller's thread instead.
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}
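Two different Executors are defined here. Both set a thread-name prefix; the second one also replaces the handling that applies to new tasks once the queue is full and the pool has reached its max size.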
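ThreadPoolTaskExecutor is backed by a java.util.concurrent.ThreadPoolExecutor, so the effect of CallerRunsPolicy can be observed with the JDK class alone. The sketch below is only an illustration: it deliberately shrinks the pool to one thread and a queue of one (not the 10/200/10 values used above) so that the third task is guaranteed to be rejected. The class and method names are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class: shows on which thread a rejected task runs under CallerRunsPolicy.
class CallerRunsSketch {

    static String threadThatRanOverflowTask() {
        AtomicInteger counter = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,          // core size 1, max size 1
                new ArrayBlockingQueue<>(1),         // queue capacity 1
                r -> new Thread(r, "MyExecutor-" + counter.incrementAndGet()),
                new ThreadPoolExecutor.CallerRunsPolicy());

        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await(); // keep the worker busy until the demo is over
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        AtomicReference<String> overflowThread = new AtomicReference<>();
        try {
            pool.execute(blocking); // occupies the single worker thread
            pool.execute(blocking); // fills the queue
            // Queue full and pool at max size: this task is rejected, and
            // CallerRunsPolicy runs it right here on the submitting thread.
            pool.execute(() -> overflowThread.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return overflowThread.get();
    }

    public static void main(String[] args) {
        System.out.println("overflow task ran on: " + threadThatRanOverflowTask());
    }
}
```

The returned name is that of the submitting thread, not one with the MyExecutor- prefix. With the JDK's default AbortPolicy, the third execute() call would throw a RejectedExecutionException instead.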
2) Using the custom Executors:
Put the name of the Executor bean (here, the name of its @Bean method above) into the @Async annotation.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;

import java.util.Random;
import java.util.concurrent.Future;

/**
 * Created by Jiacheng on 2017/8/28.
 */
@Component
public class AsyncTask {

    private static Random random = new Random();

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    // Runs on the executor bean named "mySimpleAsync"
    @Async("mySimpleAsync")
    public Future<String> doTask1() throws InterruptedException {
        logger.info("Task1 started.");
        long start = System.currentTimeMillis();
        Thread.sleep(5000);
        long end = System.currentTimeMillis();
        logger.info("Task1 finished, time elapsed: {} ms.", end - start);
        return new AsyncResult<>("Task1 accomplished by Async-mySimpleAsync!");
    }

    // Runs on the executor bean named "myAsync"
    @Async("myAsync")
    public Future<String> doTask2() throws InterruptedException {
        logger.info("Task2 started.");
        long start = System.currentTimeMillis();
        Thread.sleep(3000);
        long end = System.currentTimeMillis();
        logger.info("Task2 finished, time elapsed: {} ms.", end - start);
        return new AsyncResult<>("Task2 accomplished by Async-myAsync!");
    }
}
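3) Test case and results:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
import org.testng.annotations.Test;

import java.util.concurrent.Future;

/**
 * Created by Jiacheng on 2017/8/28.
 *
 * AsyncTask @Test
 */
// Assumes ExecutorConfig and AsyncTask are visible from the test's package.
@ContextConfiguration(classes = {ExecutorConfig.class, AsyncTask.class})
public class TestAsync extends AbstractTestNGSpringContextTests {

    // Named "log" so it does not hide the logger field inherited from the base class
    private final Logger log = LoggerFactory.getLogger(this.getClass());

    // Must be the Spring-managed bean: @Async is ignored on an object created with new
    @Autowired
    private AsyncTask asyncTask;

    @Test
    public void AsyncTaskTest() throws Exception {
        Future<String> task1 = asyncTask.doTask1();
        Future<String> task2 = asyncTask.doTask2();

        while (true) {
            if (task1.isDone() && task2.isDone()) {
                log.info("Task1 result: {}", task1.get());
                log.info("Task2 result: {}", task2.get());
                break;
            }
            Thread.sleep(1000);
        }

        log.info("All tasks finished.");
    }
}

Connected to the target VM, address: '127.0.0.1:54066', transport: 'socket'
[TestNG] Running:
  C:\Users\Jiacheng\.IntelliJIdea2017.1\system\temp-testng-customsuite.xml
16:33:34.926 [main] INFO ncu.jerry.orangeplus.async.AsyncTask - Task1 started.
16:33:39.934 [main] INFO ncu.jerry.orangeplus.async.AsyncTask - Task1 finished, time elapsed: 5000 ms.
16:33:39.941 [main] INFO ncu.jerry.orangeplus.async.AsyncTask - Task2 started.
16:33:42.941 [main] INFO ncu.jerry.orangeplus.async.AsyncTask - Task2 finished, time elapsed: 3000 ms.
16:33:42.941 [main] INFO ncu.jerry.orangeplus.async.TestAsync - Task1 result: Task1 accomplished by Async-mySimpleAsync!
16:33:42.941 [main] INFO ncu.jerry.orangeplus.async.TestAsync - Task2 result: Task2 accomplished by Async-myAsync!
16:33:42.941 [main] INFO ncu.jerry.orangeplus.async.TestAsync - All tasks finished.
Disconnected from the target VM, address: '127.0.0.1:54066', transport: 'socket'
===============================================
Default Suite
Total tests run: 1, Failures: 0, Skips: 0
===============================================

Note that this log comes from a run in which the test created the task object itself with new AsyncTask() rather than using the injected bean. An object created that way is not managed by Spring, so @Async is ignored: both tasks ran on the [main] thread, one after the other (5000 ms, then 3000 ms). With the injected bean, as in the test above, the two tasks should instead run concurrently on their own custom thread pools, and the thread names in the log should start with MySimpleExecutor- and MyExecutor-.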
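By default, @Async is applied through a proxy that Spring wraps around the bean, so it only takes effect on the instance obtained from the container. The following plain-JDK sketch imitates that mechanism with java.lang.reflect.Proxy. It is not Spring's actual implementation, and every name in it (Task, PlainTask, asyncProxy) is hypothetical.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class: a direct call runs on the caller's thread,
// a call through the proxy runs on the executor's thread.
class AsyncProxySketch {

    interface Task {
        Future<String> whoRanMe();
    }

    // The "business" object: reports the name of the thread that executed it.
    static class PlainTask implements Task {
        public Future<String> whoRanMe() {
            return CompletableFuture.completedFuture(Thread.currentThread().getName());
        }
    }

    // Wraps the target so that every call is submitted to the executor
    // instead of running on the caller's thread.
    static Task asyncProxy(Task target, ExecutorService executor) {
        InvocationHandler handler = (proxy, method, args) -> {
            Callable<Object> call = () -> ((Future<?>) method.invoke(target, args)).get();
            return executor.submit(call);
        };
        return (Task) Proxy.newProxyInstance(
                Task.class.getClassLoader(), new Class<?>[] {Task.class}, handler);
    }

    // Returns { thread of the direct call, thread of the proxied call }.
    static String[] directVersusProxied() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "MyExecutor-1"));
        try {
            Task direct = new PlainTask();                         // like "new AsyncTask()"
            Task proxied = asyncProxy(new PlainTask(), executor);  // like the injected bean
            return new String[] {direct.whoRanMe().get(), proxied.whoRanMe().get()};
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = directVersusProxied();
        System.out.println("direct call ran on:  " + names[0]);
        System.out.println("proxied call ran on: " + names[1]);
    }
}
```

The direct call reports the caller's own thread, while the proxied call reports MyExecutor-1. The same reasoning explains why an @Async method called on an object that did not come from the Spring context runs synchronously.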
Source code: