天天看點

48. 源代碼解讀-RocketMQ-client接收消息流程

消費消息可以分成pull和push方式,push消息使用比較簡單,因為RocketMQ已經幫助我們封裝了大部分流程,我們隻要重寫回調函數即可。

下面我們就以push消費方式為例,分析下這部分源代碼流程。

48. 源代碼解讀-RocketMQ-client接收消息流程

我們接着看consumer.start()方法

DefaultMQPushConsumerImpl.java

在初始化一堆參數之後,然後調用mQClientFactory.start();

其實這個命名有點奇怪啊(阿裡程式員手抖了?),為什麼MQClientInstance類型的變量名稱叫mQClientFactory ...

那繼續看MQClientInstance的start

各行代碼的作用就像源代碼裡面的注釋一樣,重點看下pullMessageService.start和rebalanceService.start

pullMessageService.start作用是不斷從一個阻塞隊列裡面擷取pullRequest請求,然後去RocketMQ broker裡面擷取消息。

如果沒有pullRequest的話,那麼它将阻塞。

那麼,pullRequest請求是怎麼放進去的呢?這個就要看rebalanceService了。

順便說一句,pullMessageService和rebalanceService都是繼承自ServiceThread

ServiceThread簡單封裝了線程的啟動,調用start方法,就會調用它的run方法。

這樣啟動線程就要友善一點,看起來舒服一點。

嗯,繼續分析之前的分析。

從pullMessageService的run方法可以看出它是從阻塞隊列pullRequestQueue裡面擷取pullRequest,如果沒有那麼将阻塞。(如果不清楚java阻塞的使用,清百度)

執行完一次pullReqeust之後,再繼續下一次擷取阻塞隊列,因為它是個while循環。

是以,我們需要分析下pullRequest放進隊列的流程,也就是rebalanceService.

MQClientInstance.java

RebalanceImpl.java

一路跟下來,來到了RebalanceImpl.java的rebalanceByTopic方法,這個方法裡面有兩個case(Broadcasting和Clustering)也就是消息消費的兩個模式,廣播和叢集消息。

廣播的話,所有的監聽者都會收到消息,叢集的話,隻有一個消費者可以收到,我們以叢集消息為例。

先大概解釋下在rebalanceByTopic裡面要做什麼。

從namesrv擷取broker裡面這個topic的消費者數量

從namesrv擷取broker這個topic的消息隊列數量

根據前兩部擷取的資料進行負載均衡計算,計算出目前消費者用戶端配置設定到的消息隊列。

按照配置設定到的消息隊列,去broker請求這個消息隊列裡面的消息。

上面代碼厘米mqset就是這個topic的消費隊列,一般是4個,但是這個值是可以修改的,存儲的位置在~/store/config/topics.json裡面,比如:

可以修改readQueueNums和writeQueueNums為其他值

這段代碼就是用戶端根據擷取到的這個topic消費者數量和消息隊列數量,使用負載均衡政策計算出目前用戶端能夠使用的消息隊列。

負載均衡政策代碼在這個位置。

48. 源代碼解讀-RocketMQ-client接收消息流程

那我們繼續4.4 pullMessageService.start分析,因為rebalanceService已經把pullRequest放到了阻塞隊列。

調用到DefaultMQPushConsumerImpl.pullMessage(pullRequest)這個方法裡面。

上面這段代碼主要就是設定消息擷取後的回調函數PullCallback pullCallback,然後調用pullAPIWrapper.pullKernelImpl去Broker裡面擷取消息。

擷取成功後,就會回調pullCallback的onSuccess方法的FOUND case分支。

在pullCallback的onSucess方法的FOUND case分支,會根據回調是同步還是異步,分為兩種情況,如下:

同步消息和異步消息差別的源代碼實作以後再講。

     本文轉自rongwei84n 51CTO部落格,原文連結:http://blog.51cto.com/483181/2056301,如需轉載請自行聯系原作者