一、Reactor 模式
reactor 模式是一種事件驅動的應用層 I/O 處理模式,基于分而治之和事件驅動的思想,緻力于建構一個高性能的可伸縮的 I/O 處理模式。維基百科對 Reactor pattern 的解釋:
The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers
大緻意思是說,reactor設計模式是一種事件處理模式,用于同時有一個或多個請求發送到事件處理器(service handler),這個事件處理器會采用多路分離(demultiplexes )的方式,同步的将這些請求分發到請求處理器(request handlers)。
不難看出,上邊介紹的 reactor 模式是一種抽象;從實作角度說,reactor 模式有許多變種,不同程式設計語言中的實作也有差異。就 java 而言,大師 Doug Lea 在其【Scalable IO in Java】中就講述了幾個reactor模式的演進,如單線程版本、多線程版 ,閱讀此文後,筆者對大師所講reactor模式演進的了解與網絡中一些描述稍有差異。
在reactor 單線程版中,隻有一個reactor線程,線程中通過 select (I/O 多路複用接口) 監聽所有 I/O 事件,收到 I/O 事件後通過 dispatch 進行分發給 Handlers 處理,此版本容易實作,也容易了解,但性能不高。為了适配多處理器,充分利用多核并行處理的優勢,實作高性能的網絡服務,可以采用分治政策,關鍵環節采用多線程模式,于是就出現了reactor多線程版本,而多線程的應用展現為worder線程和reactor線程,多線程應該被池化管理,這樣才容易被調整和控制。線程池中的線程數會比用戶端的數量少很多,實際數量可以根據程式本身是 CPU 密集型還是 I/O 密集型操作來進行合理的配置設定。
- 多個 worder 線程(池化管理)
- 屬于網絡 I/O 操作與業務處理的拆分,因為 reactors 監聽到 I/O 事件後應該快速分發給 handlers 來處理程式;但如果 handler 中的非 I/O 操作慢了就會減慢 reactor 中的 I/O 事件響應速度,是以把非 I/O 操作從 reactors 的 I/O 線程轉移到其他線程中,即由worker線程來分擔非 I/O 邏輯的操作處理。
- 多個 reactor 線程(池化管理)
- 屬于網絡建連操作與網絡 I/O 讀寫操作的拆分,因為由一個reactor在一個線程中完成所有 I/O 操作也會遇到性能瓶頸,可采取拆分并增加reactor政策,将 I/O 負載配置設定給多個 reactor(每個reactor都有自己的線程、選擇器和排程循環)以達到負載平衡。這看起來挺不錯,但誰來執行配置設定以達到負載均衡呢?或許是因為這個問題,将reactor拆分為兩類角色,mainReactor負責接收連接配接,之後采用一定的負載均衡政策将新連接配接配置設定給其他subReactor來處理 I/O 讀寫,這樣的拆分自然流暢。
如此就演進出如上圖中的主從reactor多線程模型。請注意,結合【Scalable IO in Java】原文中的用詞和描述看,上圖中的mainReactor和subReactor可以有多個并做池化管理,所有也有一些文章中會看到如主ReactorGroup、mainReactorGroup、從ReactorGroup、subReactorGroup等這類名詞用 Group 字尾來強調 Reactor 是池化管理。 或許是不好布局,也或許是為了凸顯主從reactor角色的協作關系,上圖中都隻展示了一個,另外服務端應用通常隻暴露一個服務端口時,隻需用一個 mainReactor 來監聽端口上的連接配接事件并處理。
二、Netty 主從 reactor 多線程模型
Netty中reactor所對應的實作類是NioEventLoop,其核心邏輯如下:
- 不同類型的 channel 向 Selector 注冊所感興趣的事件
- 掃描是否有感興趣的事件發生
- 事件發生後做相應的處理
用戶端和服務端分别會有不同類型的channel,用戶端建立SocketChannel向服務端發起連接配接請求,服務端建立ServerSocketChannel監聽用戶端連接配接,建連後建立SocketChannel與用戶端的SocketChannel互相收發資料,這些channel分工不同,向 Selector 注冊所感興趣的事件情況也不同:
用戶端/服務端 | channel | OP_ACCEPT | OP_CONNECT | OP_WRITE | OP_READ |
用戶端 | SocketChannel | YES | |||
服務端 | ServerSocketChannel | YES | |||
服務端 | SocketChannel | YES | YES |
Netty中 Nio 方式實作幾種 reactor 模型如下:
mainReactor 對應 Netty 中配置的 bossGroup 線程組(下圖中的主ReactorGroup),主要負責接受用戶端連接配接的建立。每 bind 一個端口就用掉一個bossGroup中的線程。
subReactor 對應 Netty 中配置的 workerGroup 線程組(下圖中的 reactorGroup),bossGroup 線程組接受完用戶端的連接配接後,将 channel 轉交給 workerGroup 線程組,在 workerGroup 線程組内選擇一個線程,執行 I/O 讀寫的處理,workerGroup 線程組預設是 2 * CPU 核數個線程。
主從 reactor 模式的核心流程:
- 如果隻監聽一個端口,那麼隻需一個主reactor幹活兒,是以通常看到boosGroup隻配置一個線程。主reactor運作在獨立的線程中 ,該線程中隻負責與用戶端的連接配接請求
- 從reactor在伺服器端可以不止一個, 通常運作多個從 reactor , 每個從 reactor 也運作在一個獨立的線程中 ,負責與用戶端的讀寫操作
- 主 reactor 檢測到用戶端的連結後,建立 NioSocketChannel,按照一定的算法循環選取(負載均衡)一個從reactor,并把剛建立的NioSocketChannel 注冊到這個從 reactor 中,這樣建連和讀寫事件互不影響。
- 一個 reactor 中可被注冊多個NioSocketChannel,這個 reactor 監聽所有的被配置設定的 NioSocketChannel 的讀寫事件 , 如果監聽到用戶端的資料發送事件 , 将對應的業務邏輯轉發給 NioSocketChannel 中的pipeline 裡的 handler 鍊進行處理
- handler 最好隻負責響應 I/O 事件,不處理具體的與用戶端互動的業務邏輯 , 這樣不會長時間阻塞 , 其 read 方法讀取用戶端資料後 , 将消息資料交給業務線程池去處理相關業務邏輯
- 業務線程池完成相關業務邏輯的處理後,将結果傳回,通過NioSocketChannel的的pipeline 裡的 handler 鍊将結果消息寫回給用戶端
- 當buffer不滿足将結果消息寫回給用戶端時的條件時,注冊寫事件,等待可寫時再寫
三、Seata Server 端 的 reactor 模式應用
Seata Server 采用了 主從 reactor 多線程模型,對應這個模型的話是有四個線程池,其中自定義業務線程池是兩個。
功能 | 線程池對象 | 備注 |
接收用戶端連接配接 | NettyServerBootstrap#eventLoopGroupBoss | |
處理 IO 事件 | NettyServerBootstrap#eventLoopGroupWorker | 部分 RPC 消息在這裡處理 |
處理用戶端的 request 消息 | AbstractNettyRemoting#messageExecutor | 用戶端主動發給的消息 |
處理用戶端的 response 消息 | NettyRemotingServer#branchResultMessageExecutor | 服務端主動發給用戶端消息,用戶端處理後給服務端響應 |
3.1、NettyServerBootstrap#eventLoopGroupBoss
筆者的環境未啟用 epoll,關鍵資訊如下:
- 線程數:1,隻監聽一個端口
- 線程名字首:“NettyBoss”
this.eventLoopGroupBoss = new NioEventLoopGroup(
//CONFIG.getInt("transport.threadFactory.bossThreadSize", 1);
nettyServerConfig.getBossThreadSize(),
new NamedThreadFactory(
// CONFIG.getConfig("transport.threadFactory.bossThreadPrefix", "NettyBoss");
nettyServerConfig.getBossThreadPrefix(),
//CONFIG.getConfig("transport.threadFactory.bossThreadSize", 1);
nettyServerConfig.getBossThreadSize())
);
複制代碼
3.2、NettyServerBootstrap#eventLoopGroupWorker
筆者的環境未啟用 epoll,關鍵資訊如下:
- 線程數:預設值是 cpu 核數 * 2
- 線程名字首:“NettyServerNIOWorker”
this.eventLoopGroupWorker = new NioEventLoopGroup(
// System.getProperty("transport.serverWorkerThreads", String.valueOf(WORKER_THREAD_SIZE)));//預設值cpu核數*2
nettyServerConfig.getServerWorkerThreads(),
new NamedThreadFactory(
// CONFIG.getConfig("transport.threadFactory.workerThreadPrefix",
// enableEpoll() ? EPOLL_WORKER_THREAD_PREFIX : DEFAULT_NIO_WORKER_THREAD_PREFIX);
// 預設值 NettyServerNIOWorker ,沒有啟用 epoll
nettyServerConfig.getWorkerThreadPrefix(),
//System.getProperty("transport.serverWorkerThreads", String.valueOf(WORKER_THREAD_SIZE)));//預設值 cpu核數*2
nettyServerConfig.getServerWorkerThreads())
);
複制代碼
3.3、AbstractNettyRemoting#messageExecutor
此線程池處理用戶端的 request 消息,關鍵參數資訊如下:
- 線程數:50 ~ 500
- keepAlive:500 秒
- 線程名字字首: "ServerHandlerThread"
- 隊列長度: 500
- 拒絕政策:CallerRunsPolicy(),飽和的情況下,調用者來執行該任務,即 Netty 的 I/O 線程
ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(
//Integer.parseInt(System.getProperty("transport.minServerPoolSize", "50"));
NettyServerConfig.getMinServerPoolSize(),
//Integer.parseInt(System.getProperty("transport.maxServerPoolSize", "500"));
NettyServerConfig.getMaxServerPoolSize(),
//Integer.parseInt(System.getProperty("transport.keepAliveTime", "500"));
NettyServerConfig.getKeepAliveTime(),
TimeUnit.SECONDS,
//Integer.parseInt(System.getProperty("transport.maxTaskQueueSize", "500"));
new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
new NamedThreadFactory(
"ServerHandlerThread",
//Integer.parseInt(System.getProperty("transport.maxServerPoolSize", "500"));
NettyServerConfig.getMaxServerPoolSize()),
//飽和的情況下,調用者來執行該任務,即Netty的IO線程
new ThreadPoolExecutor.CallerRunsPolicy()
);
複制代碼
3.4、NettyRemotingServer#branchResultMessageExecutor
此線程池處理用戶端的 response 消息,關鍵參數資訊如下:
- 線程數:cpu 核數2 ~ cpu 核數2
- keepAlive:500 秒
- 線程名字字首: "BranchResultHandlerThread"
- 隊列長度: 20000
- 拒絕政策:CallerRunsPolicy(),飽和的情況下,調用者來執行該任務,即 Netty 的 IO 線程
private ThreadPoolExecutor branchResultMessageExecutor = new ThreadPoolExecutor(
//System.getProperty("transport.minBranchResultPoolSize", String.valueOf(WORKER_THREAD_SIZE))),預設值 cpu核數*2
NettyServerConfig.getMinBranchResultPoolSize(),
//System.getProperty("transport.maxBranchResultPoolSize", String.valueOf(WORKER_THREAD_SIZE))),預設值 cpu核數*2
NettyServerConfig.getMaxBranchResultPoolSize(),
// System.getProperty("transport.keepAliveTime", "500"),預設值500
NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
new LinkedBlockingQueue<>(
//System.getProperty("transport.maxTaskQueueSize", "20000"),預設值 20000
NettyServerConfig.getMaxTaskQueueSize()),
new NamedThreadFactory(
// 分支響應消息的處理線程的名字字首 BranchResultHandlerThread
"BranchResultHandlerThread",
// System.getProperty("transport.maxBranchResultPoolSize", String.valueOf(WORKER_THREAD_SIZE))),預設值 cpu核數*2
NettyServerConfig.getMaxBranchResultPoolSize()
),
//飽和的情況下,調用者來執行該任務,即Netty的IO線程
new ThreadPoolExecutor.CallerRunsPolicy()
);
複制代碼
3.5、業務線程池如何處理消息
3.5.1、登記消息處理器
Seata 消息處理的核心邏輯是:定義好什麼類型的消息,使用哪個消息處理器,這個消息處理器的消息處理邏輯在哪個線程池中執行。這個映射關系通過AbstractNettyRemoting#processorTable來存儲。
/**
* 可以接收什麼類型的消息,以及使用哪個消息處理器和線程池來處理消息
* HashMap<消息類型, Pair<消息處理器, 線程池>>
* processor type {@link MessageType}
*/
protected final HashMap<Integer/*MessageType*/, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);
複制代碼
各子產品 Netty 元件啟動前,通過AbstractNettyRemotingServer#registerProcessor方法登記到這個結構中。
public void registerProcessor(int messageType, RemotingProcessor processor, ExecutorService executor) {
Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);
this.processorTable.put(messageType, pair);
}
複制代碼
拿 Seata Server 來說,如在ServerBootStrap啟動前,通過NettyRemotingServer#registerProcessor注冊好消息處理器。不同消息對應的處理器的線程池也不同,也有一些消息沒有指定業務線程池(沒必要),情況如下:
private void registerProcessor() {
// 1. registry on request message processor
ServerOnRequestProcessor onRequestProcessor =
new ServerOnRequestProcessor(this, getHandler());
ShutdownHook.getInstance().addDisposable(onRequestProcessor);
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
// 2. registry on response message processor
ServerOnResponseProcessor onResponseProcessor =
new ServerOnResponseProcessor(getHandler(), getFutures());
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor);
// 3. registry rm message processor
RegRmProcessor regRmProcessor = new RegRmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
// 4. registry tm message processor
RegTmProcessor regTmProcessor = new RegTmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
// 5. registry heartbeat message processor
ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}
複制代碼
3.5.2、處理消息
當 Seata Server 收到用戶端發送的 RPC 消息後,會進入AbstractNettyRemotingServer.ServerHandler#channelRead中,在這裡對消息類型簡單判斷後,委托給processMessage處理。
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof RpcMessage)) {
return;
}
// 收到消息後,委托給 processMessage 處理
processMessage(ctx, (RpcMessage) msg);
}
複制代碼
processMessage中通過消息類型找到消息處理器進行業務層處理:
- 如果消息處理器有指定的業務線程池,在指定的業務線程池中處理消息
- 若消息處理器沒有指定的業務線程池,則在 I/O 線程中直接處理。
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
...
Object body = rpcMessage.getBody();
if (body instanceof MessageTypeAware) {
MessageTypeAware messageTypeAware = (MessageTypeAware) body;
// 通過消息類型找到消息處理器
final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
if (pair != null) {
// 如果消息處理器有指定的業務線程池
if (pair.getSecond() != null) {
try {
// 在指定的業務線程池中處理消息
pair.getSecond().execute(() -> {
...
pair.getFirst().process(ctx, rpcMessage);
...
});
} catch (RejectedExecutionException e) {
...
}
} else {
try {
//若消息處理器沒有指定的業務線程池,則在I/O現成中直接處理。
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
...
}
}
} else {
...
}
} else {
...
}
}
四、Seata client 端的 reactor 模式應用
Seata client 端也采用了 reactor 多線程模型,在初始化的時候有RmNettyRemotingClient和TmNettyRemotingClient兩個對象,分别會建立各自的 Bootstrap,RM 和 TM 各有自己的 I/O 線程池和業務線程池。
功能 | 線程池對象 | 備注 |
處理 IO 事件 | NettyClientBootstrap#eventLoopGroupWorker | |
處理業務消息 | AbstractNettyRemoting#messageExecutor |
源碼裡還有個NettyClientBootstrap#defaultEventExecutorGroup,沒看出來哪裡有用。TmNettyRemotingClient#getInstance()中建構了 TM 的業務線程池,指派給NettyClientBootstrap#messageExecutor,同樣RmNettyRemotingClient#getInstance()中建構了 RM 的業務線程池
4.1、NettyClientBootstrap#eventLoopGroupWorker
用戶端此線程池關鍵資訊如下:
- 線程數:1
- 線程名字字首: TM:"NettyClientSelector_TMROLE" RM:"NettyClientSelector_RMROLE"
// 單I/O線程
this.eventLoopGroupWorker = new NioEventLoopGroup(
//CONFIG.getInt("transport.threadFactory.clientSelectorThreadSize", 1)
selectorThreadSizeThreadSize,
new NamedThreadFactory(
// CONFIG.getConfig("transport.threadFactory.clientSelectorThreadPrefix", "NettyClientSelector");
// 再拼上角色後預設值為:"NettyClientSelector_TMROLE"
getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()),
//CONFIG.getInt("transport.threadFactory.clientSelectorThreadSize", 1)
selectorThreadSizeThreadSize)
);
4.2、AbstractNettyRemoting#messageExecutor
TmNettyRemotingClient#getInstance()和RmNettyRemotingClient#getInstance()建立各自的線程池,配置并不相同。
1)TmNettyRemotingClient#getInstance()中所建立線程池的關鍵資訊如下:
- 線程數:預設值是 cpu 核數 _ 2 ~ cpu 核數 _ 2
- keepAlive:Integer.MAX_VALUE 秒
- 線程名字字首:rpcDispatch_TMROLE
- 隊列長度: 2000
- 拒絕政策:runsOldestTaskPolicy(),飽和的情況下,添加新任務并由投遞任務的線程運作最早的任務。
public static TmNettyRemotingClient getInstance() {
if (instance == null) {
synchronized (TmNettyRemotingClient.class) {
if (instance == null) {
NettyClientConfig nettyClientConfig = new NettyClientConfig();
// 自定義TM業務線程池
final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
nettyClientConfig.getClientWorkerThreads(), // 預設是cpu核數 * 2
nettyClientConfig.getClientWorkerThreads(), // 預設是cpu核數 * 2
KEEP_ALIVE_TIME, TimeUnit.SECONDS,//Integer.MAX_VALUE;
new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),//2000
new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(),// TM的線程名是:rpcDispatch_TMROLE
nettyClientConfig.getClientWorkerThreads()),// 預設是cpu核數 * 2
RejectedPolicies.runsOldestTaskPolicy());//添加新任務并由主線程運作最早的任務。
instance = new TmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
}
}
}
return instance;
}
2)RmNettyRemotingClient#getInstance() 中所建立線程池的關鍵資訊如下:
- 線程數:預設是 cpu 核數 _ 2 ~ cpu 核數 _ 2
- keepAlive:Integer.MAX_VALUE 秒
- 線程名字字首:rpcDispatch_RMROLE
- 隊列長度: 20000
- 拒絕政策:CallerRunsPolicy(),飽和的情況下,調用者來執行該任務,即 Netty 的 IO 線程。
public static RmNettyRemotingClient getInstance() {
if (instance == null) {
synchronized (RmNettyRemotingClient.class) {
if (instance == null) {
NettyClientConfig nettyClientConfig = new NettyClientConfig();
// 自定義RM業務線程池
final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
nettyClientConfig.getClientWorkerThreads(), // 預設是cpu核數 * 2
nettyClientConfig.getClientWorkerThreads(), // 預設是cpu核數 * 2
KEEP_ALIVE_TIME, TimeUnit.SECONDS,//Integer.MAX_VALUE;
new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),//20000
new NamedThreadFactory(
nettyClientConfig.getRmDispatchThreadPrefix(),// RM的線程名是:rpcDispatch_RMROLE,
nettyClientConfig.getClientWorkerThreads()),// 預設是cpu核數 * 2
new ThreadPoolExecutor.CallerRunsPolicy());////飽和的情況下,調用者來執行該任務,即Netty的IO線程
instance = new RmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
}
}
}
return instance;
}
4.3、消息處理
TmNettyRemotingClient和RmNettyRemotingClient在init()方法中會調用registerProcessor()方法注冊各自的 RPC 消息處理器。收到 RPC 消息後就由這些處理器+對應的線程池做後續處理,消息的相關業務屬性在後續的事務流程中介紹。
五、支撐特殊能力的業務線程池
1)AbstractNettyRemotingClient#mergeSendExecutorService
用于批量發送請求,多個消息合并,減少通信次數。實作邏輯比較清晰,當允許發送批量消息時,消息首先分桶儲存到 basketMap,在一個周期性的無線循環中,把 basketMap 中的消息隊列取出來,把每個隊列的消息都放到 mergeMessage 中,最後把 mergeMessage 發送出去。
- 線程數:1
- 線程名字首:”rpcMergeMessageSend“
- AbstractNettyRemotingClient中功能相關的屬性介紹: Object mergeLock:發送請求的鎖對象。 Map<Integer, MergeMessage> mergeMsgMap:當發送消息的類型是 MergeMessage,那麼就将消息儲存到 mergeMsgMap。 ConcurrentHashMap<String/*serverAddress*/, BlockingQueue<RpcMessage>> basketMap:當允許發送批量消息時,消息首先分桶儲存到 basketMap,然後通過定時任務将儲存 basketMap 的消息發送出去。basketMap 的是伺服器的位址,value 是儲存的發送個伺服器的消息。按照位址分桶是将要發給同一個伺服器的多個消息合并到一個MergedWarpMessage後發送。
- 有配置開關,預設值如下:
transport.enableTmClientBatchSendRequest=false
transport.enableRmClientBatchSendRequest=true
transport.enableTcServerBatchSendResponse=false
對應的關鍵代碼邏輯如下:
- 在AbstractNettyRemotingClient#sendSyncRequest中,同步發送時将消息緩存起來,預設配置看隻有 RM 開啟了消息合并發送,另外同步發送逾時設定,預設 TM 30 秒,RM 15 秒。按照 IP 位址分桶,同一個目标執行個體的消息才可以合并發送
public Object sendSyncRequest(Object msg) throws TimeoutException {
String serverAddress = loadBalance(getTransactionServiceGroup(), msg);
// 同步發送逾時設定,預設 TM 30秒,RM 15秒
long timeoutMillis = this.getRpcRequestTimeout();
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
// send batch message
// put message into basketMap, @see MergedSendRunnable
// 預設隻有RM開啟了消息合并發送,TM 并未開啟批發送
if (this.isEnableClientBatchSendRequest()) {
// send batch message is sync request, needs to create messageFuture and put it in futures.
MessageFuture messageFuture = new MessageFuture();
messageFuture.setRequestMessage(rpcMessage);
messageFuture.setTimeout(timeoutMillis);
futures.put(rpcMessage.getId(), messageFuture);
// put message into basketMap
// 按照目标位址分桶,同一個TC執行個體的消息才可以合并發送
BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
key -> new LinkedBlockingQueue<>());
if (!basket.offer(rpcMessage)) {
LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",
serverAddress, rpcMessage);
return null;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("offer message: {}", rpcMessage.getBody());
}
// 通知合并發送線程 有消息要發送,醒來幹活兒
if (!isSending) {
synchronized (mergeLock) {
mergeLock.notifyAll();
}
}
try {
// 阻塞等待消息的響應。
return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (Exception exx) {
LOGGER.error("wait response error:{},ip:{},request:{}",
exx.getMessage(), serverAddress, rpcMessage.getBody());
if (exx instanceof TimeoutException) {
throw (TimeoutException) exx;
} else {
throw new RuntimeException(exx);
}
}
} else {
// 不合并發送的話,就擷取指定IP的channel,并立即發送。
Channel channel = clientChannelManager.acquireChannel(serverAddress);
return super.sendSync(channel, rpcMessage, timeoutMillis);
}
}
- 在AbstractNettyRemotingClient#init中建構線程池mergeSendExecutorService,在這個線程池中執行消息的批處理(消息合并、消息發送)。
public void init() {
...
// 通過線程池有1個線程,執行消息合并發送
if (this.isEnableClientBatchSendRequest()) {
mergeSendExecutorService = new ThreadPoolExecutor(
MAX_MERGE_SEND_THREAD,//1
MAX_MERGE_SEND_THREAD,//1
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(
//TM : rpcMergeMessageSend_TMROLE
//RM : rpcMergeMessageSend_RMROLE
//SERVER : rpcMergeMessageSend_SERVERROLE
getThreadPrefix(),
MAX_MERGE_SEND_THREAD)//1
);
mergeSendExecutorService.submit(new MergedSendRunnable());
}
super.init();
clientBootstrap.start();
}
- 批處理任務MergedSendRunnable中,實作了消息合并和消息發送
private class MergedSendRunnable implements Runnable {
@Override
public void run() {
while (true) {
//mergeLock 用于生産-消費的協作
synchronized (mergeLock) {
try {
// MAX_MERGE_SEND_MILLS = 1,還有線程休眠的效果
mergeLock.wait(MAX_MERGE_SEND_MILLS);
} catch (InterruptedException e) {
}
}
isSending = true;
// 發送消息,消息是按照IP位址分組
basketMap.forEach((address, basket) -> {
if (basket.isEmpty()) {
return;
}
MergedWarpMessage mergeMessage = new MergedWarpMessage();
//如果basket隊列不為空,将其中的消息全取出來,添加到mergeMessage中
while (!basket.isEmpty()) {
RpcMessage msg = basket.poll();
mergeMessage.msgs.add((AbstractMessage) msg.getBody());
mergeMessage.msgIds.add(msg.getId());
}
// debug 列印本次發送的消息個數和每個消息的Id,以及此時在futures中做逾時管控的所有消息的Id,
// 兩個消息Id比對,可知道消息積壓情況9666
if (mergeMessage.msgIds.size() > 1) {
printMergeMessageLog(mergeMessage);
}
Channel sendChannel = null;
try {
// 擷取指定位址的channel對象,異步發送消息
// 發送批量消息是同步的請求,但是這裡不需要得到傳回的值,在消息儲存到basketMap之前,已經建立了messageFuture了,
// 傳回值将會從ClientOnResponseProcessor中得到
sendChannel = clientChannelManager.acquireChannel(address);
// 因為原始消息的發送已經加入過逾時管控,是以批量發送環節不再需要加入額外的逾時控制
AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);
} catch (FrameworkException e) {
if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {
destroyChannel(address, sendChannel);
}
// fast fail
// 發生異常,快速将儲存在mergeMessage的消息清理掉
for (Integer msgId : mergeMessage.msgIds) {
MessageFuture messageFuture = futures.remove(msgId);
if (messageFuture != null) {
messageFuture.setResultMessage(
new RuntimeException(String.format("%s is unreachable", address), e));
}
}
LOGGER.error("client merge call failed: {}", e.getMessage(), e);
}
});
isSending = false;
}
}
2)AbstractNettyRemoting#timerExecutor
Netty 的 I/O 操作異步的,RPC 消息的發送操作會對應一個 Future 對象,在 Seata 中這個 Futrue 對象被封裝為 MessageFuture,需同步發送的消息,其對應的 MessageFuture 被放入 map 緩存起來,當收到消息的 response 後,将消息從 map 中移除。AbstractNettyRemoting#timerExecutor裡的這個線程定時巡檢 map 中的消息,若逾時未收到 response 則認定為發送逾時。
- 線程數:1
- 線程名字首:”timeoutChecker“
- scheduleAtFixedRate :延遲 3 秒,頻率 3 秒
- AbstractNettyRemoting中的功能相關的屬性介紹: ScheduledExecutorService timerExecutor:執行定時任務,消息發送以後,到了過期時間還沒有傳回,則會對消息進行清理。 ConcurrentHashMap<Integer, MessageFuture> futures:儲存着不同消息,timerExecutor 會清理 futures 中過期的消息。
對應的關鍵代碼邏輯如下:
- 建構定時任務的線程池AbstractNettyRemoting#timerExecutor,隻用 1 個線程
/**
* 定時器,用于巡檢消息的發送是否逾時
*/
protected final ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1,
new NamedThreadFactory("timeoutChecker", 1, true));
- 通過AbstractNettyRemoting#sendSync同步發送消息,建構MessageFuture并放入futures這個 map 中,發送過程配置監聽器 用于處理 channel 異常,指定失敗原因并從futures中移除,還要銷毀 channel。
protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
...
// 建構 MessageFuture
MessageFuture messageFuture = new MessageFuture();
messageFuture.setRequestMessage(rpcMessage);
messageFuture.setTimeout(timeoutMillis);
// 放入 futures 這個map中
futures.put(rpcMessage.getId(), messageFuture);
//檢查通道是否可以寫
channelWritableCheck(channel, rpcMessage.getBody());
String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
//在請求發送之前執行鈎子
doBeforeRpcHooks(remoteAddr, rpcMessage);
// 發送請求,并配置監聽器 用于處理channel異常
channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
// 這裡響應不成功,基本是channel不正常了
if (!future.isSuccess()) {
//移除消息
MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
if (messageFuture1 != null) {
messageFuture1.setResultMessage(future.cause());
}
//響應不成功,則銷毀channel
destroyChannel(future.channel());
}
});
...
//擷取響應結果
Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
//響應之後執行鈎子
doAfterRpcHooks(remoteAddr, rpcMessage, result);
...
}
- 正常收到 response 後,給MessageFuture對象指派,從futures中移除,如ClientOnResponseProcessor#process中的實作
@Override
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
...
// 從futures中移除
MessageFuture messageFuture = futures.remove(rpcMessage.getId());
if (messageFuture != null) {
// 指派結果
messageFuture.setResultMessage(rpcMessage.getBody());
}
}
- 在AbstractNettyRemoting#init中開啟定時任務,巡檢出futures 這個 map 中的逾時對象後從 futures 中移除,不再檢查,并指定結果為 TimeoutException
public void init() {
// 檢測消息同步發送(sendSync(xxx))是否逾時,
// 定時任務預設是延遲3秒,間隔3秒
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {
MessageFuture future = entry.getValue();
if (future.isTimeout()) {
// 如果過期了則将發送結果設定為TimeoutException
// 從futures中移除,不再檢查
futures.remove(entry.getKey());
RpcMessage rpcMessage = future.getRequestMessage();
future.setResultMessage(new TimeoutException(String
.format("msgId: %s ,msgType: %s ,msg: %s ,request timeout", rpcMessage.getId(), String.valueOf(rpcMessage.getMessageType()), rpcMessage.getBody().toString())));
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());
}
}
}
nowMills = System.currentTimeMillis();
}
}, TIMEOUT_CHECK_INTERVAL, TIMEOUT_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
}
還有線程池跟服務注冊發現和建連相關,會後邊篇章再介紹。