天天看點

師兄因為和面試官聊了下Seata的底層原理,進了位元組了

作者:波哥帶你學Java

一、環境準備

1.源碼下載下傳

官方位址:https://seata.io/zh-cn/blog/download.html

師兄因為和面試官聊了下Seata的底層原理,進了位元組了

image20220217165653477.png

通過idea打開seata-1.4.2版本的源碼

師兄因為和面試官聊了下Seata的底層原理,進了位元組了

image20220217165735950.png

2.回顧AT模式

  其實在之前的應用課程中,我們已經用過AT模式,同時也寫過一個小的Demo,那麼這裡其實我們主要要分析的是AT模式官方文檔中的一些内容

  官方文檔:https://seata.io/zh-cn/docs/dev/mode/at-mode.html

2.1寫隔離

  • 一階段本地事務送出前,需要確定先拿到「全局鎖」 。
  • 拿不到「全局鎖」 ,不能送出本地事務。
  • 拿「全局鎖」 的嘗試被限制在一定範圍内,超出範圍将放棄,并復原本地事務,釋放本地鎖。

圖解:

師兄因為和面試官聊了下Seata的底層原理,進了位元組了

image20220221173230146.png

  如果 tx1 的二階段全局復原,則 tx1 需要重新擷取該資料的本地鎖,進行反向補償的更新操作,實作分支的復原。

  此時,如果 tx2 仍在等待該資料的 「全局鎖」,同時持有本地鎖,則 tx1 的分支復原會失敗。分支的復原會一直重試,直到 tx2 的 「全局鎖」 等鎖逾時,放棄 「全局鎖」 并復原本地事務釋放本地鎖,tx1 的分支復原最終成功。

  因為整個過程 「全局鎖」 在 tx1 結束前一直是被 tx1 持有的,是以不會發生 「髒寫」 的問題。

2.2 讀隔離

  在資料庫本地事務隔離級别 「讀已送出(Read Committed)」 或以上的基礎上,Seata(AT 模式)的預設全局隔離級别是 「讀未送出(Read Uncommitted)」 。

  如果應用在特定場景下,必須要求全局的 「讀已送出」 ,目前 Seata 的方式是通過 SELECT FOR UPDATE 語句的代理。

師兄因為和面試官聊了下Seata的底層原理,進了位元組了

image20220221175333911.png

  SELECT FOR UPDATE 語句的執行會申請 「全局鎖」 ,如果 「全局鎖」 被其他事務持有,則釋放本地鎖(復原 SELECT FOR UPDATE 語句的本地執行)并重試。這個過程中,查詢是被 block 住的,直到 「全局鎖」 拿到,即讀取的相關資料是 「已送出」 的,才傳回。

  出于總體性能上的考慮,Seata 目前的方案并沒有對所有 SELECT 語句都進行代理,僅針對 FOR UPDATE 的 SELECT 語句。

2.3 AT二階段

一階段:

  1. 解析 SQL:得到 SQL 的類型(UPDATE),表(product),條件(where name = 'TXC')等相關的資訊。
  2. 查詢前鏡像(改變之前的資料):根據解析得到的條件資訊,生成查詢語句,定位資料。
  3. 執行業務 SQL:更新這條資料。
  4. 查詢後鏡像(改變後的資料):根據前鏡像的結果,通過「主鍵」 定位資料。
  5. 插入復原日志:把前後鏡像資料以及業務 SQL 相關的資訊組成一條復原日志記錄,插入到UNDO_LOG 表中。
  6. 送出前,向 TC 注冊分支:申請「全局鎖」 。
  7. 本地事務送出:業務資料的更新和前面步驟中生成的 UNDO LOG 一并送出。
  8. 将本地事務送出的結果上報給 TC。

二階段-復原:

  1. 收到 TC 的分支復原請求,開啟一個本地事務,執行如下操作。
  2. 通過 XID 和 Branch ID 查找到相應的 UNDO LOG 記錄。
  3. 根據 UNDO LOG 中的前鏡像和業務 SQL 的相關資訊生成并執行復原的語句:
  4. 送出本地事務。并把本地事務的執行結果(即分支事務復原的結果)上報給 TC。

二階段-送出:

  1. 收到 TC 的分支送出請求,把請求放入一個異步任務的隊列中,馬上傳回送出成功的結果給 TC。
  2. 異步任務階段的分支送出請求将異步和批量地删除相應 UNDO LOG 記錄。

二、Seata源碼剖析

1. Seata用戶端啟動

首先一個Seata的用戶端啟動一般分為幾個流程:

  1. 自動加載各種Bean及配置資訊
  2. 初始化TM
  3. 初始化RM(具體服務)
  4. 初始化分布式事務用戶端完成,代理資料源
  5. 連接配接TC(Seata服務端),注冊RM,注冊TM
  6. 開啟全局事務

  在這個其中,就會涉及到幾個核心的類型,首先我們需要來看配置類型GlobalTransactionAutoConfiguration

  是以我們直接通過官方案例引入的Seata包,找到SpringBoot項目在啟動的時候自動掃描加載類型的spring.factories,然後找到GlobalTransactionAutoConfiguration(Seata自動配置類)

2.全局事務掃描類源碼

  這個類型的核心點,就是加載配置,注入相關的Bean

/**
 * seata自動配置類
 */
@Configuration
@EnableConfigurationProperties(SeataProperties.class)
public class GlobalTransactionAutoConfiguration {

 private final ApplicationContext applicationContext;

 private final SeataProperties seataProperties;

 public GlobalTransactionAutoConfiguration(ApplicationContext applicationContext,
   SeataProperties seataProperties) {
  this.applicationContext = applicationContext;
  this.seataProperties = seataProperties;
 }
 // 注入全局事務掃描器
 @Bean
 public GlobalTransactionScanner globalTransactionScanner() {

  String applicationName = applicationContext.getEnvironment()
    .getProperty("spring.application.name");

  String txServiceGroup = seataProperties.getTxServiceGroup();

  if (StringUtils.isEmpty(txServiceGroup)) {
   txServiceGroup = applicationName + "-fescar-service-group";
   seataProperties.setTxServiceGroup(txServiceGroup);
  }
  // 建構全局掃描器,傳入參數:應用名、事務分組名,失敗處理器
  return new GlobalTransactionScanner(applicationName, txServiceGroup);
 }
}

           

3.GlobalTransactionScanner全局事務掃描器

  在這其中我們要關心的是GlobalTransactionScanner這個類型,這個類型掃描@GlobalTransactional注解,并對代理方法進行攔截增強事務的功能。

師兄因為和面試官聊了下Seata的底層原理,進了位元組了

image20220221231318290.png

  這裡給大家展示了目前GlobalTransactionScanner的類關系圖,其中我們現在繼承了Aop的AbstractAutoProxyCreator類型,在這其中有一個重點方法,其實這個方法就是判斷Bean對象是否需要代理,是否需要增強

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    if (beanName != null && this.targetSourcedBeans.contains(beanName)) {
        return bean;
    }
    if (Boolean.FALSE.equals(this.advisedBeans.get(cacheKey))) {
        return bean;
    }
    if (isInfrastructureClass(bean.getClass()) || shouldSkip(bean.getClass(), beanName)) {
        this.advisedBeans.put(cacheKey, Boolean.FALSE);
        return bean;
    }

    // Create proxy if we have advice.
    Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
    if (specificInterceptors != DO_NOT_PROXY) {
        this.advisedBeans.put(cacheKey, Boolean.TRUE);
        Object proxy = createProxy(
            bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
        this.proxyTypes.put(cacheKey, proxy.getClass());
        return proxy;
    }

    this.advisedBeans.put(cacheKey, Boolean.FALSE);
    return bean;
}
           

  當然這是父類提供的方法,那子類繼承之後重寫此方法,完成了定制化的效果

@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    try {
        // 加鎖防止并發
        synchronized (PROXYED_SET) {
            if (PROXYED_SET.contains(beanName)) {
                return bean;
            }
            interceptor = null;
            //check TCC proxy
            // 檢查是否是TCC模式
            if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                // 如果是:添加TCC攔截器
                interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                                                     (ConfigurationChangeListener)interceptor);
            } else {
                // 不是TCC模式
                Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

                // 判斷是否有相關事務注解,如果沒有就不代理
                if (!existsAnnotation(new Class[]{serviceInterface})
                    && !existsAnnotation(interfacesIfJdk)) {
                    return bean;
                }

                // 當發現存在全局事務注解标注的Bean,添加攔截器
                if (globalTransactionalInterceptor == null) {
                    // 添加攔截器
                    globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                    ConfigurationCache.addConfigListener(
                        ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                        (ConfigurationChangeListener)globalTransactionalInterceptor);
                }
                interceptor = globalTransactionalInterceptor;
            }

            LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
            // 檢查是否是代理對象
            if (!AopUtils.isAopProxy(bean)) {
                // 不是調用Spring代理(父級)
                bean = super.wrapIfNecessary(bean, beanName, cacheKey);
            } else {
                // 已經是代理對象,反射擷取代理類中的已經存在的攔截器組合,然後添加到該集合當中
                AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                for (Advisor avr : advisor) {
                    advised.addAdvisor(0, avr);
                }
            }
            // 将Bean添加到Set中
            PROXYED_SET.add(beanName);
            return bean;
        }
    } catch (Exception exx) {
        throw new RuntimeException(exx);
    }
}
           

  圖解位址:https://www.processon.com/view/link/6213d58f1e0853078013c58f

三、Seata源碼分析-2PC核心源碼解讀

1.2PC送出源碼流程

  上節課我們分析到了GlobalTransactionalInterceptor全局事務攔截器,一旦執行攔截器,我們就會進入到其中的invoke方法,在這其中會做一些@GlobalTransactional注解的判斷,如果有注解以後,會執行全局事務和本地事務,那麼在執行全局事務的時候會調用handleGlobalTransaction全局事務處理器,這裡主要是擷取事務資訊

Object handleGlobalTransaction(final MethodInvocation methodInvocation,
                               final GlobalTransactional globalTrxAnno) throws Throwable {
    boolean succeed = true;
    try {
        return transactionalTemplate.execute(new TransactionalExecutor() {
            @Override
            public Object execute() throws Throwable {
                return methodInvocation.proceed();
            }

            // 擷取事務名稱,預設擷取方法名
            public String name() {
                String name = globalTrxAnno.name();
                if (!StringUtils.isNullOrEmpty(name)) {
                    return name;
                }
                return formatMethod(methodInvocation.getMethod());
            }

            /**
                 * 解析GlobalTransactional注解屬性,封裝為對象
                 * @return
                 */
            @Override
            public TransactionInfo getTransactionInfo() {
                // reset the value of timeout
                // 擷取逾時時間,預設60秒
                int timeout = globalTrxAnno.timeoutMills();
                if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
                    timeout = defaultGlobalTransactionTimeout;
                }

                // 建構事務資訊對象
                TransactionInfo transactionInfo = new TransactionInfo();
                transactionInfo.setTimeOut(timeout);// 逾時時間
                transactionInfo.setName(name()); // 事務名稱
                transactionInfo.setPropagation(globalTrxAnno.propagation());// 事務傳播
                transactionInfo.setLockRetryInternal(globalTrxAnno.lockRetryInternal());// 校驗或占用全局鎖重試間隔
                transactionInfo.setLockRetryTimes(globalTrxAnno.lockRetryTimes());// 校驗或占用全局鎖重試次數
                Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                // 其他建構資訊
                for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
                    rollbackRules.add(new RollbackRule(rbRule));
                }
                for (String rbRule : globalTrxAnno.rollbackForClassName()) {
                    rollbackRules.add(new RollbackRule(rbRule));
                }
                for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
                    rollbackRules.add(new NoRollbackRule(rbRule));
                }
                for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
                    rollbackRules.add(new NoRollbackRule(rbRule));
                }
                transactionInfo.setRollbackRules(rollbackRules);
                return transactionInfo;
            }
        });
    } catch (TransactionalExecutor.ExecutionException e) {
        // 執行異常
        TransactionalExecutor.Code code = e.getCode();
        switch (code) {
            case RollbackDone:
                throw e.getOriginalException();
            case BeginFailure:
                succeed = false;
                failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                throw e.getCause();
            case CommitFailure:
                succeed = false;
                failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                throw e.getCause();
            case RollbackFailure:
                failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
                throw e.getOriginalException();
            case RollbackRetrying:
                failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
                throw e.getOriginalException();
            default:
                throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
        }
    } finally {
        if (degradeCheck) {
            EVENT_BUS.post(new DegradeCheckEvent(succeed));
        }
    }
}
           

  在這其中,我們要關注一個重點方法execute()

其實這個方法主要的作用就是,執行事務的流程,大概一下幾點:

  1. 擷取事務資訊
  2. 開始執行全局事務
  3. 發生異常全局復原,各個資料通過undo_log表進行事務補償
  4. 全局事務送出
  5. 清除所有資源

  這個位置是非常核心的一個位置,因為我們所有的業務進來以後都會走這個位置。

  這其中的第三步和第四步就是在想TC(Seata-Server)發起全局事務的送出/復原

public Object execute(TransactionalExecutor business) throws Throwable {
    // 1. Get transactionInfo
    // 擷取事務資訊
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
    // 擷取目前事務,主要擷取Xid
    GlobalTransaction tx = GlobalTransactionContext.getCurrent();

    // 1.2 Handle the transaction propagation.
    // 根據配置的不同僚務傳播行為,執行不同的邏輯
    Propagation propagation = txInfo.getPropagation();
    SuspendedResourcesHolder suspendedResourcesHolder = null;
    try {
        switch (propagation) {
            case NOT_SUPPORTED:
                // If transaction is existing, suspend it.
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend();
                }
                // Execute without transaction and return.
                return business.execute();
            case REQUIRES_NEW:
                // If transaction is existing, suspend it, and then begin new transaction.
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend();
                    tx = GlobalTransactionContext.createNew();
                }
                // Continue and execute with new transaction
                break;
            case SUPPORTS:
                // If transaction is not existing, execute without transaction.
                if (notExistingTransaction(tx)) {
                    return business.execute();
                }
                // Continue and execute with new transaction
                break;
            case REQUIRED:
                // If current transaction is existing, execute with current transaction,
                // else continue and execute with new transaction.
                break;
            case NEVER:
                // If transaction is existing, throw exception.
                if (existingTransaction(tx)) {
                    throw new TransactionException(
                        String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                                      , tx.getXid()));
                } else {
                    // Execute without transaction and return.
                    return business.execute();
                }
            case MANDATORY:
                // If transaction is not existing, throw exception.
                if (notExistingTransaction(tx)) {
                    throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                }
                // Continue and execute with current transaction.
                break;
            default:
                throw new TransactionException("Not Supported Propagation:" + propagation);
        }

        // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
        // 目前沒有事務,則建立一個新的事務
        if (tx == null) {
            tx = GlobalTransactionContext.createNew();
        }

        // set current tx config to holder
        GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

        try {
            // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
            //    else do nothing. Of course, the hooks will still be triggered.
            // 開始執行全局事務
            beginTransaction(txInfo, tx);

            Object rs;
            try {
                // Do Your Business
                // 執行目前業務邏輯:
                // 1. 在TC注冊目前分支事務,TC會在branch_table中插入一條分支事務資料
                // 2. 執行本地update語句,并在執行前後查詢資料狀态,并把資料前後鏡像存入到undo_log表中
                // 3. 遠端調用其他應用,遠端應用接收到xid,也會注冊分支事務,寫入branch_table及本地undo_log表
                // 4. 會在lock_table表中插入全局鎖資料(一個分支一條)
                rs = business.execute();
            } catch (Throwable ex) {
                // 3. The needed business exception to rollback.
                // 發生異常全局復原,各個資料通過undo_log表進行事務補償
                completeTransactionAfterThrowing(txInfo, tx, ex);
                throw ex;
            }

            // 4. everything is fine, commit.
            // 全局送出事務
            commitTransaction(tx);

            return rs;
        } finally {
            //5. clear
            // 清除所有資源
            resumeGlobalLockConfig(previousConfig);
            triggerAfterCompletion();
            cleanUp();
        }
    } finally {
        // If the transaction is suspended, resume it.
        if (suspendedResourcesHolder != null) {
            tx.resume(suspendedResourcesHolder);
        }
    }
}
           

2.如何發起全局事務

  這個位置我們就看目前這個代碼中的 beginTransaction(txInfo, tx);方法

// 想TC發起請求,這裡采用了模闆模式
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
    try {
        triggerBeforeBegin();
        // 對TC發起請求
        tx.begin(txInfo.getTimeOut(), txInfo.getName());
        triggerAfterBegin();
    } catch (TransactionException txe) {
        throw new TransactionalExecutor.ExecutionException(tx, txe,
                                                           TransactionalExecutor.Code.BeginFailure);
    }
}   
           

  那我們向下來看begin方法,那要注意,這裡調用begin方法的是DefaultGlobalTransaction

@Override
public void begin(int timeout, String name) throws TransactionException {
    //判斷調用者是否是TM
    if (role != GlobalTransactionRole.Launcher) {
        assertXIDNotNull();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
        }
        return;
    }
    assertXIDNull();
    String currentXid = RootContext.getXID();
    if (currentXid != null) {
        throw new IllegalStateException("Global transaction already exists," +
                                        " can't begin a new global transaction, currentXid = " + currentXid);
    }
    // 擷取Xid
    xid = transactionManager.begin(null, null, name, timeout);
    status = GlobalStatus.Begin;
    RootContext.bind(xid);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Begin new global transaction [{}]", xid);
    }
}
           

  在向下來看begin方法,這時候使用的是(預設事務管理者)DefaultTransactionManager.begin,來真正的擷取xid,其中就是傳入事務的相關資訊,最終TC端傳回對應的全局事務Xid。

@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    GlobalBeginRequest request = new GlobalBeginRequest();
    request.setTransactionName(name);
    request.setTimeout(timeout);
    // 發送請求得到響應
    GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
    if (response.getResultCode() == ResultCode.Failed) {
        throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
    }
    //傳回Xid
    return response.getXid();
}
           

  這裡采用的是Netty的通訊方式

private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
    try {
        // 通過Netty發送請求
        return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
    } catch (TimeoutException toe) {
        throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
    }
}
           

四、Seata源碼分析-資料源代理

  上節課我們分析了整體的Seata-AT模式的2PC執行流程,那麼這節課我們要分析的就是在AT模式中的關鍵點,資料源代理

1.AT模式的核心點:

  1. 擷取全局鎖、開啟全局事務
  2. 解析SQL并寫入undolog

  那麼上節課其實我們已經把第一步分析清楚了,那麼這節課我們就要分析的是AT模式如何解析SQL并寫入undolog,首先我們要先明确實際上Seata其中采用了資料源代理的模式。

  那麼這個就需要我們在回顧一下GlobalTransactionScanner這個類型,在這個類型中繼承了一些的接口和抽象類,比較關鍵的幾個:

  • AbstractAutoProxyCreator
  • InitializingBean
  • ApplicationContextAware
  • DisposableBean

這裡給大家回顧一下:

  1. 繼承ApplicationContextAware類型以後,需要實作對應的方法: void setApplicationContext(ApplicationContext applicationContext) throws BeansException 當spring啟動完成後,會自動調用這個類型,把ApplicationContext給bean。也就是說,GlobalTransactionScanner天然能拿到Spring的環境。
  2. 繼承了InitializingBean接口,需要實作一個方法: void afterPropertiesSet() throws Exception; 凡是繼承該接口的類,在初始化bean的時候,當所有properties都設定完成後,會執行該方法。
  3. 繼承DisposableBean,需要實作一個方法: void destroy() throws Exception; 和InitializingBean接口相反,這個是在銷毀的時候會調用這個方法。
  4. AbstractAutoProxyCreator就比較複雜了,它Spring實作AOP的一種方式。本質上是一個BeanPostProcessor,他在bean初始化之前,調用内部的createProxy方法,建立一個bean的AOP代理bean并傳回,對Bean的增強。

總結一下:總體的邏輯就是,GlobalTransactionScanner掃描有注解的bean,做AOP增強。

2.資料源代理

  關于資料源代理這裡我們

  全局事務攔截成功後最終還是執行了業務方法的,但是由于Seata對資料源做了代理,是以sql解析與undolog入庫操作是在資料源代理中執行的,箭頭處的代理就是Seata對DataSource,Connection,Statement做的代理封裝類

師兄因為和面試官聊了下Seata的底層原理,進了位元組了

image20220226142501746.png

  資料源代理是非常重要的一個環節。我們知道,在分布式事務運作過程中,undo log等的記錄、資源的鎖定等,都是使用者無感覺的,因為這些操作都在資料源的代理中完成了。

3.資料源代理DataSourceProxy

  DataSourceProxy的主要功能為,它在構造方法中調用了一個自定義的init方法,主要做了以下能力的增強:

  1. 為每個資料源辨別了資源組ID
  2. 如果配置打開,會有一個定時線程池定時更新表的中繼資料資訊并緩存到本地
  3. 生成代理連接配接ConnectionProxy

那我們先來看init方法

private void init(DataSource dataSource, String resourceGroupId) {
    //資源組ID,預設是“default”這個預設值
    this.resourceGroupId = resourceGroupId;
    try (Connection connection = dataSource.getConnection()) {
        //根據原始資料源得到JDBC連接配接和資料庫類型
        jdbcUrl = connection.getMetaData().getURL();
        dbType = JdbcUtils.getDbType(jdbcUrl);
        if (JdbcConstants.ORACLE.equals(dbType)) {
            userName = connection.getMetaData().getUserName();
        }
    } catch (SQLException e) {
        throw new IllegalStateException("can not init dataSource", e);
    }
    DefaultResourceManager.get().registerResource(this);
    if (ENABLE_TABLE_META_CHECKER_ENABLE) {
        //如果配置開關打開,會定時線程池不斷更新表的中繼資料資訊
        /**
        *每分鐘查詢一次資料源的表結構資訊并緩存,在需要查詢資料庫結構時會用到,不然每次去資料庫查詢結構效率會很低。
        */
        tableMetaExcutor.scheduleAtFixedRate(() -> {
            try (Connection connection = dataSource.getConnection()) {
                TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType())
                    .refresh(connection, DataSourceProxy.this.getResourceId());
            } catch (Exception ignore) {
            }
        }, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);
    }

    //Set the default branch type to 'AT' in the RootContext.
    RootContext.setDefaultBranchType(this.getBranchType());
}
           

  這3個增強裡面,前兩個都比較容易了解,第三是最重要的。我們知道在AT模式裡面,會自動記錄undo log、資源鎖定等等,都是通過ConnectionProxy完成的。

  另外,DataSourceProxy重寫了幾個方法。

  重點是getConnection,此時會傳回一個ConnectionProxy,而不是原生的Connection

@Override
public ConnectionProxy getConnection() throws SQLException {
    Connection targetConnection = targetDataSource.getConnection();
    return new ConnectionProxy(this, targetConnection);
}

@Override
public ConnectionProxy getConnection(String username, String password) throws SQLException {
    Connection targetConnection = targetDataSource.getConnection(username, password);
    return new ConnectionProxy(this, targetConnection);
}
           

4.ConnectionProxy分析

  ConnectionProxy繼承了AbstractConnectionProxy。一看到Abstract,就知道它的父類封裝了很多通用工作。它的父類裡面還使用了PreparedStatementProxy、StatementProxy、DataSourceProxy。

師兄因為和面試官聊了下Seata的底層原理,進了位元組了

image20220226172114629.png

  我們先來分析AbstractConnectionProxy

5.AbstractConnectionProxy

  在這個抽象連接配接對象中,定義了很多通用的邏輯,是以在這其中我們要關注的主要在于PreparedStatementProxy和StatementProxy,其實這裡的通用邏輯就是資料源連接配接的步驟,擷取連接配接,建立執行對象等等這些

@Override
public Statement createStatement() throws SQLException {
    //調用真實連接配接對象獲得Statement對象
    Statement targetStatement = getTargetConnection().createStatement();
    //建立Statement的代理
    return new StatementProxy(this, targetStatement);
}

@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
    //資料庫類型,比如mysql、oracle等
    String dbType = getDbType();
    // support oracle 10.2+
    PreparedStatement targetPreparedStatement = null;
    //如果是AT模式且開啟全局事務,那麼就會進入if分支
    if (BranchType.AT == RootContext.getBranchType()) {
        List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);
        if (sqlRecognizers != null && sqlRecognizers.size() == 1) {
            SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
            if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {
                //得到表的中繼資料
                TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),
                                                                                                   sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId());
                //得到表的主鍵列名
                String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];
                tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);
                targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray);
            }
        }
    }
    if (targetPreparedStatement == null) {
        targetPreparedStatement = getTargetConnection().prepareStatement(sql);
    }
    // 建立PreparedStatementProxy代理
    return new PreparedStatementProxy(this, targetPreparedStatement, sql);
}
           

6.分布式事務SQL執行

  在這兩個代理對象中,執行SQL語句的關鍵方法如下:

@Override
public ResultSet executeQuery(String sql) throws SQLException {
    this.targetSQL = sql;
    return ExecuteTemplate.execute(this, (statement, args) -> statement.executeQuery((String) args[0]), sql);
}

@Override
public int executeUpdate(String sql) throws SQLException {
    this.targetSQL = sql;
    return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate((String) args[0]), sql);
}

@Override
public boolean execute(String sql) throws SQLException {
    this.targetSQL = sql;
    return ExecuteTemplate.execute(this, (statement, args) -> statement.execute((String) args[0]), sql);
}
           

  其他執行SQL語句的方法與上面三個方法都是類似的,都是調用ExecuteTemplate.execute方法,未完待續....

繼續閱讀