消費消息可以分成pull和push方式,push消息使用比較簡單,因為RocketMQ已經幫助我們封裝了大部分流程,我們隻要重寫回調函數即可。
下面我們就以push消費方式為例,分析下這部分源代碼流程。

我們接着看consumer.start()方法
DefaultMQPushConsumerImpl.java
在初始化一堆參數之後,然後調用mQClientFactory.start();
其實這個命名有點奇怪啊(阿裡程式員手抖了?),為什麼MQClientInstance類型的變量名稱叫mQClientFactory ...
那繼續看MQClientInstance的start
各行代碼的作用就像源代碼裡面的注釋一樣,重點看下pullMessageService.start和rebalanceService.start
pullMessageService.start作用是不斷從一個阻塞隊列裡面擷取pullRequest請求,然後去RocketMQ broker裡面擷取消息。
如果沒有pullRequest的話,那麼它将阻塞。
那麼,pullRequest請求是怎麼放進去的呢?這個就要看rebalanceService了。
順便說一句,pullMessageService和rebalanceService都是繼承自ServiceThread
ServiceThread簡單封裝了線程的啟動,調用start方法,就會調用它的run方法。
這樣啟動線程就要友善一點,看起來舒服一點。
嗯,繼續分析之前的分析。
從pullMessageService的run方法可以看出它是從阻塞隊列pullRequestQueue裡面擷取pullRequest,如果沒有那麼将阻塞。(如果不清楚java阻塞的使用,清百度)
執行完一次pullReqeust之後,再繼續下一次擷取阻塞隊列,因為它是個while循環。
是以,我們需要分析下pullRequest放進隊列的流程,也就是rebalanceService.
MQClientInstance.java
RebalanceImpl.java
一路跟下來,來到了RebalanceImpl.java的rebalanceByTopic方法,這個方法裡面有兩個case(Broadcasting和Clustering)也就是消息消費的兩個模式,廣播和叢集消息。
廣播的話,所有的監聽者都會收到消息,叢集的話,隻有一個消費者可以收到,我們以叢集消息為例。
先大概解釋下在rebalanceByTopic裡面要做什麼。
從namesrv擷取broker裡面這個topic的消費者數量
從namesrv擷取broker這個topic的消息隊列數量
根據前兩部擷取的資料進行負載均衡計算,計算出目前消費者用戶端配置設定到的消息隊列。
按照配置設定到的消息隊列,去broker請求這個消息隊列裡面的消息。
上面代碼厘米mqset就是這個topic的消費隊列,一般是4個,但是這個值是可以修改的,存儲的位置在~/store/config/topics.json裡面,比如:
可以修改readQueueNums和writeQueueNums為其他值
這段代碼就是用戶端根據擷取到的這個topic消費者數量和消息隊列數量,使用負載均衡政策計算出目前用戶端能夠使用的消息隊列。
負載均衡政策代碼在這個位置。
那我們繼續4.4 pullMessageService.start分析,因為rebalanceService已經把pullRequest放到了阻塞隊列。
調用到DefaultMQPushConsumerImpl.pullMessage(pullRequest)這個方法裡面。
上面這段代碼主要就是設定消息擷取後的回調函數PullCallback pullCallback,然後調用pullAPIWrapper.pullKernelImpl去Broker裡面擷取消息。
擷取成功後,就會回調pullCallback的onSuccess方法的FOUND case分支。
在pullCallback的onSucess方法的FOUND case分支,會根據回調是同步還是異步,分為兩種情況,如下:
同步消息和異步消息差別的源代碼實作以後再講。
本文轉自rongwei84n 51CTO部落格,原文連結:http://blog.51cto.com/483181/2056301,如需轉載請自行聯系原作者