天天看點

了解Spring R2DBC的聲明式事務實作機制Spring非反應式事務實作原理基于反應式的Spring事務有何不同Context的傳播機制總結參考資料

Spring非反應式事務實作原理

Spring基于注解和AOP的聲明式事務(@Transactional)已經是業務開發的常用工具,預設是采用同步的方式基于ThreadLocal(儲存連接配接資訊和會話資訊等)實作,在具體資料庫操作時就使用同一個資料庫連接配接,并手動送出事務,保證資料正确性。

基于反應式的Spring事務有何不同

Spring的反應式實作是基于Reactor架構,該架構對異步程式設計做了高度的抽象化,主動的線程切換隻能通過publishOn/subscribeOn跟換線程池,導緻在同步場景表現出色的ThreadLocal無法滿足全異步化的事務資訊存儲需求。Reactor 3提供了一種叫做Context的資料結構,用來替代Threadlocal。

Context的傳播機制

整體上Context類非常類似一個不可變的Map<Object, Object>,采用CopyOnWrite政策,綁定在每一個訂閱者上。但是,context傳播具體是怎麼實作的呢?有一個簡單的例子:

Flux.just(1, 2, 3)
    .flatMap(x -> Mono.subscriberContext()
        .map(context -> String.format("%s%d", context.get("msg"), x))
    ).subscriberContext(context -> context.put("msg", "no."))
    .subscribe(System.out::println);           

從代碼可以看到是通過subscriberContext方法直接put資料,但是這個Context對象是什麼時候建立的呢?檢視subscriberContext方法的源碼發現方法會建立一個FluxContextStart對象,該對象是InternalFluxOperator的子類,實作了subscribeOrReturn(被訂閱時調用),在其中将上下文的操作應用到訂閱者已有的上下文,而大多數訂閱者初始上下文都是Context.empty()。

final class FluxContextStart<T> extends InternalFluxOperator<T, T> implements Fuseable {
....
    @Override
    public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
        Context c = doOnContext.apply(actual.currentContext());
        return new ContextStartSubscriber<>(actual, c);
    }
....
}           

從FluxContextStart的實作可見,Context是由CoreSubscriber的執行個體所持有,是以Context的傳播實際是訂閱者執行個體的傳播。而為了避免因為不同操作導緻的并發問題,對訂閱者的操作都是采用裝飾者模式包裝一個新的執行個體,類似Spark RDD的形式。

public final void subscribe(Subscriber<? super T> actual) {
    CorePublisher publisher = Operators.onLastAssembly(this);
    CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);

    try {
        ...
        publisher.subscribe(subscriber);
    }
    catch (Throwable e) {
        Operators.reportThrowInSubscribe(subscriber, e);
        return;
    }
}           

基于Flux的subscribe方法可見,每次訂閱時都會向上遊釋出者傳遞訂閱者執行個體,是以Context是自底向上傳播。

Spring R2DBC的事務實作

基于對正常聲明式事務的認識,找到TransactionAspectSupport#invokeWithinTransaction方法,這個方法定義了聲明的事務具體要路由到哪個事務管理器執行。自Spring 5.2 M2之後,Spring開始支援反應式事務,在invokeWithinTransaction方法内可以看到如下代碼:

if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
    // 從緩存中擷取已經加載的反應式事務管理器
    ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
        if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
            throw new TransactionUsageException(
                    "Unsupported annotated transaction on suspending function detected: " + method +
                    ". Use TransactionalOperator.transactional extensions instead.");
        }
        // 根據傳回值類型擷取擴充卡
        ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
        if (adapter == null) {
            throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
                    method.getReturnType());
        }
        return new ReactiveTransactionSupport(adapter);
    });
    // 執行事務
    return txSupport.invokeWithinTransaction(
            method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
}           

再繼續跟進到反應式的txSupport.invokeWithinTransaction裡面,會将相關的事務管理器和事務設定等資訊放入上文說到的Context中。具體的對于R2DBC的資料庫反應式事務而言,其主要調用的是TransactionalOperatorImpl#transactional方法:

@Override
public <T> Mono<T> transactional(Mono<T> mono) {
    return TransactionContextManager.currentContext().flatMap(context -> {
        Mono<ReactiveTransaction> status = this.transactionManager.getReactiveTransaction(this.transactionDefinition);
        // This is an around advice: Invoke the next interceptor in the chain.
        // This will normally result in a target object being invoked.
        // Need re-wrapping of ReactiveTransaction until we get hold of the exception
        // through usingWhen.
        return status.flatMap(it -> Mono.usingWhen(Mono.just(it), ignore -> mono,
                this.transactionManager::commit, (res, err) -> Mono.empty(), this.transactionManager::commit)
                .onErrorResume(ex -> rollbackOnException(it, ex).then(Mono.error(ex))));
    })
    .subscriberContext(TransactionContextManager.getOrCreateContext())
    .subscriberContext(TransactionContextManager.getOrCreateContextHolder());
}               

從代碼可以看見,這裡也是首先從上下文擷取事務資訊,保證整個反應式處理的各個操作符都會用到同樣的資料庫連接配接,并最終實作聲明式事務功能。

總結

Spring實作反應式事務本質上基于Reactor的Context傳播機制,結合原有事務機制改造出來的,是以總結下來就兩個核心點:1、Reactor的Context是一種類似不可變Map,綁定在每個訂閱者自底向上傳播;2、Spring反應式事務通過Reactor的Context在不同線程池共享。

參考資料