1 先定义线程方法,实现Callable接口
@Slf4j
public class QueryThread implements Callable<List<QueryResult>> {
private QueryParam param;
private QueryService queryService;
public QueryThread(QueryParam param, QueryService queryService) {
this.param = param;
this.queryService = queryService;
}
@Override
public List<QueryResult> call() throws Exception {
log.info("query thread, pageNo: [{}]", param.getPageNo());
long start = System.currentTimeMillis();
List<QueryResult> result = queryService.query(param);
log.info("thread [{}] spend time: [{}]ms", param.getPageNo(), System.currentTimeMillis() - start);
}
}
需要注意的是,我之前将“queryService.query(param)”查询操作放在构造函数里面执行,发现由于传入的queryService是个单例,会导致线程串行执行。
2 创建线程池,加入线程任务
@Slf4j
public class ThreadTest {
@Resource
private QueryService queryService;
public List<QueryResult> query(QueryParam queryParam, int maxPage) {
List<QueryResult> restults = new ArrayList<>();
// 线程任务
List<Callable<List<QueryResult>>> tasks = new ArrayList<>();
for (int i = 1; i <= maxPage; i++) {
// 一定要创建param的深拷贝对象作为传参,否则传入到线程中的对象是同一个,页码就会以最后的赋值为准,所有线程查询同一页数据
// 这里的BeanCopyUtils.convert()方法是自定义的,不用找了o(^-^)o
QueryParam tmp = BeanCopyUtils.convert(queryParam, QueryParam.class);
tmp.setPageNo(i);
Callable<List<QueryResult>> callable = new QueryThread(tmp, queryService);
tasks.add(callable);
}
// 线程池,指定大小
ExecutorService executorService = Executors.newFixedThreadPool(15);
try {
// 启动所有线程任务
List<Future<List<QueryResult>>> futures = executorService.invokeAll(tasks);
// 合并查询结果
if (!futures.isEmpty()) {
for (Future<List<QueryResult>> f : futures) {
List<QueryResult> result = f.get();
if (!result.isEmpty()) {
restults.addAll(result);
}
}
}
} catch (Exception e) {
log.error("query exception:", e);
} finally {
executorService.shutdown();
}
return restults;
}
}
3 总结
Callable定义线程方法,Future用以接收线程执行结果。
总耗时差不多是线程中耗时最多的那个线程花费的时间。