天天看點

Seata 高性能 RPC 通信的實作- 巧用 reactor 模式

一、Reactor 模式

reactor 模式是一種事件驅動的應用層 I/O 處理模式,基于分而治之和事件驅動的思想,緻力于建構一個高性能的可伸縮的 I/O 處理模式。維基百科對 Reactor pattern 的解釋:

The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers

大緻意思是說,reactor設計模式是一種事件處理模式,用于同時有一個或多個請求發送到事件處理器(service handler),這個事件處理器會采用多路分離(demultiplexes )的方式,同步的将這些請求分發到請求處理器(request handlers)。

Seata 高性能 RPC 通信的實作- 巧用 reactor 模式

不難看出,上邊介紹的 reactor 模式是一種抽象;從實作角度說,reactor 模式有許多變種,不同程式設計語言中的實作也有差異。就 java 而言,大師 Doug Lea 在其【Scalable IO in Java】中就講述了幾個reactor模式的演進,如單線程版本、多線程版 ,閱讀此文後,筆者對大師所講reactor模式演進的了解與網絡中一些描述稍有差異。

在reactor 單線程版中,隻有一個reactor線程,線程中通過 select (I/O 多路複用接口) 監聽所有 I/O 事件,收到 I/O 事件後通過 dispatch 進行分發給 Handlers 處理,此版本容易實作,也容易了解,但性能不高。為了适配多處理器,充分利用多核并行處理的優勢,實作高性能的網絡服務,可以采用分治政策,關鍵環節采用多線程模式,于是就出現了reactor多線程版本,而多線程的應用展現為worder線程和reactor線程,多線程應該被池化管理,這樣才容易被調整和控制。線程池中的線程數會比用戶端的數量少很多,實際數量可以根據程式本身是 CPU 密集型還是 I/O 密集型操作來進行合理的配置設定。

  • 多個 worder 線程(池化管理)
    • 屬于網絡 I/O 操作與業務處理的拆分,因為 reactors 監聽到 I/O 事件後應該快速分發給 handlers 來處理程式;但如果 handler 中的非 I/O 操作慢了就會減慢 reactor 中的 I/O 事件響應速度,是以把非 I/O 操作從 reactors 的 I/O 線程轉移到其他線程中,即由worker線程來分擔非 I/O 邏輯的操作處理。
  • 多個 reactor 線程(池化管理)
    • 屬于網絡建連操作與網絡 I/O 讀寫操作的拆分,因為由一個reactor在一個線程中完成所有 I/O 操作也會遇到性能瓶頸,可采取拆分并增加reactor政策,将 I/O 負載配置設定給多個 reactor(每個reactor都有自己的線程、選擇器和排程循環)以達到負載平衡。這看起來挺不錯,但誰來執行配置設定以達到負載均衡呢?或許是因為這個問題,将reactor拆分為兩類角色,mainReactor負責接收連接配接,之後采用一定的負載均衡政策将新連接配接配置設定給其他subReactor來處理 I/O 讀寫,這樣的拆分自然流暢。
Seata 高性能 RPC 通信的實作- 巧用 reactor 模式

如此就演進出如上圖中的主從reactor多線程模型。請注意,結合【Scalable IO in Java】原文中的用詞和描述看,上圖中的mainReactor和subReactor可以有多個并做池化管理,所有也有一些文章中會看到如主ReactorGroup、mainReactorGroup、從ReactorGroup、subReactorGroup等這類名詞用 Group 字尾來強調 Reactor 是池化管理。 或許是不好布局,也或許是為了凸顯主從reactor角色的協作關系,上圖中都隻展示了一個,另外服務端應用通常隻暴露一個服務端口時,隻需用一個 mainReactor 來監聽端口上的連接配接事件并處理。

二、Netty 主從 reactor 多線程模型

Netty中reactor所對應的實作類是NioEventLoop,其核心邏輯如下:

  • 不同類型的 channel 向 Selector 注冊所感興趣的事件
  • 掃描是否有感興趣的事件發生
  • 事件發生後做相應的處理

用戶端和服務端分别會有不同類型的channel,用戶端建立SocketChannel向服務端發起連接配接請求,服務端建立ServerSocketChannel監聽用戶端連接配接,建連後建立SocketChannel與用戶端的SocketChannel互相收發資料,這些channel分工不同,向 Selector 注冊所感興趣的事件情況也不同:

用戶端/服務端 channel OP_ACCEPT OP_CONNECT OP_WRITE OP_READ
用戶端 SocketChannel YES
服務端 ServerSocketChannel YES
服務端 SocketChannel YES YES

Netty中 Nio 方式實作幾種 reactor 模型如下:

Seata 高性能 RPC 通信的實作- 巧用 reactor 模式

mainReactor 對應 Netty 中配置的 bossGroup 線程組(下圖中的主ReactorGroup),主要負責接受用戶端連接配接的建立。每 bind 一個端口就用掉一個bossGroup中的線程。

subReactor 對應 Netty 中配置的 workerGroup 線程組(下圖中的 reactorGroup),bossGroup 線程組接受完用戶端的連接配接後,将 channel 轉交給 workerGroup 線程組,在 workerGroup 線程組内選擇一個線程,執行 I/O 讀寫的處理,workerGroup 線程組預設是 2 * CPU 核數個線程。

Seata 高性能 RPC 通信的實作- 巧用 reactor 模式

主從 reactor 模式的核心流程:

  1. 如果隻監聽一個端口,那麼隻需一個主reactor幹活兒,是以通常看到boosGroup隻配置一個線程。主reactor運作在獨立的線程中 ,該線程中隻負責與用戶端的連接配接請求
  2. 從reactor在伺服器端可以不止一個, 通常運作多個從 reactor , 每個從 reactor 也運作在一個獨立的線程中 ,負責與用戶端的讀寫操作
  3. 主 reactor 檢測到用戶端的連結後,建立 NioSocketChannel,按照一定的算法循環選取(負載均衡)一個從reactor,并把剛建立的NioSocketChannel 注冊到這個從 reactor 中,這樣建連和讀寫事件互不影響。
  4. 一個 reactor 中可被注冊多個NioSocketChannel,這個 reactor 監聽所有的被配置設定的 NioSocketChannel 的讀寫事件 , 如果監聽到用戶端的資料發送事件 , 将對應的業務邏輯轉發給 NioSocketChannel 中的pipeline 裡的 handler 鍊進行處理
  5. handler 最好隻負責響應 I/O 事件,不處理具體的與用戶端互動的業務邏輯 , 這樣不會長時間阻塞 , 其 read 方法讀取用戶端資料後 , 将消息資料交給業務線程池去處理相關業務邏輯
  6. 業務線程池完成相關業務邏輯的處理後,将結果傳回,通過NioSocketChannel的的pipeline 裡的 handler 鍊将結果消息寫回給用戶端
  7. 當buffer不滿足将結果消息寫回給用戶端時的條件時,注冊寫事件,等待可寫時再寫

三、Seata Server 端 的 reactor 模式應用

Seata Server 采用了 主從 reactor 多線程模型,對應這個模型的話是有四個線程池,其中自定義業務線程池是兩個。

功能 線程池對象 備注
接收用戶端連接配接 NettyServerBootstrap#eventLoopGroupBoss
處理 IO 事件 NettyServerBootstrap#eventLoopGroupWorker 部分 RPC 消息在這裡處理
處理用戶端的 request 消息 AbstractNettyRemoting#messageExecutor 用戶端主動發給的消息
處理用戶端的 response 消息 NettyRemotingServer#branchResultMessageExecutor 服務端主動發給用戶端消息,用戶端處理後給服務端響應

3.1、NettyServerBootstrap#eventLoopGroupBoss

筆者的環境未啟用 epoll,關鍵資訊如下:

  • 線程數:1,隻監聽一個端口
  • 線程名字首:“NettyBoss”
this.eventLoopGroupBoss = new NioEventLoopGroup(
        //CONFIG.getInt("transport.threadFactory.bossThreadSize", 1);
        nettyServerConfig.getBossThreadSize(),
    new NamedThreadFactory(
            // CONFIG.getConfig("transport.threadFactory.bossThreadPrefix", "NettyBoss");
            nettyServerConfig.getBossThreadPrefix(),
            //CONFIG.getConfig("transport.threadFactory.bossThreadSize", 1);
            nettyServerConfig.getBossThreadSize())
);

複制代碼           

3.2、NettyServerBootstrap#eventLoopGroupWorker

筆者的環境未啟用 epoll,關鍵資訊如下:

  • 線程數:預設值是 cpu 核數 * 2
  • 線程名字首:“NettyServerNIOWorker”
this.eventLoopGroupWorker = new NioEventLoopGroup(
        // System.getProperty("transport.serverWorkerThreads", String.valueOf(WORKER_THREAD_SIZE)));//預設值cpu核數*2
        nettyServerConfig.getServerWorkerThreads(),
    new NamedThreadFactory(
            // CONFIG.getConfig("transport.threadFactory.workerThreadPrefix",
            //            enableEpoll() ? EPOLL_WORKER_THREAD_PREFIX : DEFAULT_NIO_WORKER_THREAD_PREFIX);
            // 預設值 NettyServerNIOWorker ,沒有啟用 epoll
            nettyServerConfig.getWorkerThreadPrefix(),
            //System.getProperty("transport.serverWorkerThreads", String.valueOf(WORKER_THREAD_SIZE)));//預設值 cpu核數*2
            nettyServerConfig.getServerWorkerThreads())
);
複制代碼           

3.3、AbstractNettyRemoting#messageExecutor

此線程池處理用戶端的 request 消息,關鍵參數資訊如下:

  • 線程數:50 ~ 500
  • keepAlive:500 秒
  • 線程名字字首: "ServerHandlerThread"
  • 隊列長度: 500
  • 拒絕政策:CallerRunsPolicy(),飽和的情況下,調用者來執行該任務,即 Netty 的 I/O 線程
ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(
        //Integer.parseInt(System.getProperty("transport.minServerPoolSize", "50"));
        NettyServerConfig.getMinServerPoolSize(),
        //Integer.parseInt(System.getProperty("transport.maxServerPoolSize", "500"));
        NettyServerConfig.getMaxServerPoolSize(),
        //Integer.parseInt(System.getProperty("transport.keepAliveTime", "500"));
        NettyServerConfig.getKeepAliveTime(),
        TimeUnit.SECONDS,
        //Integer.parseInt(System.getProperty("transport.maxTaskQueueSize", "500"));
        new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
        new NamedThreadFactory(
                "ServerHandlerThread",
                //Integer.parseInt(System.getProperty("transport.maxServerPoolSize", "500"));
                NettyServerConfig.getMaxServerPoolSize()),
                //飽和的情況下,調用者來執行該任務,即Netty的IO線程
                new ThreadPoolExecutor.CallerRunsPolicy()
);
複制代碼           

3.4、NettyRemotingServer#branchResultMessageExecutor

此線程池處理用戶端的 response 消息,關鍵參數資訊如下:

  • 線程數:cpu 核數2 ~ cpu 核數2
  • keepAlive:500 秒
  • 線程名字字首: "BranchResultHandlerThread"
  • 隊列長度: 20000
  • 拒絕政策:CallerRunsPolicy(),飽和的情況下,調用者來執行該任務,即 Netty 的 IO 線程
private ThreadPoolExecutor branchResultMessageExecutor = new ThreadPoolExecutor(
        //System.getProperty("transport.minBranchResultPoolSize", String.valueOf(WORKER_THREAD_SIZE))),預設值 cpu核數*2
        NettyServerConfig.getMinBranchResultPoolSize(),
        //System.getProperty("transport.maxBranchResultPoolSize", String.valueOf(WORKER_THREAD_SIZE))),預設值 cpu核數*2
        NettyServerConfig.getMaxBranchResultPoolSize(),
        // System.getProperty("transport.keepAliveTime", "500"),預設值500
        NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(
                //System.getProperty("transport.maxTaskQueueSize", "20000"),預設值 20000
                NettyServerConfig.getMaxTaskQueueSize()),
        new NamedThreadFactory(
                // 分支響應消息的處理線程的名字字首  BranchResultHandlerThread
                "BranchResultHandlerThread",
                // System.getProperty("transport.maxBranchResultPoolSize", String.valueOf(WORKER_THREAD_SIZE))),預設值 cpu核數*2
                NettyServerConfig.getMaxBranchResultPoolSize()
        ),
        //飽和的情況下,調用者來執行該任務,即Netty的IO線程
        new ThreadPoolExecutor.CallerRunsPolicy()
);
複制代碼           

3.5、業務線程池如何處理消息

3.5.1、登記消息處理器

Seata 消息處理的核心邏輯是:定義好什麼類型的消息,使用哪個消息處理器,這個消息處理器的消息處理邏輯在哪個線程池中執行。這個映射關系通過AbstractNettyRemoting#processorTable來存儲。

/**
 * 可以接收什麼類型的消息,以及使用哪個消息處理器和線程池來處理消息
 * HashMap<消息類型, Pair<消息處理器, 線程池>>
 * processor type {@link MessageType}
 */
protected final HashMap<Integer/*MessageType*/, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);

複制代碼           

各子產品 Netty 元件啟動前,通過AbstractNettyRemotingServer#registerProcessor方法登記到這個結構中。

public void registerProcessor(int messageType, RemotingProcessor processor, ExecutorService executor) {
    Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);
    this.processorTable.put(messageType, pair);
}
複制代碼           

拿 Seata Server 來說,如在ServerBootStrap啟動前,通過NettyRemotingServer#registerProcessor注冊好消息處理器。不同消息對應的處理器的線程池也不同,也有一些消息沒有指定業務線程池(沒必要),情況如下:

private void registerProcessor() {
    // 1. registry on request message processor
    ServerOnRequestProcessor onRequestProcessor =
        new ServerOnRequestProcessor(this, getHandler());
    ShutdownHook.getInstance().addDisposable(onRequestProcessor);
    super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
    // 2. registry on response message processor
    ServerOnResponseProcessor onResponseProcessor =
        new ServerOnResponseProcessor(getHandler(), getFutures());
    super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor);
    super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor);
    // 3. registry rm message processor
    RegRmProcessor regRmProcessor = new RegRmProcessor(this);
    super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
    // 4. registry tm message processor
    RegTmProcessor regTmProcessor = new RegTmProcessor(this);
    super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
    // 5. registry heartbeat message processor
    ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}

複制代碼           

3.5.2、處理消息

當 Seata Server 收到用戶端發送的 RPC 消息後,會進入AbstractNettyRemotingServer.ServerHandler#channelRead中,在這裡對消息類型簡單判斷後,委托給processMessage處理。

public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
    if (!(msg instanceof RpcMessage)) {
        return;
    }
    // 收到消息後,委托給 processMessage 處理
    processMessage(ctx, (RpcMessage) msg);
}
複制代碼           

processMessage中通過消息類型找到消息處理器進行業務層處理:

  1. 如果消息處理器有指定的業務線程池,在指定的業務線程池中處理消息
  2. 若消息處理器沒有指定的業務線程池,則在 I/O 線程中直接處理。
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    ...
    Object body = rpcMessage.getBody();
    if (body instanceof MessageTypeAware) {
        MessageTypeAware messageTypeAware = (MessageTypeAware) body;
        // 通過消息類型找到消息處理器
        final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
        if (pair != null) {
            // 如果消息處理器有指定的業務線程池
            if (pair.getSecond() != null) {
                try {
                    // 在指定的業務線程池中處理消息
                    pair.getSecond().execute(() -> {
                        ...
                        pair.getFirst().process(ctx, rpcMessage);
                        ...
                    });
                } catch (RejectedExecutionException e) {
                    ...
                }
            } else {
                try {
                    //若消息處理器沒有指定的業務線程池,則在I/O現成中直接處理。
                    pair.getFirst().process(ctx, rpcMessage);
                } catch (Throwable th) {
                   ...
                }
            }
        } else {
          ...
        }
    } else {
        ...
    }
}           

四、Seata client 端的 reactor 模式應用

Seata client 端也采用了 reactor 多線程模型,在初始化的時候有RmNettyRemotingClient和TmNettyRemotingClient兩個對象,分别會建立各自的 Bootstrap,RM 和 TM 各有自己的 I/O 線程池和業務線程池。

功能 線程池對象 備注
處理 IO 事件 NettyClientBootstrap#eventLoopGroupWorker
處理業務消息 AbstractNettyRemoting#messageExecutor

源碼裡還有個NettyClientBootstrap#defaultEventExecutorGroup,沒看出來哪裡有用。TmNettyRemotingClient#getInstance()中建構了 TM 的業務線程池,指派給NettyClientBootstrap#messageExecutor,同樣RmNettyRemotingClient#getInstance()中建構了 RM 的業務線程池

4.1、NettyClientBootstrap#eventLoopGroupWorker

用戶端此線程池關鍵資訊如下:

  • 線程數:1
  • 線程名字字首: TM:"NettyClientSelector_TMROLE" RM:"NettyClientSelector_RMROLE"
// 單I/O線程
this.eventLoopGroupWorker = new NioEventLoopGroup(
//CONFIG.getInt("transport.threadFactory.clientSelectorThreadSize", 1)
selectorThreadSizeThreadSize,
new NamedThreadFactory(
    // CONFIG.getConfig("transport.threadFactory.clientSelectorThreadPrefix", "NettyClientSelector");
    // 再拼上角色後預設值為:"NettyClientSelector_TMROLE"
    getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()),
    //CONFIG.getInt("transport.threadFactory.clientSelectorThreadSize", 1)
    selectorThreadSizeThreadSize)
);           

4.2、AbstractNettyRemoting#messageExecutor

TmNettyRemotingClient#getInstance()和RmNettyRemotingClient#getInstance()建立各自的線程池,配置并不相同。

1)TmNettyRemotingClient#getInstance()中所建立線程池的關鍵資訊如下:

  • 線程數:預設值是 cpu 核數 _ 2 ~ cpu 核數 _ 2
  • keepAlive:Integer.MAX_VALUE 秒
  • 線程名字字首:rpcDispatch_TMROLE
  • 隊列長度: 2000
  • 拒絕政策:runsOldestTaskPolicy(),飽和的情況下,添加新任務并由投遞任務的線程運作最早的任務。
public static TmNettyRemotingClient getInstance() {
    if (instance == null) {
        synchronized (TmNettyRemotingClient.class) {
            if (instance == null) {
                NettyClientConfig nettyClientConfig = new NettyClientConfig();
                // 自定義TM業務線程池
                final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
                        nettyClientConfig.getClientWorkerThreads(), // 預設是cpu核數 * 2
                        nettyClientConfig.getClientWorkerThreads(), // 預設是cpu核數 * 2
                        KEEP_ALIVE_TIME, TimeUnit.SECONDS,//Integer.MAX_VALUE;
                        new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),//2000
                        new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(),// TM的線程名是:rpcDispatch_TMROLE
                                nettyClientConfig.getClientWorkerThreads()),// 預設是cpu核數 * 2
                        RejectedPolicies.runsOldestTaskPolicy());//添加新任務并由主線程運作最早的任務。
                instance = new TmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
            }
        }
    }
    return instance;
}           

2)RmNettyRemotingClient#getInstance() 中所建立線程池的關鍵資訊如下:

  • 線程數:預設是 cpu 核數 _ 2 ~ cpu 核數 _ 2
  • keepAlive:Integer.MAX_VALUE 秒
  • 線程名字字首:rpcDispatch_RMROLE
  • 隊列長度: 20000
  • 拒絕政策:CallerRunsPolicy(),飽和的情況下,調用者來執行該任務,即 Netty 的 IO 線程。
public static RmNettyRemotingClient getInstance() {
    if (instance == null) {
        synchronized (RmNettyRemotingClient.class) {
            if (instance == null) {
                NettyClientConfig nettyClientConfig = new NettyClientConfig();
                // 自定義RM業務線程池
                final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
                        nettyClientConfig.getClientWorkerThreads(), // 預設是cpu核數 * 2
                        nettyClientConfig.getClientWorkerThreads(), // 預設是cpu核數 * 2
                        KEEP_ALIVE_TIME, TimeUnit.SECONDS,//Integer.MAX_VALUE;
                        new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),//20000
                        new NamedThreadFactory(
                            nettyClientConfig.getRmDispatchThreadPrefix(),// RM的線程名是:rpcDispatch_RMROLE,
                            nettyClientConfig.getClientWorkerThreads()),// 預設是cpu核數 * 2
                    new ThreadPoolExecutor.CallerRunsPolicy());////飽和的情況下,調用者來執行該任務,即Netty的IO線程
                instance = new RmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
            }
        }
    }
    return instance;
}           

4.3、消息處理

TmNettyRemotingClient和RmNettyRemotingClient在init()方法中會調用registerProcessor()方法注冊各自的 RPC 消息處理器。收到 RPC 消息後就由這些處理器+對應的線程池做後續處理,消息的相關業務屬性在後續的事務流程中介紹。

五、支撐特殊能力的業務線程池

1)AbstractNettyRemotingClient#mergeSendExecutorService

用于批量發送請求,多個消息合并,減少通信次數。實作邏輯比較清晰,當允許發送批量消息時,消息首先分桶儲存到 basketMap,在一個周期性的無線循環中,把 basketMap 中的消息隊列取出來,把每個隊列的消息都放到 mergeMessage 中,最後把 mergeMessage 發送出去。

  • 線程數:1
  • 線程名字首:”rpcMergeMessageSend“
  • AbstractNettyRemotingClient中功能相關的屬性介紹: Object mergeLock:發送請求的鎖對象。 Map<Integer, MergeMessage> mergeMsgMap:當發送消息的類型是 MergeMessage,那麼就将消息儲存到 mergeMsgMap。 ConcurrentHashMap<String/*serverAddress*/, BlockingQueue<RpcMessage>> basketMap:當允許發送批量消息時,消息首先分桶儲存到 basketMap,然後通過定時任務将儲存 basketMap 的消息發送出去。basketMap 的是伺服器的位址,value 是儲存的發送個伺服器的消息。按照位址分桶是将要發給同一個伺服器的多個消息合并到一個MergedWarpMessage後發送。
  • 有配置開關,預設值如下:
transport.enableTmClientBatchSendRequest=false
transport.enableRmClientBatchSendRequest=true
transport.enableTcServerBatchSendResponse=false           

對應的關鍵代碼邏輯如下:

  1. 在AbstractNettyRemotingClient#sendSyncRequest中,同步發送時将消息緩存起來,預設配置看隻有 RM 開啟了消息合并發送,另外同步發送逾時設定,預設 TM 30 秒,RM 15 秒。按照 IP 位址分桶,同一個目标執行個體的消息才可以合并發送
public Object sendSyncRequest(Object msg) throws TimeoutException {
    String serverAddress = loadBalance(getTransactionServiceGroup(), msg);
    // 同步發送逾時設定,預設 TM 30秒,RM 15秒
    long timeoutMillis = this.getRpcRequestTimeout();
    RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);

    // send batch message
    // put message into basketMap, @see MergedSendRunnable
    // 預設隻有RM開啟了消息合并發送,TM 并未開啟批發送
    if (this.isEnableClientBatchSendRequest()) {

        // send batch message is sync request, needs to create messageFuture and put it in futures.
        MessageFuture messageFuture = new MessageFuture();
        messageFuture.setRequestMessage(rpcMessage);
        messageFuture.setTimeout(timeoutMillis);
        futures.put(rpcMessage.getId(), messageFuture);

        // put message into basketMap
        // 按照目标位址分桶,同一個TC執行個體的消息才可以合并發送
        BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
            key -> new LinkedBlockingQueue<>());
        if (!basket.offer(rpcMessage)) {
            LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",
                    serverAddress, rpcMessage);
            return null;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("offer message: {}", rpcMessage.getBody());
        }
        // 通知合并發送線程 有消息要發送,醒來幹活兒
        if (!isSending) {
            synchronized (mergeLock) {
                mergeLock.notifyAll();
            }
        }

        try {
            // 阻塞等待消息的響應。
            return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (Exception exx) {
            LOGGER.error("wait response error:{},ip:{},request:{}",
                exx.getMessage(), serverAddress, rpcMessage.getBody());
            if (exx instanceof TimeoutException) {
                throw (TimeoutException) exx;
            } else {
                throw new RuntimeException(exx);
            }
        }

    } else {
        // 不合并發送的話,就擷取指定IP的channel,并立即發送。
        Channel channel = clientChannelManager.acquireChannel(serverAddress);
        return super.sendSync(channel, rpcMessage, timeoutMillis);
    }

}           
  1. 在AbstractNettyRemotingClient#init中建構線程池mergeSendExecutorService,在這個線程池中執行消息的批處理(消息合并、消息發送)。
public void init() {
    ...
    // 通過線程池有1個線程,執行消息合并發送
    if (this.isEnableClientBatchSendRequest()) {
        mergeSendExecutorService = new ThreadPoolExecutor(
            MAX_MERGE_SEND_THREAD,//1
            MAX_MERGE_SEND_THREAD,//1
            KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            new NamedThreadFactory(
                    //TM : rpcMergeMessageSend_TMROLE
                    //RM : rpcMergeMessageSend_RMROLE
                    //SERVER : rpcMergeMessageSend_SERVERROLE
                    getThreadPrefix(),
                    MAX_MERGE_SEND_THREAD)//1
        );
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }
    super.init();
    clientBootstrap.start();
}           
  1. 批處理任務MergedSendRunnable中,實作了消息合并和消息發送
private class MergedSendRunnable implements Runnable {

    @Override
    public void run() {
        while (true) {
            //mergeLock 用于生産-消費的協作
            synchronized (mergeLock) {
                try {
                    // MAX_MERGE_SEND_MILLS = 1,還有線程休眠的效果
                    mergeLock.wait(MAX_MERGE_SEND_MILLS);
                } catch (InterruptedException e) {
                }
            }
            isSending = true;
            // 發送消息,消息是按照IP位址分組
            basketMap.forEach((address, basket) -> {
                if (basket.isEmpty()) {
                    return;
                }

                MergedWarpMessage mergeMessage = new MergedWarpMessage();
                //如果basket隊列不為空,将其中的消息全取出來,添加到mergeMessage中
                while (!basket.isEmpty()) {
                    RpcMessage msg = basket.poll();
                    mergeMessage.msgs.add((AbstractMessage) msg.getBody());
                    mergeMessage.msgIds.add(msg.getId());
                }
                // debug 列印本次發送的消息個數和每個消息的Id,以及此時在futures中做逾時管控的所有消息的Id,
                // 兩個消息Id比對,可知道消息積壓情況9666
                if (mergeMessage.msgIds.size() > 1) {
                    printMergeMessageLog(mergeMessage);
                }
                Channel sendChannel = null;
                try {
                    // 擷取指定位址的channel對象,異步發送消息
                    // 發送批量消息是同步的請求,但是這裡不需要得到傳回的值,在消息儲存到basketMap之前,已經建立了messageFuture了,
                    // 傳回值将會從ClientOnResponseProcessor中得到
                    sendChannel = clientChannelManager.acquireChannel(address);
                    // 因為原始消息的發送已經加入過逾時管控,是以批量發送環節不再需要加入額外的逾時控制
                    AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);
                } catch (FrameworkException e) {
                    if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {
                        destroyChannel(address, sendChannel);
                    }
                    // fast fail
                    // 發生異常,快速将儲存在mergeMessage的消息清理掉
                    for (Integer msgId : mergeMessage.msgIds) {
                        MessageFuture messageFuture = futures.remove(msgId);
                        if (messageFuture != null) {
                            messageFuture.setResultMessage(
                                new RuntimeException(String.format("%s is unreachable", address), e));
                        }
                    }
                    LOGGER.error("client merge call failed: {}", e.getMessage(), e);
                }
            });
            isSending = false;
        }
    }           

2)AbstractNettyRemoting#timerExecutor

Netty 的 I/O 操作異步的,RPC 消息的發送操作會對應一個 Future 對象,在 Seata 中這個 Futrue 對象被封裝為 MessageFuture,需同步發送的消息,其對應的 MessageFuture 被放入 map 緩存起來,當收到消息的 response 後,将消息從 map 中移除。AbstractNettyRemoting#timerExecutor裡的這個線程定時巡檢 map 中的消息,若逾時未收到 response 則認定為發送逾時。

  • 線程數:1
  • 線程名字首:”timeoutChecker“
  • scheduleAtFixedRate :延遲 3 秒,頻率 3 秒
  • AbstractNettyRemoting中的功能相關的屬性介紹: ScheduledExecutorService timerExecutor:執行定時任務,消息發送以後,到了過期時間還沒有傳回,則會對消息進行清理。 ConcurrentHashMap<Integer, MessageFuture> futures:儲存着不同消息,timerExecutor 會清理 futures 中過期的消息。

對應的關鍵代碼邏輯如下:

  1. 建構定時任務的線程池AbstractNettyRemoting#timerExecutor,隻用 1 個線程
/**
 * 定時器,用于巡檢消息的發送是否逾時
 */
protected final ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1,
    new NamedThreadFactory("timeoutChecker", 1, true));           
  1. 通過AbstractNettyRemoting#sendSync同步發送消息,建構MessageFuture并放入futures這個 map 中,發送過程配置監聽器 用于處理 channel 異常,指定失敗原因并從futures中移除,還要銷毀 channel。
protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
    ...
    // 建構 MessageFuture
    MessageFuture messageFuture = new MessageFuture();
    messageFuture.setRequestMessage(rpcMessage);
    messageFuture.setTimeout(timeoutMillis);
    // 放入 futures 這個map中
    futures.put(rpcMessage.getId(), messageFuture);
    //檢查通道是否可以寫
    channelWritableCheck(channel, rpcMessage.getBody());
    String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
    //在請求發送之前執行鈎子
    doBeforeRpcHooks(remoteAddr, rpcMessage);
    // 發送請求,并配置監聽器 用于處理channel異常
    channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
        // 這裡響應不成功,基本是channel不正常了
        if (!future.isSuccess()) {
            //移除消息
            MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
            if (messageFuture1 != null) {
                messageFuture1.setResultMessage(future.cause());
            }
            //響應不成功,則銷毀channel
            destroyChannel(future.channel());
        }
    });
    ...
    //擷取響應結果
    Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
    //響應之後執行鈎子
    doAfterRpcHooks(remoteAddr, rpcMessage, result);
    ...
}           
  1. 正常收到 response 後,給MessageFuture對象指派,從futures中移除,如ClientOnResponseProcessor#process中的實作
@Override
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
  ...
  // 從futures中移除
  MessageFuture messageFuture = futures.remove(rpcMessage.getId());
  if (messageFuture != null) {
      // 指派結果
      messageFuture.setResultMessage(rpcMessage.getBody());
  }
}           
  1. 在AbstractNettyRemoting#init中開啟定時任務,巡檢出futures 這個 map 中的逾時對象後從 futures 中移除,不再檢查,并指定結果為 TimeoutException
public void init() {
    // 檢測消息同步發送(sendSync(xxx))是否逾時,
    // 定時任務預設是延遲3秒,間隔3秒
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {
                MessageFuture future = entry.getValue();
                if (future.isTimeout()) {
                    // 如果過期了則将發送結果設定為TimeoutException
                    // 從futures中移除,不再檢查
                    futures.remove(entry.getKey());
                    RpcMessage rpcMessage = future.getRequestMessage();
                    future.setResultMessage(new TimeoutException(String
                        .format("msgId: %s ,msgType: %s ,msg: %s ,request timeout", rpcMessage.getId(), String.valueOf(rpcMessage.getMessageType()), rpcMessage.getBody().toString())));
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());
                    }
                }
            }

            nowMills = System.currentTimeMillis();
        }
    }, TIMEOUT_CHECK_INTERVAL, TIMEOUT_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
}           

還有線程池跟服務注冊發現和建連相關,會後邊篇章再介紹。