今天写了一个业务,用到了spring的多线程组件ThreadPoolTaskExecutor,大体配置为这样的:
<!-- spring线程池-->
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<!-- 线程池维护线程的最少数量 -->
<property name="corePoolSize" value="5" />
<!-- 线程池维护线程所允许的空闲时间,默认为60s -->
<property name="keepAliveSeconds" value="200" />
<!-- 线程池维护线程的最大数量 -->
<property name="maxPoolSize" value="20" />
<!-- 缓存队列最大长度 -->
<property name="queueCapacity" value="20" />
<!-- 对拒绝task的处理策略 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者-->
<property name="rejectedExecutionHandler">
<!-- AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常 -->
<!-- CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,可以有效降低向线程池内添加任务的速度 -->
<!-- DiscardOldestPolicy:抛弃旧的任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
<!-- DiscardPolicy:抛弃当前任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
</property>
<property name="waitForTasksToCompleteOnShutdown" value="true" />
</bean>
复制
当时,我的业务是这样的,用户连续抽奖,抽到奖品列表之后给用户返回(这样提高了QPS),然后将用户抽奖的奖品异步刷入数据库内,在刷入数据库的时候,子线程抛异常了,按道理来讲,不会给用户返回刷入失败后的奖品,这样一般情况下能想到的就是整个事务会回滚,然后让用户再抽一次,那么问题就来了?在子线程抛异常了主线程能回滚吗? 答案是不能,因为主线程拿不到子线程抛的异常信息,spring事务管理的是当前线程下的,并且事务的隔离级别默认是 PROPAGATION_REQUIRED--支持当前事务,假设当前没有事务。就新建一个事务,这涉及到ThreadLocal以及线程私有栈的概念,如果Spring 事务使用InhertableThreadLocal就可以把连接传到子线程,但是为什么Spring不那么干呢?因为这样毫无意义,如果把同一个连接传到子线程,那就是SQL操作会串行执行,那何必还多线程呢,很显然,在另外一个线程下自然会创建一个新的事物,而不是进行事务传播,所以不能够回滚业务
这个时候,我想到了这个类Callable/Future,之前无意中有了解过它的特性,也是作为异步线程调用自己的业务的,特点就是它可以拿到子线程的返回信息
public <T> Future<T> submit(Callable<T> task) {
ExecutorService executor = getThreadPoolExecutor();
try {
return executor.submit(task);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}
复制
所以,使用了这个,它是一种阻塞式的线程,当call的 具体用法如下:
@org.junit.Test
public void test() throws ExecutionException, InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
10,100,10, TimeUnit.SECONDS,
new LinkedBlockingDeque<>()
);
Future future = executor.submit(new Callable() {
@Override
public Object call() throws Exception {
return 1/0;
}
});
System.out.println(future.get());
}
//Console.log
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
复制
因此这种写法可以得到子线程的返回值,在并发编程里,我们要对读写有更清晰的认识。否则,在并发业务中你可能会乱了手脚,导致系统的数据达不到你的目标值。