天天看点

【Java】Callable + Future实现多线程查询

1 先定义线程方法,实现Callable接口

@Slf4j
public class QueryThread implements Callable<List<QueryResult>> {

    private QueryParam param;

    private QueryService queryService;

    public QueryThread(QueryParam param, QueryService queryService) {
        this.param = param;
        this.queryService = queryService;
    }

    @Override
    public List<QueryResult> call() throws Exception {
    	log.info("query thread, pageNo: [{}]", param.getPageNo());
        long start = System.currentTimeMillis();
        List<QueryResult> result = queryService.query(param);
        log.info("thread [{}] spend time: [{}]ms", param.getPageNo(), System.currentTimeMillis() - start);
    }
}
           

需要注意的是,我之前将“queryService.query(param)”查询操作放在构造函数里面执行,发现由于传入的queryService是个单例,会导致线程串行执行。

2 创建线程池,加入线程任务

@Slf4j
public class ThreadTest {

    @Resource
    private QueryService queryService;

    public List<QueryResult> query(QueryParam queryParam, int maxPage) {

        List<QueryResult> restults = new ArrayList<>();

        // 线程任务
        List<Callable<List<QueryResult>>> tasks = new ArrayList<>();
        for (int i = 1; i <= maxPage; i++) {
            // 一定要创建param的深拷贝对象作为传参,否则传入到线程中的对象是同一个,页码就会以最后的赋值为准,所有线程查询同一页数据
            // 这里的BeanCopyUtils.convert()方法是自定义的,不用找了o(^-^)o
            QueryParam tmp = BeanCopyUtils.convert(queryParam, QueryParam.class);
            tmp.setPageNo(i);
            Callable<List<QueryResult>> callable = new QueryThread(tmp, queryService);
            tasks.add(callable);
        }

		// 线程池,指定大小
        ExecutorService executorService = Executors.newFixedThreadPool(15);
        try {
            // 启动所有线程任务
            List<Future<List<QueryResult>>> futures = executorService.invokeAll(tasks);
			// 合并查询结果
            if (!futures.isEmpty()) {
                for (Future<List<QueryResult>> f : futures) {
                    List<QueryResult> result = f.get();
                    if (!result.isEmpty()) {
                        restults.addAll(result);
                    }
                }
            }
        } catch (Exception e) {
            log.error("query exception:", e);
        } finally {
            executorService.shutdown();
        }

        return restults;
    }
}
           

3 总结

Callable定义线程方法,Future用以接收线程执行结果。

总耗时差不多是线程中耗时最多的那个线程花费的时间。

继续阅读