天天看點

Seata Transaction Coordinator

引言

前面,我們已經介紹了 Seata 的整體設計思想,接下來我們深入到其實作細節中,本文介紹 Seata 中最核心的子產品 Transaction Coordinator 的實作,其他 Seata 相關文章均收錄于

<Seata系列文章>

中。

TC

Seata Transaction Coordinator

Transaction Coordinator 整體的子產品圖如上所示:

  • Coordinator Core: 在最下面的子產品是事務協調器核心代碼,主要用來處理事務協調的邏輯,如分支的注冊, commit, rollback 等協調活動。
  • Store: 存儲子產品,用來将我們的資料持久化,防止重新開機或者當機資料丢失。
  • Discover: 服務注冊/發現子產品,用于将 Server 位址暴露給我們 Client。
  • Config: 用來存儲和查找我們服務端的配置。
  • Lock: 鎖子產品,用于給 Seata 提供全局鎖的功能。
  • Rpc: 用于和其他端通信。
  • HA-Cluster: 高可用叢集,目前還沒開源。為 Seata 提供可靠的高可用功能。

Discover

首先來講講比較基礎的 Discover 子產品,又稱服務注冊/發現子產品。我們将 TC 啟動之後,需要将自己的位址暴露給其他使用者 TM & RM, 這部分工作就是由 Discover 子產品實作的。

public interface RegistryService<T> {

    /**
     * The constant PREFIX_SERVICE_MAPPING.
     */
    String PREFIX_SERVICE_MAPPING = "vgroup_mapping.";
    /**
     * The constant PREFIX_SERVICE_ROOT.
     */
    String PREFIX_SERVICE_ROOT = "service";
    /**
     * The constant CONFIG_SPLIT_CHAR.
     */
    String CONFIG_SPLIT_CHAR = ".";

    /**
     * Register.
     *
     * @param address the address
     * @throws Exception the exception
     */
    void register(InetSocketAddress address) throws Exception;

    /**
     * Unregister.
     *
     * @param address the address
     * @throws Exception the exception
     */
    void unregister(InetSocketAddress address) throws Exception;

    /**
     * Subscribe.
     *
     * @param cluster  the cluster
     * @param listener the listener
     * @throws Exception the exception
     */
    void subscribe(String cluster, T listener) throws Exception;

    /**
     * Unsubscribe.
     *
     * @param cluster  the cluster
     * @param listener the listener
     * @throws Exception the exception
     */
    void unsubscribe(String cluster, T listener) throws Exception;

    /**
     * Lookup list.
     *
     * @param key the key
     * @return the list
     * @throws Exception the exception
     */
    List<InetSocketAddress> lookup(String key) throws Exception;

    /**
     * Close.
     * @throws Exception
     */
    void close() throws Exception;
}           

這個子產品有個核心接口 RegistryService,如上圖所示:

  • register:TC 使用,進行服務注冊。
  • unregister:TC 使用,一般在JVM關閉鈎子,ShutdownHook中調用。
  • subscribe:TM RM 使用,注冊監聽事件,用來監聽位址的變化。
  • unsubscribe:TM RM 使用,取消注冊監聽事件, 一般在JVM關閉鈎子,ShutdownHook中調用。
  • lookup:TM RM使用,根據key查找服務位址清單。
  • close:都可以使用,用于關閉Register資源。

如果需要添加自己定義的服務注冊/發現,那麼實作這個接口即可。截止目前在社群的不斷開發推動下,已經有七種服務注冊/發現,分别是consul,etcd3,sofa,redis, zk, nacos, eruka。下面簡單介紹下 redis 的實作:

register

@Override
public void register(InetSocketAddress address) {
    // 校驗位址是否合法
    NetUtil.validAddress(address);
    String serverAddr = NetUtil.toStringAddress(address);
    // 擷取 Redis 的執行個體
    Jedis jedis = jedisPool.getResource();
    try {
        // 将位址注冊到目前 Redis 上面。
        jedis.hset(getRedisRegistryKey(), serverAddr, ManagementFactory.getRuntimeMXBean().getName());
        // 發送注冊成功的通知
        jedis.publish(getRedisRegistryKey(), serverAddr + "-" + RedisListener.REGISTER);
    } finally {
        jedis.close();
    }
}           

流程如下:

  1. 校驗位址是否合法
  2. 擷取 Redis 的執行個體,然後将位址注冊到目前 Redis 上面。
  3. 發送注冊成功的通知

unregister接口類似,就是反方向操作, 這裡不做詳解。

lookup

@Override
public List<InetSocketAddress> lookup(String key) {
    Configuration config = ConfigurationFactory.getInstance();
    // 擷取目前 clusterName 名字
    String clusterName = config.getConfig(PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key);
    if (null == clusterName) {
        return null;
    }
    // 判斷目前 cluster 是否已經擷取過了,如果擷取過就從map中取
    if (!LISTENER_SERVICE_MAP.containsKey(clusterName)) {
        Jedis jedis = jedisPool.getResource();
        Map<String, String> instances = null;
        // 從 Redis 拿到位址資料,将其轉換成我們所需要的
        try {
            instances = jedis.hgetAll(getRedisRegistryKey());
        } finally {
            jedis.close();
        }
        if (null != instances && !instances.isEmpty()) {
            Set<InetSocketAddress> newAddressSet = new HashSet<>();
            for (Map.Entry<String, String> instance : instances.entrySet()) {
                String serverAddr = instance.getKey();
                newAddressSet.add(NetUtil.toInetSocketAddress(serverAddr));
            }
            CLUSTER_ADDRESS_MAP.put(clusterName, newAddressSet);
        }
        // 将資料變更的 Listener 注冊到 Redis
        subscribe(clusterName, new RedisListener() {
            @Override
            public void onEvent(String msg) {
                String[] msgr = msg.split("-");
                String serverAddr = msgr[0];
                String eventType = msgr[1];
                switch (eventType) {
                    case RedisListener.REGISTER:
                        CLUSTER_ADDRESS_MAP.get(clusterName).add(NetUtil.toInetSocketAddress(serverAddr));
                        break;
                    case RedisListener.UN_REGISTER:
                        CLUSTER_ADDRESS_MAP.get(clusterName).remove(NetUtil.toInetSocketAddress(serverAddr));
                        break;
                    default:
                        throw new ShouldNeverHappenException("unknown redis msg:" + msg);
                }
            }
        });
    }
    return new ArrayList<>(CLUSTER_ADDRESS_MAP.get(clusterName));
}           

訂閱的過程如下:

  1. 擷取目前 clusterName 名字
  2. 判斷目前 cluster 是否已經擷取過了,如果擷取過就從map中取。
  3. 從 Redis 拿到位址資料,将其轉換成我們所需要的資料。
  4. 将資料變動的 Listener 注冊到 Redis

其實這裡面有個問題, 如果擷取了伺服器清單, 但是還沒來得及注冊訂閱時, 發生了伺服器清單變化, 那麼用戶端會感覺不到, 但是這個問題在 Redis 中确實沒有什麼好的辦法解決, 畢竟 Redis 沒有提供機制來解決這個問題。但是 etcd3 中是有機制來解決的, 擷取資料時能拿到當時的版本号, 然後訂閱時從該版本号開始即可。然後我看了一下基于 etcd3 的

RegistryService

實作, 發現它并沒有使用該機制。于是我提了一個

issue

PR

, 感興趣的同學可以去看一下。

subscribe

@Override
public void subscribe(String cluster, RedisListener listener) {
    // 存儲該 listener
    String redisRegistryKey = REDIS_FILEKEY_PREFIX + cluster;
    LISTENER_SERVICE_MAP.putIfAbsent(cluster, new ArrayList<>());
    LISTENER_SERVICE_MAP.get(cluster).add(listener);
    threadPoolExecutor.submit(new Runnable() {
        @Override
        public void run() {
            try {
                Jedis jedis = jedisPool.getResource();
                try {
                    // 向 Redis 注冊
                    jedis.subscribe(new NotifySub(LISTENER_SERVICE_MAP.get(cluster)), redisRegistryKey);
                } finally {
                    jedis.close();
                }
            } catch (Exception e) {
                LOGGER.error(e.getMessage(), e);
            }
        }
    });
}           
  1. 存儲該 listener
  2. 向 Redis 注冊

RegistryService

的主要功能就這些了, TM 和 TC 是通過 lookup 找到伺服器清單之後, 會根據設定的負載均衡政策請求 TC, 接下來我們看一看 loadbalance。

loadbalance
public interface LoadBalance {

    /**
     * Select t.
     *
     * @param <T>      the type parameter
     * @param invokers the invokers
     * @return the t
     * @throws Exception the exception
     */
    <T> T select(List<T> invokers) throws Exception;
}           

這個接口的實作比較簡單, 目前就隻有随機和輪訓。

Config

配置子產品也是一個比較基礎,比較簡單的子產品。我們需要配置一些常用的參數比如:Netty的select線程數量,work線程數量,session允許最大為多少等等,當然這些參數在 Seata 中都有自己的預設設定。

同樣的在 Seata 中也提供了一個接口 Configuration,通過它來存取配置内容:

public interface Configuration<T> {
    // 這裡隻保留了 getShort 其他都類似
    /**
     * Gets short.
     *
     * @param dataId       the data id
     * @param defaultValue the default value
     * @param timeoutMills the timeout mills
     * @return the short
     */
    short getShort(String dataId, int defaultValue, long timeoutMills);

    /**
     * Gets short.
     *
     * @param dataId       the data id
     * @param defaultValue the default value
     * @return the int
     */
    short getShort(String dataId, short defaultValue);

    /**
     * Gets short.
     *
     * @param dataId the data id
     * @return the int
     */
    short getShort(String dataId);

    /**
     * Gets config.
     *
     * @param dataId       the data id
     * @param defaultValue the default value
     * @param timeoutMills the timeout mills
     * @return the config
     */
    String getConfig(String dataId, String defaultValue, long timeoutMills);

    /**
     * Gets config.
     *
     * @param dataId       the data id
     * @param defaultValue the default value
     * @return the config
     */
    String getConfig(String dataId, String defaultValue);

    /**
     * Gets config.
     *
     * @param dataId       the data id
     * @param timeoutMills the timeout mills
     * @return the config
     */
    String getConfig(String dataId, long timeoutMills);

    /**
     * Gets config.
     *
     * @param dataId the data id
     * @return the config
     */
    String getConfig(String dataId);

    /**
     * Put config boolean.
     *
     * @param dataId       the data id
     * @param content      the content
     * @param timeoutMills the timeout mills
     * @return the boolean
     */
    boolean putConfig(String dataId, String content, long timeoutMills);

    /**
     * Put config boolean.
     *
     * @param dataId  the data id
     * @param content the content
     * @return the boolean
     */
    boolean putConfig(String dataId, String content);

    /**
     * Put config if absent boolean.
     *
     * @param dataId       the data id
     * @param content      the content
     * @param timeoutMills the timeout mills
     * @return the boolean
     */
    boolean putConfigIfAbsent(String dataId, String content, long timeoutMills);

    /**
     * Put config if absent boolean.
     *
     * @param dataId  the data id
     * @param content the content
     * @return the boolean
     */
    boolean putConfigIfAbsent(String dataId, String content);

    /**
     * Remove config boolean.
     *
     * @param dataId       the data id
     * @param timeoutMills the timeout mills
     * @return the boolean
     */
    boolean removeConfig(String dataId, long timeoutMills);

    /**
     * Remove config boolean.
     *
     * @param dataId the data id
     * @return the boolean
     */
    boolean removeConfig(String dataId);

    /**
     * Add config listener.
     *
     * @param dataId   the data id
     * @param listener the listener
     */
    void addConfigListener(String dataId, T listener);

    /**
     * Remove config listener.
     *
     * @param dataId   the data id
     * @param listener the listener
     */
    void removeConfigListener(String dataId, T listener);

    /**
     * Gets config listeners.
     *
     * @param dataId the data id
     * @return the config listeners
     */
    List<T> getConfigListeners(String dataId);

    /**
     * Gets config from sys pro.
     *
     * @param dataId the data id
     * @return the config from sys pro
     */
    default String getConfigFromSysPro(String dataId) {
        return System.getProperty(dataId);
    }

}           
  • getShort/getInt/Long/Boolean/Config():通過dataId來擷取對應的值。
  • putConfig:用于添加配置。
  • removeConfig:删除一個配置。
  • add/remove/get ConfigListener:添加/删除/擷取 配置監聽器,一般用來監聽配置的變更。

目前為止有四種方式擷取 Config:File(檔案擷取), Nacos, Apollo, ZK,etcd。在 Seata 中首先現在項目 resources 下儲存一個 registry.conf 檔案,在該檔案中配置具體使用 Config 接口哪個實作類。

// registry.conf 相關内容
config {
  # file、nacos 、apollo、zk、consul
  type = "file"

  file {
    name = "file.conf"
  }
}           

config 相關的内容我們就不多描述了, 就是簡單地存取資料, 發生變化時通知各個節點進行改變。

Store

存儲層的實作對于 Seata 是否高性能,是否可靠非常關鍵。

如果存儲層沒有實作好,那麼如果發生當機,在 TC 中正在進行分布式事務處理的資料将會被丢失,既然使用了分布式事務,那麼其肯定不能容忍丢失。如果存儲層實作好了,但是其性能有很大問題,RM 可能會發生頻繁復原那麼其完全無法應對高并發的場景。

在 Seata 中預設提供了檔案方式的存儲,下面我們定義我們存儲的資料為 Session,而我們的TM創造的全局事務資料叫 GlobalSession,RM 創造的分支事務叫 BranchSession,一個 GlobalSession 可以擁有多個 BranchSession。我們的目的就是要将這麼多 Session 存儲下來。

Seata 中目前有 2 種實作方案, 一種是基于檔案的, 一種是基于 DB 的, 我們接下來會分别介紹。

File

基于檔案的實作是

FileTransactionStoreManager

, 它可以使用同步刷盤或異步刷盤的政策,每當有 Session 的狀态的更新時,它都會将變化的内容存儲起來。為了防止存儲檔案的無限增殖,當達到一定條件時,它會另打開一個檔案從頭開始記錄,并将之前的檔案儲存起來。這裡有一個非常巧妙的設計,就是該方案既能保證所有逾時事務不丢,隻有已完成的事務被清除,同時檔案的大小也得到了控制。我們會結合代碼來介紹 Seata 是如何做到的。

@Override
public boolean writeSession(LogOperation logOperation, SessionStorable session) {
    // 靠鎖保證安全
    writeSessionLock.lock();
    long curFileTrxNum;
    try {
        // 實際的寫資料過程,将編碼後的比特數組寫入 FileChannel
        if (!writeDataFile(new TransactionWriteStore(session, logOperation).encode())) {
            return false;
        }
        lastModifiedTime = System.currentTimeMillis();
        curFileTrxNum = FILE_TRX_NUM.incrementAndGet();
        // 如果目前事務存儲檔案已經累計記錄一定數量的事務,并且該檔案使用時間達标,則進行目前檔案的儲存和新檔案的建立
        if (curFileTrxNum % PER_FILE_BLOCK_SIZE == 0 &&
            (System.currentTimeMillis() - trxStartTimeMills) > MAX_TRX_TIMEOUT_MILLS) {
            return saveHistory();
        }
    } catch (Exception exx) {
        LOGGER.error("writeSession error," + exx.getMessage());
        return false;
    } finally {
        writeSessionLock.unlock();
    }
    // 實際刷盤過程,根據配置,可以是同步也可以是異步
    flushDisk(curFileTrxNum, currFileChannel);
    return true;
}           

上面的代碼,就是存儲 Session 的入口,其中

logOperation

可以是增加、删除、更新,

session

可以是 GlobalSession,也可以是 BranchSession。其中就 3 個關鍵函數,

writeDataFile

saveHistory

flushDisk

,我們分别介紹一下它們。

private boolean writeDataFile(byte[] bs) {
    if (bs == null || bs.length >= Integer.MAX_VALUE - 3) {
        return false;
    }
    ByteBuffer byteBuffer = null;

    // 有一個預設緩存,如果該緩存太小,則臨時申請
    if (bs.length + 4 > MAX_WRITE_BUFFER_SIZE) {
        //allocateNew
        byteBuffer = ByteBuffer.allocateDirect(bs.length + 4);
    } else {
        byteBuffer = writeBuffer;
        //recycle
        byteBuffer.clear();
    }

    byteBuffer.putInt(bs.length);
    byteBuffer.put(bs);
    return writeDataFileByBuffer(byteBuffer);
}

private boolean writeDataFileByBuffer(ByteBuffer byteBuffer) {
    byteBuffer.flip();
    for (int retry = 0; retry < MAX_WRITE_RETRY; retry++) {
        try {
            // 循環寫入
            while (byteBuffer.hasRemaining()) {
                currFileChannel.write(byteBuffer);
            }
            return true;
        } catch (IOException exx) {
            LOGGER.error("write data file error:" + exx.getMessage());
        }
    }
    LOGGER.error("write dataFile failed,retry more than :" + MAX_WRITE_RETRY);
    return false;
}           

writeDataFile

的實作很簡單,就是同步寫入 FileChannel。接下來看一下最妙的

saveHistory

private boolean saveHistory() throws IOException {
    boolean result;
    try {
        // 找到記憶體中儲存的所有逾時的事務,這些事務是需要復原的,不能清除,但是其他完成的事務是可以清除的,這個儲存過程實際上就是将逾時事務追加到目前檔案的結尾
        result = findTimeoutAndSave();
        // 然後異步關閉檔案
        writeDataFileRunnable.putRequest(new CloseFileRequest(currFileChannel, currRaf));
        // 同時,給檔案改名為historyFullFileName,替換掉舊的historyFullFile,同一時刻,Seata 隻會有 2 個事務存儲檔案,一個是currentDataFile代表正在使用的檔案,一個是historyFullFile,存儲了過往的所有可能有用的 Session
        Files.move(currDataFile.toPath(), new File(hisFullFileName).toPath(), StandardCopyOption.REPLACE_EXISTING);
    } catch (IOException exx) {
        LOGGER.error("save history data file error," + exx.getMessage());
        result = false;
    } finally {
        initFile(currFullFileName);
    }
    return result;
}
// 找到所有逾時的 Session 存儲起來
private boolean findTimeoutAndSave() throws IOException {
    List<GlobalSession> globalSessionsOverMaxTimeout =
        sessionManager.findGlobalSessions(new SessionCondition(MAX_TRX_TIMEOUT_MILLS));
    if (CollectionUtils.isEmpty(globalSessionsOverMaxTimeout)) {
        return true;
    }
    List<byte[]> listBytes = new ArrayList<>();
    int totalSize = 0;
    // 1. find all data and merge
    for (GlobalSession globalSession : globalSessionsOverMaxTimeout) {
        TransactionWriteStore globalWriteStore = new TransactionWriteStore(globalSession, LogOperation.GLOBAL_ADD);
        byte[] data = globalWriteStore.encode();
        listBytes.add(data);
        totalSize += data.length + INT_BYTE_SIZE;
        List<BranchSession> branchSessIonsOverMaXTimeout = globalSession.getSortedBranches();
        if (null != branchSessIonsOverMaXTimeout) {
            for (BranchSession branchSession : branchSessIonsOverMaXTimeout) {
                TransactionWriteStore branchWriteStore =
                    new TransactionWriteStore(branchSession, LogOperation.BRANCH_ADD);
                data = branchWriteStore.encode();
                listBytes.add(data);
                totalSize += data.length + INT_BYTE_SIZE;
            }
        }
    }
    // 2. batch write
    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(totalSize);
    for (byte[] bytes : listBytes) {
        byteBuffer.putInt(bytes.length);
        byteBuffer.put(bytes);
    }
    if (writeDataFileByBuffer(byteBuffer)) {
        currFileChannel.force(false);
        return true;
    }
    return false;
}           

現在我們知道 Seata 同時最多有2個存儲檔案,一個是 currentDataFile 一個是 historyFullFile,currentDataFile 存儲了最新的資料,而 historyFullFile 相較于 currentDataFile,還存儲了之前過期的所有 Session。任何時候,如果 TC 當機,重新開機時隻要先讀取 historyFullFile,再讀取 currentDataFile 就能恢複所有資料。

替換 historyFullFile 時,因為會将所有逾時的 Session 資訊先寫入 currentDataFile,然後才會将 currentDataFile 改名為 historyFullFile 并替換掉之前的 oldHistoryFullFile,這樣所有過期 Session 就被延續下去了,實際上 Session 過期時間和建立 currentDataFile 的時間是一緻的,都是 30 分鐘,這樣再進行 historyFullFile 替換時,之前的 oldHistoryFullFile 實際上隻會存在逾時 Session 和完成的 Session,所有逾時 Session 已經被記錄在新的 historyFullFile 中了,而完成的 Session 會在替換時,随着 oldHistoryFullFile 一起被删除。這就是為什麼我覺得這個地方的設計十分巧妙。

最後刷盤的過程也很簡單。根據配置,如果是同步刷盤會用

Future#get

阻塞等待,否則異步進行,

writeDataFileRunnable

内部有一個阻塞隊列,會有一個線程循環從中提取任務并執行,應該不難了解吧。

private void flushDisk(long curFileNum, FileChannel currFileChannel) {
    if (FLUSH_DISK_MODE == FlushDiskMode.SYNC_MODEL) {
        SyncFlushRequest syncFlushRequest = new SyncFlushRequest(curFileNum, currFileChannel);
        writeDataFileRunnable.putRequest(syncFlushRequest);
        syncFlushRequest.waitForFlush(MAX_WAIT_FOR_FLUSH_TIME_MILLS);
    } else {
        writeDataFileRunnable.putRequest(new AsyncFlushRequest(curFileNum, currFileChannel));
    }
}           

DB

接下來,我們看一下基于 DB 的實作。

@Override
public boolean writeSession(LogOperation logOperation, SessionStorable session) {
    if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
        logStore.insertGlobalTransactionDO(convertGlobalTransactionDO(session));
    } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
        logStore.updateGlobalTransactionDO(convertGlobalTransactionDO(session));
    } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
        logStore.deleteGlobalTransactionDO(convertGlobalTransactionDO(session));
    } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
        logStore.insertBranchTransactionDO(convertBranchTransactionDO(session));
    } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
        logStore.updateBranchTransactionDO(convertBranchTransactionDO(session));
    } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
        logStore.deleteBranchTransactionDO(convertBranchTransactionDO(session));
    } else {
        throw new StoreException("Unknown LogOperation:" + logOperation.name());
    }
    return true;
}           

基于 DB 的實作相較于基于檔案的實作就顯得樸實無華,

logStore

實際上就是一個 DAO 層的接口,對應了資料的 CRUD,在重新開機恢複時隻不過是按照條件周遊 DB 中的所有資料,進行 Session 恢複。

Lock

大家知道資料庫實作隔離級别主要是通過鎖來實作的,同樣的在分布式事務架構 Seata 中要實作隔離級别也需要通過鎖。一般在資料庫中資料庫的隔離級别一共有四種:讀未送出,讀已送出,可重複讀,串行化。在 Seata 中可以保證寫操作的互斥性,而讀的隔離級别一般是讀未送出,但是提供了達到讀已送出隔離的手段。

Lock 子產品也就是 Seata 實作隔離級别的核心子產品。在 Lock 子產品中提供了一個接口用于管理我們的鎖:

public interface LockManager {

    /**
     * Acquire lock boolean.
     *
     * @param branchSession the branch session
     * @return the boolean
     * @throws TransactionException the transaction exception
     */
    boolean acquireLock(BranchSession branchSession) throws TransactionException;

    /**
     * Un lock boolean.
     *
     * @param branchSession the branch session
     * @return the boolean
     * @throws TransactionException the transaction exception
     */
    boolean releaseLock(BranchSession branchSession) throws TransactionException;

    /**
     * Un lock boolean.
     *
     * @param globalSession the global session
     * @return the boolean
     * @throws TransactionException the transaction exception
     */
    boolean releaseGlobalSessionLock(GlobalSession globalSession) throws TransactionException;

    /**
     * Is lockable boolean.
     *
     * @param xid        the xid
     * @param resourceId the resource id
     * @param lockKey    the lock key
     * @return the boolean
     * @throws TransactionException the transaction exception
     */
    boolean isLockable(String xid, String resourceId, String lockKey) throws TransactionException;

    /**
     * Clean all locks.
     *
     * @throws TransactionException the transaction exception
     */
    void cleanAllLocks() throws TransactionException;

}           
  • acquireLock:用于對我們的 BranchSession 加鎖,這裡雖然是傳的分支事務 Session,實際上是對分支事務操作的資料行加鎖,成功傳回 true。
  • isLockable:根據事務 ID,資源 ID,鎖住的Key來查詢是否已經加鎖。
  • releaseLock: 釋放分支事務的所有鎖。
  • releaseGlobalSessionLock: 釋放全局事務的所有分支事務的鎖。
  • cleanAllLocks:清除所有的鎖。

在 Seata 中, LockManager 下層有使用的鎖有兩種實作, 一種是基于記憶體的鎖(Session 存儲模式為 File 時使用), 一種是基于 DB 的(Session 存儲模式為 DB 時使用),它們都實作了

Locker

接口:

public interface Locker {

    /**
     * Acquire lock boolean.
     *
     * @param rowLock the row lock
     * @return the boolean
     */
    boolean acquireLock(List<RowLock> rowLock) ;

    /**
     * Un lock boolean.
     *
     * @param rowLock the row lock
     * @return the boolean
     */
    boolean releaseLock(List<RowLock> rowLock);

    /**
     * Is lockable boolean.
     *
     * @param rowLock the row lock
     * @return the boolean
     */
    boolean isLockable(List<RowLock> rowLock);

    /**
     * Clean all locks boolean.
     *
     * @return the boolean
     */
    void cleanAllLocks();
}           

我們可以看到, 在 Locker 中将 branchSession 的概念剝離出去了, 隻保留了 RowLock 的概念, 責任更加單一, 接下來我們分别看看它的實作類。

MemoryLocker

記憶體鎖的實作全都存在一個鎖 Map 中, 它是整個 Locker 的實作核心, 我們先來看一下它的結構:

private static final ConcurrentHashMap<String /* resourceId */,
        ConcurrentHashMap<String /* tableName */,
            ConcurrentHashMap<Integer /* bucketId */,
                ConcurrentHashMap<String /* pk */, Long/* transactionId */>>>>
        LOCK_MAP
        = new ConcurrentHashMap<>();           

我們可以看到, 通過這個 Map 将鎖的粒度控制的很小, 最外層 Map 的 key 是 resourceId, 也就是對應了一個 RM, 然後第二層 Map 的 key 是表名, 對應了 RM 上操作的一張表, 下一層 Map 的 key 是 BucketID, Seata 根據表主鍵哈希值進行了分桶, 讓沖突的機率降低, 預設有 128 個桶, 最後一層 Map 的 key 才是主鍵, Value 是持有該主鍵鎖的事務 ID。

明确了鎖存儲的資料結構後, 再分析加解鎖過程就清晰多了:

// MemoryLocker
@Override
public boolean acquireLock(List<RowLock> rowLocks) {
    if (CollectionUtils.isEmpty(rowLocks)) {
        //no lock
        return true;
    }
    String resourceId = branchSession.getResourceId();
    long transactionId = branchSession.getTransactionId();

    ConcurrentHashMap<ConcurrentHashMap<String, Long>, Set<String>> bucketHolder = branchSession.getLockHolder();
    ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConcurrentHashMap<String, Long>>> dbLockMap = LOCK_MAP.get(resourceId);
    // 确認 RM 對應的 Map 是否已經建構
    if (dbLockMap == null) {
        LOCK_MAP.putIfAbsent(resourceId,
            new ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConcurrentHashMap<String, Long>>>());
        dbLockMap = LOCK_MAP.get(resourceId);
    }

    for (RowLock lock : rowLocks) {
        String tableName = lock.getTableName();
        String pk = lock.getPk();
        ConcurrentHashMap<Integer, ConcurrentHashMap<String, Long>> tableLockMap = dbLockMap.get(tableName);
        // 确認表對應的 Map 是否已經建構好
        if (tableLockMap == null) {
            dbLockMap.putIfAbsent(tableName, new ConcurrentHashMap<Integer, ConcurrentHashMap<String, Long>>());
            tableLockMap = dbLockMap.get(tableName);
        }
        int bucketId = pk.hashCode() % BUCKET_PER_TABLE;
        ConcurrentHashMap<String, Long> bucketLockMap = tableLockMap.get(bucketId);
        // 确認 bucket map 是否已經建構好
        if (bucketLockMap == null) {
            tableLockMap.putIfAbsent(bucketId, new ConcurrentHashMap<String, Long>());
            bucketLockMap = tableLockMap.get(bucketId);
        }
        // 實際加鎖過程
        Long previousLockTransactionId = bucketLockMap.putIfAbsent(pk, transactionId);
        if (previousLockTransactionId == null) {
            //No existing lock, and now locked by myself
            Set<String> keysInHolder = bucketHolder.get(bucketLockMap);
            if (keysInHolder == null) {
                bucketHolder.putIfAbsent(bucketLockMap, new ConcurrentSet<String>());
                keysInHolder = bucketHolder.get(bucketLockMap);
            }
            keysInHolder.add(pk);
        } else if (previousLockTransactionId == transactionId) {
            // Locked by me before
            continue;
        } else {
            LOGGER.info("Global lock on [" + tableName + ":" + pk + "] is holding by " + previousLockTransactionId);
            try {
                // Release all acquired locks.
                branchSession.unlock();
            } catch (TransactionException e) {
                throw new FrameworkException(e);
            }
            return false;
        }
    }
    return true;
}           

我們可以看到, 加鎖的過程無非就是确認各級 Map 中是否有自己要的資料, 如果沒有就用 putIfAbsent 添加進去, 最後到主鍵所在的 bucketMap 時, 才是真正加鎖并确認的過程:

  1. 使用 putIfAbsent 将自己的 transactionId 填入 bucketLockMap
  2. 如果 previousLockTransactionId 為空, 說明自己獲得了鎖, 把自己獲得的鎖記錄在 branchSession 中, 友善釋放時查找
  3. 如果 previousLockTransactionId 和自己的 transactionId 相同, 說明這個鎖之前就被自己持有了, 直接傳回即可
  4. 否則, 發生了鎖沖突, 釋放自己之前擷取到的所有鎖

其實, 這個實作中原來有一個死鎖的 bug, 之前給 bucket Map 的加鎖過程, 使用了 Synchronized block, 如果兩個分支 Session 要同時鎖一個表的相同資料, 并且加鎖的順序不同(BS1: row1, row2, row3; BS2: row3, row2, row1), 就會發生死鎖。

導緻這個 bug 的原因是Synchronized block 的作用範圍有誤, 将解鎖過程也包在了該代碼塊中。是以, 當時我就提了

pr

, 有興趣的同學可以去看一看。

釋放鎖的過程就很簡單了, 周遊 branchSession 中持有的所有鎖, 并依次釋放它們。

// MemoryLocker
@Override
public boolean releaseLock(List<RowLock> rowLock) {
    // 取出所有持有的鎖
    ConcurrentHashMap<ConcurrentHashMap<String, Long>, Set<String>> lockHolder = branchSession.getLockHolder();
    if (lockHolder == null || lockHolder.size() == 0) {
        return true;
    }
    Iterator<Map.Entry<ConcurrentHashMap<String, Long>, Set<String>>> it = lockHolder.entrySet().iterator();
    // 挨個釋放鎖
    while (it.hasNext()) {
        Map.Entry<ConcurrentHashMap<String, Long>, Set<String>> entry = it.next();
        ConcurrentHashMap<String, Long> bucket = entry.getKey();
        Set<String> keys = entry.getValue();
        for (String key : keys) {
            // remove lock only if it locked by myself
            bucket.remove(key, branchSession.getTransactionId());
        }
    }
    lockHolder.clear();
    return true;
}           

這裡大家可能會有疑問, 存在記憶體中的鎖, 如果發生了崩潰, 重新開機的時候鎖不就沒了麼, 其實 Seata 在重新開機并恢複 Session 的同時, 也會按順序恢複各個 Session 的鎖, 下面隻會展示核心代碼。

/**
 * io.seata.server.session.SessionHolder#reload
 */
protected static void reload() {
    // ...
    Collection<GlobalSession> reloadedSessions = ROOT_SESSION_MANAGER.allSessions();
    if (reloadedSessions != null && !reloadedSessions.isEmpty()) {
        reloadedSessions.forEach(globalSession -> {
            GlobalStatus globalStatus = globalSession.getStatus();
            switch (globalStatus) {
                case UnKnown:
                case Committed:
                case CommitFailed:
                case Rollbacked:
                case RollbackFailed:
                case TimeoutRollbacked:
                case TimeoutRollbackFailed:
                case Finished:
                    throw new ShouldNeverHappenException("Reloaded Session should NOT be " + globalStatus);
                case AsyncCommitting:
                    try {
                        // 恢複未完成的異步送出過程
                        globalSession.addSessionLifecycleListener(getAsyncCommittingSessionManager());
                        getAsyncCommittingSessionManager().addGlobalSession(globalSession);
                    } catch (TransactionException e) {
                        throw new ShouldNeverHappenException(e);
                    }
                    break;
                default: {
                    ArrayList<BranchSession> branchSessions = globalSession.getSortedBranches();
                    // Lock, 重新加鎖
                    branchSessions.forEach(branchSession -> {
                        try {
                            branchSession.lock();
                        } catch (TransactionException e) {
                            throw new ShouldNeverHappenException(e);
                        }
                    });
                    // ...
                }
            }

        });
    }
}           

DataBaseLocker

和 SessionManager 的實作相同, DataBaseLocker 的加鎖過程實際上就是一個對 DB 增删資料。因為這部分比較簡單, 是以我們隻展示加鎖的最核心内容:

// LockStoreDataBaseDAO
protected boolean doAcquireLock(Connection conn, LockDO lockDO) {
    PreparedStatement ps = null;
    try {
        //insert
        String insertLockSQL = LockStoreSqls.getInsertLockSQL(lockTable, dbType);
        ps = conn.prepareStatement(insertLockSQL);
        ps.setString(1, lockDO.getXid());
        ps.setLong(2, lockDO.getTransactionId());
        ps.setLong(3, lockDO.getBranchId());
        ps.setString(4, lockDO.getResourceId());
        ps.setString(5, lockDO.getTableName());
        ps.setString(6, lockDO.getPk());
        ps.setString(7, lockDO.getRowKey());
        return ps.executeUpdate() > 0;
    } catch (SQLException e) {
        throw new StoreException(e);
    } finally {
        if (ps != null) {
            try {
                ps.close();
            } catch (SQLException e) {
            }
        }
    }
}           

Rpc

保證 Seata 高性能的關鍵之一也是使用了 Netty 作為 RPC 架構,采用預設配置的線程模型如下圖所示:

Seata Transaction Coordinator

如果采用預設的基本配置, 那麼會有一個 Acceptor 線程用于處理用戶端的連結,會有 cpu*2 數量的 NIO-Thread,在這些 NIO-Thread 線程中不會做業務太重的事情,隻會做一些速度比較快的事情,比如編解碼,心跳事件,和 TM 注冊。一些比較費時間的業務操作将會交給業務線程池,預設情況下業務線程池配置為最小線程為 100,最大為 500。

關于 Netty 的使用基礎, 我們這裡就不詳細介紹了, 簡單說就是對于每個連接配接都會綁定上資料的 handler, 它會按照責任鍊的原則, 順着 handler 的綁定順序, 處理資料, 這裡簡單看下它都綁定了什麼 handler:

// Rpc Server 和 Rpc Client
ch.pipeline()
    .addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
    .addLast(new ProtocolV1Decoder())
    .addLast(new ProtocolV1Encoder())
    .addList(this);           

我們可以看到, 它們都綁定了心跳元件 IdleStateHandler, 然後是編解碼器, 最後是 Server(TC) 和 Client(TM RM), 它們會拿到原始的請求和回應資料, 據此來進行業務互動。

前面介紹 Discover 子產品時, 我們知道 Server 是将自己注冊到注冊中心, 然後 Client 訂閱更新, 并得到 Server 的清單, 最後通過負載均衡選擇一個 Server 進行連接配接。當連接配接建立成功後, Server 會儲存所有的連接配接, 在需要進行分支復原和送出時, 從所有 RM 的連接配接記錄中, 找到對應 RM 的所有連接配接, 它會首先尋找最原始的 RM 節點, 如果該節點當機了, 它會找到該 RM 的其他節點, 然後發送分支送出請求。

HA-Cluster

尚未實作

Metrics

統計接口目前的實作也很簡單, 就是在記憶體中計數, 然後支援通過 HTTP 擷取統計資料,這部分很簡單我就不展示了。

Coordinator Core

在 TC 端, 大部分工作都是響應 TM 的請求, 然後發送送出復原請求給 RM,下達送出或復原指令, 這些部分我們會在後面的 AT 模式串講 和 TCC 模式串講中介紹, 本節主要看一下在 TC 子產品中, 自主進行的一些工作。

當 TC 啟動時, 先恢複本機的 Session, 然後啟動 RPC Server, 最後注冊自己的位址到注冊中心, 這些我們前面已經介紹過了, 除此之外, TC 還會啟動幾個背景線程, 這些線程保證了 TC 的協調工作能夠在發生錯誤時, 最終能順利完成, 我們來看一下這部分的代碼:

/**
 * Init.
 */
public void init() {
    retryRollbacking.scheduleAtFixedRate(() -> {
        try {
            handleRetryRollbacking();
        } catch (Exception e) {
            LOGGER.info("Exception retry rollbacking ... ", e);
        }
    }, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    retryCommitting.scheduleAtFixedRate(() -> {
        try {
            handleRetryCommitting();
        } catch (Exception e) {
            LOGGER.info("Exception retry committing ... ", e);
        }
    }, 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    asyncCommitting.scheduleAtFixedRate(() -> {
        try {
            handleAsyncCommitting();
        } catch (Exception e) {
            LOGGER.info("Exception async committing ... ", e);
        }
    }, 0, ASYN_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    timeoutCheck.scheduleAtFixedRate(() -> {
        try {
            timeoutCheck();
        } catch (Exception e) {
            LOGGER.info("Exception timeout checking ... ", e);
        }
    }, 0, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    undoLogDelete.scheduleAtFixedRate(() -> {
        try {
            undoLogDelete();
        } catch (Exception e) {
            LOGGER.info("Exception undoLog deleting ... ", e);
        }
    }, UNDOLOG_DELAY_DELETE_PERIOD, UNDOLOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
}           

我們可以看到, 這些背景任務分别是復原重試, 送出重試, 異步送出, 逾時檢測, 删除沒用的 AT 模式 undo log。

文章說明

更多有價值的文章均收錄于

貝貝貓的文章目錄
Seata Transaction Coordinator

版權聲明: 本部落格所有文章除特别聲明外,均采用 BY-NC-SA 許可協定。轉載請注明出處!

創作聲明: 本文基于下列所有參考内容進行創作,其中可能涉及複制、修改或者轉換,圖檔均來自網絡,如有侵權請聯系我,我會第一時間進行删除。

參考内容

[1]

fescar鎖設計和隔離級别的了解

[2]

分布式事務中間件 Fescar - RM 子產品源碼解讀

[3]

Fescar分布式事務實作原了解析探秘

[4]

Seata TCC 分布式事務源碼分析

[5]

深度剖析一站式分布式事務方案 Seata-Server

[6]

分布式事務 Seata Saga 模式首秀以及三種模式詳解

[7]

螞蟻金服大規模分布式事務實踐和開源詳解

[8]

分布式事務 Seata TCC 模式深度解析

[9]

Fescar (Seata)0.4.0 中文文檔教程

[10]

Seata Github Wiki

[11]

深度剖析一站式分布式事務方案Seata(Fescar)-Server