本文基于RocketMQ 4.7.1版本
上一篇文章介紹了broker的啟動,本文介紹broker如何處理收到的生産者消息。
文章目錄
- 一、進入處理器前的準備
- 二、進入處理器
-
- 1、校驗
- 2、加鎖
- 3、寫入消息檔案
- 4、刷盤
一、進入處理器前的準備
broker收到生産者發送的消息後,會将消息轉發給SendMessageProcessor,由它儲存到檔案中。
不過在轉發前還要做一些準備工作:
- 解析請求封包,得到請求的類型,rocketmq根據類型找到對應的處理器,也就是SendMessageProcessor;
- 建立一個Runnable對象,在該對象的run方法裡面有對處理器的調用邏輯,在下面介紹該對象,可以認為這裡使用了指令模式,Runnable對象相當于指令對象;
- 判斷broker是否需要做流控,如果需要,則傳回生産者,告知目前系統正忙,是否做流控的判斷條件是:如果目前系統正在忙于向日志檔案裡面寫日志或者臨時存儲空間裡面空間不夠,都說明目前系統正忙;
- 将第二步建立的Runnable對象放入線程池,等待線程調用。
上面的第四步使用了異步線程處理生産者請求,而接收請求的IO線程隻是簡單的處理請求封包,丢入線程池後,接着處理下一個請求,這樣做的好處是使用少量的IO線程便可以接收大量的網絡請求。
異步處理在rocketmq中随處可見,其性能高和異步處理有很大的關系。
下面來看一下第二步提到的Runnable對象的處理邏輯。
Runnable run = new Runnable() {
@Override
public void run() {
try {
//調用RpcHook,
//RpcHook是在BrokerController.initialize()方法中通過ServiceProvider從檔案
//META-INF/service/org.apache.rocketmq.remoting.RPCHook中加載的,預設沒有注冊任何RpcHook執行個體
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
//建立回調對象,當處理器處理完成之後調用回調方法
final RemotingResponseCallback callback = new RemotingResponseCallback() {
@Override
public void callback(RemotingCommand response) {
//調用RpcHook
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
//如果是需要傳回結果的RPC調用,則将response傳回給請求方
if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);
response.markResponseType();
try {
ctx.writeAndFlush(response);
} catch (Throwable e) {
//列印日志代碼删減
}
} else {
}
}
}
};
//檢查是否是異步處理器
if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
//調用處理器邏輯
processor.asyncProcessRequest(ctx, cmd, callback);
} else {
NettyRequestProcessor processor = pair.getObject1();
//調用處理器邏輯
RemotingCommand response = processor.processRequest(ctx, cmd);
callback.callback(response);
}
} catch (Throwable e) {
//代碼删減
//主要是列印日志,并傳回請求方報錯資訊
}
}
}
}
從上面代碼可以看到,Runnable對象主要是調用處理器,并将結果傳回給調用方。Runnable對象是在異步線程中執行的。
SendMessageProcessor處理器繼承了AbstractSendMessageProcessor,是以上面的代碼會調用asyncProcessRequest方法。不過該處理器也提供了processRequest方法,其實這兩個方法調用的底層邏輯完全一樣,隻是做了不同的封裝而已。
二、進入處理器
先看一下處理器的整個處理流程,然後再對其中的部分内容詳細介紹。
1、校驗
SendMessageProcessor首先需要做一系列的校驗,隻有校驗通過的消息才可以寫入消息檔案,下面列舉的是所要進行的校驗:
- 如果配置了啟動延時參數startAcceptSendRequestTimeStamp,且目前系統時間小于該參數,那麼broker會傳回一個失敗資訊,表示現在broker還不能提供服務;
- 如果broker的權限是隻讀(通過參數brokerPermission配置),且主題是有序的,那麼傳回失敗;
- 主題名不能為空,不能超過127個字元,隻能包含數字、大小寫字母、%、中劃線和下劃線;
- 檢驗主題是否接收消息,一些系統主題是不接收消息的;
- 校驗主題是否存在,如果不存在則建立該主題;
- 如果隊列号大于目前存在的最大隊列号,則傳回隊列号不合法的錯誤資訊;
- 如果隊列号小于0,則随機生成一個;
- 檢查DefaultMessageStore是否關閉,是否允許寫入,目前broker角色是否是slave,如果檢查失敗則傳回錯誤資訊,DefaultMessageStore是消息存儲器,消息寫入檔案都是通過該對象完成;
- 檢查目前系統是否繁忙,系統寫入某個消息或一批消息時,如果長時間未能寫入完成,則表示系統繁忙;
- 消息内容字的節長度是否超過最大值,預設是4M;
2、加鎖
當通過了校驗之後,消息就可以寫入檔案了。寫入前,要先上鎖,保證同時隻能一個線程寫入。rocketmq提供了兩種上鎖的方法,一種是自旋鎖,一種是可重入鎖,可以通過參數useReentrantLockWhenPutMessage配置,預設是false,也就是使用自旋鎖。
3、寫入消息檔案
加鎖成功後,判斷目前活動檔案是否已經寫滿,預設檔案最大是1G。如果已經寫滿,則要建立一個新檔案,這裡說一下消息檔案的檔案名命名規則:檔案名由20位數字組成,檔案名表示下一個要寫入的消息的位移。建立消息檔案,rocketmq提供了兩種方式:一種是同步,一種是異步,預設是異步方式。同步方式很簡單,直接建立對應的檔案即可,異步是通過AllocateMappedFileService完成的。使用異步方式時,将需要建立的檔案大小及檔案名放入一個隊列中,AllocateMappedFileService啟動異步線程讀取該隊列,根據要求建立對應的檔案,AllocateMappedFileService與同步方式還有一點不同,同步方式一次隻建立一個檔案,而AllocateMappedFileService會将兩個建立檔案的請求放入隊列中,一個是目前需要的檔案,一個是目前檔案寫滿後,下一個要寫入的檔案。
當得到目前可以寫入的活動消息檔案後,就可以将消息寫入了。與上面類似,寫入消息也分了兩種方式,一種是直接寫入檔案,一種是寫入記憶體緩存。使用第二種方式需要設定參數transientStorePoolEnable=true,預設是false。使用第二種方式還需要其他的配置才能生效,下面會再介紹。
4、刷盤
檔案寫入結束之後,下面一個非常重要的工作就是刷盤(本文不對如何同步消息給slave進行介紹)。
檔案寫入時,隻是寫入了磁盤緩存,如果不刷盤,一旦停機掉電就會丢失資料。是以每次寫完消息之後,都要刷盤。
刷盤也分了兩種方式,一種同步,一種異步。可以通過參數flushDiskType配置兩種方式。同步刷盤是将重新整理請求放入一個隊列,異步線程讀取該隊列請求重新整理磁盤,此時線程會阻塞,直到異步線程重新整理完成。異步刷盤根據transientStorePoolEnable的值的不同,重新整理方式也不同,當設定transientStorePoolEnable=false時,異步刷盤與同步刷盤使用相同的邏輯,隻不過異步刷盤線程不會阻塞而是直接傳回。如果設定transientStorePoolEnable=true,那麼rocketmq使用CommitRealTimeService重新整理磁盤。還記得上面介紹消息寫入檔案時,第二種方式是寫入記憶體緩存,如果使用了記憶體緩存,那麼刷盤必須使用CommitRealTimeService,否則緩存内容無法重新整理進磁盤。是以要想使用記憶體緩存方式寫消息,必須設定transientStorePoolEnable=true和flushDiskType=ASYNC_FLUSH。因為記憶體緩存是分塊的,CommitRealTimeService一次寫入一塊,當把所有的都寫入完成之後,調用flush方法重新整理。
相對來說,一旦當機,CommitRealTimeService可能丢失的消息會多一些,是以如果不允許消息丢失,則必須使用同步刷盤,如果可以容忍較少的消息丢失,那麼可以異步刷盤并設定transientStorePoolEnable=false。