Spring非反應式事務實作原理
Spring基于注解和AOP的聲明式事務(@Transactional)已經是業務開發的常用工具,預設是采用同步的方式基于ThreadLocal(儲存連接配接資訊和會話資訊等)實作,在具體資料庫操作時就使用同一個資料庫連接配接,并手動送出事務,保證資料正确性。
基于反應式的Spring事務有何不同
Spring的反應式實作是基于Reactor架構,該架構對異步程式設計做了高度的抽象化,主動的線程切換隻能通過publishOn/subscribeOn跟換線程池,導緻在同步場景表現出色的ThreadLocal無法滿足全異步化的事務資訊存儲需求。Reactor 3提供了一種叫做Context的資料結構,用來替代Threadlocal。
Context的傳播機制
整體上Context類非常類似一個不可變的Map<Object, Object>,采用CopyOnWrite政策,綁定在每一個訂閱者上。但是,context傳播具體是怎麼實作的呢?有一個簡單的例子:
Flux.just(1, 2, 3)
.flatMap(x -> Mono.subscriberContext()
.map(context -> String.format("%s%d", context.get("msg"), x))
).subscriberContext(context -> context.put("msg", "no."))
.subscribe(System.out::println);
從代碼可以看到是通過subscriberContext方法直接put資料,但是這個Context對象是什麼時候建立的呢?檢視subscriberContext方法的源碼發現方法會建立一個FluxContextStart對象,該對象是InternalFluxOperator的子類,實作了subscribeOrReturn(被訂閱時調用),在其中将上下文的操作應用到訂閱者已有的上下文,而大多數訂閱者初始上下文都是Context.empty()。
final class FluxContextStart<T> extends InternalFluxOperator<T, T> implements Fuseable {
....
@Override
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
Context c = doOnContext.apply(actual.currentContext());
return new ContextStartSubscriber<>(actual, c);
}
....
}
從FluxContextStart的實作可見,Context是由CoreSubscriber的執行個體所持有,是以Context的傳播實際是訂閱者執行個體的傳播。而為了避免因為不同操作導緻的并發問題,對訂閱者的操作都是采用裝飾者模式包裝一個新的執行個體,類似Spark RDD的形式。
public final void subscribe(Subscriber<? super T> actual) {
CorePublisher publisher = Operators.onLastAssembly(this);
CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);
try {
...
publisher.subscribe(subscriber);
}
catch (Throwable e) {
Operators.reportThrowInSubscribe(subscriber, e);
return;
}
}
基于Flux的subscribe方法可見,每次訂閱時都會向上遊釋出者傳遞訂閱者執行個體,是以Context是自底向上傳播。
Spring R2DBC的事務實作
基于對正常聲明式事務的認識,找到TransactionAspectSupport#invokeWithinTransaction方法,這個方法定義了聲明的事務具體要路由到哪個事務管理器執行。自Spring 5.2 M2之後,Spring開始支援反應式事務,在invokeWithinTransaction方法内可以看到如下代碼:
if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
// 從緩存中擷取已經加載的反應式事務管理器
ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
throw new TransactionUsageException(
"Unsupported annotated transaction on suspending function detected: " + method +
". Use TransactionalOperator.transactional extensions instead.");
}
// 根據傳回值類型擷取擴充卡
ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
if (adapter == null) {
throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
method.getReturnType());
}
return new ReactiveTransactionSupport(adapter);
});
// 執行事務
return txSupport.invokeWithinTransaction(
method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
}
再繼續跟進到反應式的txSupport.invokeWithinTransaction裡面,會将相關的事務管理器和事務設定等資訊放入上文說到的Context中。具體的對于R2DBC的資料庫反應式事務而言,其主要調用的是TransactionalOperatorImpl#transactional方法:
@Override
public <T> Mono<T> transactional(Mono<T> mono) {
return TransactionContextManager.currentContext().flatMap(context -> {
Mono<ReactiveTransaction> status = this.transactionManager.getReactiveTransaction(this.transactionDefinition);
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
// Need re-wrapping of ReactiveTransaction until we get hold of the exception
// through usingWhen.
return status.flatMap(it -> Mono.usingWhen(Mono.just(it), ignore -> mono,
this.transactionManager::commit, (res, err) -> Mono.empty(), this.transactionManager::commit)
.onErrorResume(ex -> rollbackOnException(it, ex).then(Mono.error(ex))));
})
.subscriberContext(TransactionContextManager.getOrCreateContext())
.subscriberContext(TransactionContextManager.getOrCreateContextHolder());
}
從代碼可以看見,這裡也是首先從上下文擷取事務資訊,保證整個反應式處理的各個操作符都會用到同樣的資料庫連接配接,并最終實作聲明式事務功能。
總結
Spring實作反應式事務本質上基于Reactor的Context傳播機制,結合原有事務機制改造出來的,是以總結下來就兩個核心點:1、Reactor的Context是一種類似不可變Map,綁定在每個訂閱者自底向上傳播;2、Spring反應式事務通過Reactor的Context在不同線程池共享。