天天看点

主线程和子线程下的事务不回滚【spring】

今天写了一个业务,用到了spring的多线程组件ThreadPoolTaskExecutor,大体配置为这样的:

<!-- spring线程池-->           
    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <!-- 线程池维护线程的最少数量 -->
        <property name="corePoolSize" value="5" />
        <!-- 线程池维护线程所允许的空闲时间,默认为60s  -->
        <property name="keepAliveSeconds" value="200" />
        <!-- 线程池维护线程的最大数量 -->
        <property name="maxPoolSize" value="20" />
        <!-- 缓存队列最大长度 -->
        <property name="queueCapacity" value="20" />
        <!-- 对拒绝task的处理策略   线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者-->
        <property name="rejectedExecutionHandler">
        <!-- AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常 -->
            <!-- CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,可以有效降低向线程池内添加任务的速度 -->
            <!-- DiscardOldestPolicy:抛弃旧的任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
            <!-- DiscardPolicy:抛弃当前任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
        </property>
        <property name="waitForTasksToCompleteOnShutdown" value="true" />
    </bean>           

复制

当时,我的业务是这样的,用户连续抽奖,抽到奖品列表之后给用户返回(这样提高了QPS),然后将用户抽奖的奖品异步刷入数据库内,在刷入数据库的时候,子线程抛异常了,按道理来讲,不会给用户返回刷入失败后的奖品,这样一般情况下能想到的就是整个事务会回滚,然后让用户再抽一次,那么问题就来了?在子线程抛异常了主线程能回滚吗? 答案是不能,因为主线程拿不到子线程抛的异常信息,spring事务管理的是当前线程下的,并且事务的隔离级别默认是 PROPAGATION_REQUIRED--支持当前事务,假设当前没有事务。就新建一个事务,这涉及到ThreadLocal以及线程私有栈的概念,如果Spring 事务使用InhertableThreadLocal就可以把连接传到子线程,但是为什么Spring不那么干呢?因为这样毫无意义,如果把同一个连接传到子线程,那就是SQL操作会串行执行,那何必还多线程呢,很显然,在另外一个线程下自然会创建一个新的事物,而不是进行事务传播,所以不能够回滚业务

这个时候,我想到了这个类Callable/Future,之前无意中有了解过它的特性,也是作为异步线程调用自己的业务的,特点就是它可以拿到子线程的返回信息

public <T> Future<T> submit(Callable<T> task) {
		ExecutorService executor = getThreadPoolExecutor();
		try {
			return executor.submit(task);
		}
		catch (RejectedExecutionException ex) {
			throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
		}
	}           

复制

所以,使用了这个,它是一种阻塞式的线程,当call的 具体用法如下:

@org.junit.Test
public void test() throws ExecutionException, InterruptedException {
	ThreadPoolExecutor executor = new ThreadPoolExecutor(
			10,100,10, TimeUnit.SECONDS,
			new LinkedBlockingDeque<>()
	);

	Future future = executor.submit(new Callable() {
		@Override
		public Object call() throws Exception {
			return 1/0;
		}
	});
	System.out.println(future.get());
}

//Console.log

java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)           

复制

因此这种写法可以得到子线程的返回值,在并发编程里,我们要对读写有更清晰的认识。否则,在并发业务中你可能会乱了手脚,导致系统的数据达不到你的目标值。