基本使用
首先看看consumer的簡單使用:
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 使用consumer group初始化DefaultMQPushConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// 指定namesrv
consumer.setNamesrvAddr("localhost:9876");
// 訂閱topic
consumer.subscribe("Topic", "*");
// 注冊消費消息的回調
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 啟動consumer
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
基本概念
消息擷取模式
1.Consumer有兩種消費擷取模式:
- DefaultMQPushConsumer:類似于Broker Push消息到Consumer方式,但實際仍然是Consumer内部背景從Broker Pull消息,采用長輪詢方式拉消息,實時性同push方式一緻,且不會無謂的拉消息導緻Broker、Consumer壓力增大
- DefaultMQPullConsumer:主動拉取模式
2.消費模式
廣播消費:Rocketmq會将消息發送給Group中的每一個消費者
叢集消費:同一條消息,隻能被Group中的任意一個消費者消費
3.Topic
RocketMQ中都是通過Topic來發送和消費消息的,但是Topic也僅僅是邏輯上的一個概念,而1個Topic下又包含了若幹個邏輯隊列,即消息隊列,消息内容實際是存放在隊列中的(儲存的消息資料實際上不是真正的消息資料,而是指向commit log的消息索引),而隊列又存儲在Broker中的.
4.OffsetStore管理消息的Offset
- LocalFileOffsetStore:廣播模式下,每個消費者都會消費所有的Queue,是以他們隻需要儲存自己的offset就行了,是以本地就行;
- RemoteBrokerOffsetStore:但是叢集模式下,可能會消費者當機,重新開機,擴容等情況,是以不能将offset儲存到本地,使用RemoteBrokerOffsetStore的話,就可以将offset交給Broker儲存,交給Broker儲存的方法是persistAll(),這樣當消費者正常啟動後,從Broker哪裡擷取這個offset就行了。
DefaultMQPushConsumer啟動流程
DefaultMQPushConsumer.start方法裡面主要做一些類初始化的工作,先看看DefaultMQPushConsumer啟動方法:
public void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
// 複制訂閱關系
this.copySubscription();
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
this.mQClientFactory =
MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer,
this.rpcHook); //@1
// 初始化Rebalance變量
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); //@2
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer
.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = ClientApisEnhancerFactory.getDefaultMQClientImpl(PullAPIWrapper.class,
mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode()); //@3
// 每次拉消息之後,都會進行一次過濾。
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
}
else {
// 廣播消費/叢集消費
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore =
new LocalFileOffsetStore(this.mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore =
new RemoteBrokerOffsetStore(this.mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
}
// 加載消費進度
this.offsetStore.load();
// 啟動消費消息服務
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this,
(MessageListenerOrderly) this.getMessageListenerInner()); //@4
}
else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this,
(MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start(); //@5
boolean registerOK =
mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group["
+ this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please."
+ FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
}
mQClientFactory.start(); //@6
log.info("the consumer [{}] start OK", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "//
+ this.serviceState//
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
default:
break;
}
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately();
}
@1.執行個體化MQClientInstance
RocketMQ 在同一個 JVM 程序擁有一個 clientConfigId(用戶端ID)該JVM程序中不同的消息消費組的消息用戶端ID相同,因為在JVM程序中對于每一個 ClientConfig 隻會執行個體化一個 MQClientInstance。
@2.初始化rebalanceImpl
RebalanceService其實是一個線程的封裝,RocetMQ 中通過RebalanceService線程實作消費隊列負載。RebalanceService在DefaultMQPushConsumerImpl#start()方法中調用了MQClientInstance#start()方法,在MQClientInstance#start()方法中調用了RebalanceService#start()方法。這個線程就是隔一段時間執行以下MQClientInstance#doRebalance()方法,而具體的doRebalance方法實作是由RebalanceImpl去完成的,RebalanceImpl是DefaultMQPushConsumerImpl中的屬性
@3.初始化pullAPIWrapper、pullMessageService拉取消息
初始化pullAPIWrapper,pullAPIWrapper對Pull接口進行進一步的封裝,而pullMessageService是一個線程的封裝,開啟長輪詢拉消息服務,單線程異步拉取,在MQClientInstance#start()方法中調用了pullMessageService#start()。pullAPIWrapper.pullKernelImpl方法是真正的拉取消息
@4.初始化消息消費服務
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this,
(MessageListenerOrderly) this.getMessageListenerInner());
}
else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this,
(MessageListenerConcurrently) this.getMessageListenerInner());
}
Message儲存到ProcessQueue後,會使用ConsumeMessageService消費消息,ConsumeMessageService也是一個接口,它有2個實作,分别是ConsumeMessageOrderlyServiceConsumeMessageConcurrentlyService。根據監聽器MessageListener的類型,執行個體化對應的消息消費服務ConsumeMessageService:
-
ConsumeMessageOrderlyService
同一隊列的消息同一時刻隻能一個線程消費,可保證消息在同一隊列嚴格有序消費
-
ConsumeMessageConcurrentlyService
并發消費消息服務
@5.啟動消費消息服務
如果消息消費模式為叢集模式,啟動定時任務,預設每隔20s執行一次鎖定配置設定給自己的消息消費隊列。
1:根據目前負載的消息隊列,按照 Broker分類存儲在Map。負載的消息隊列在RebalanceService時根據目前消費者數量與消息消費隊列按照負載算法進行配置設定,然後嘗試對該消息隊列加鎖,如果申請鎖成功,則加入到待拉取任務中。
2:根據Broker擷取主節點的位址。
3:向Broker發送鎖定消息隊列請求,該方法會傳回本次成功鎖定的消息消費隊列,關于Broker端消息隊列鎖定實作見下文詳細分析。
4:周遊本次成功鎖定的隊列來更新對應的ProcessQueue的locked狀态,如果locked為false,則設定成true,并更新鎖定時間。
5:周遊mqs,如果消息隊列未成功鎖定,需要将ProceeQueue的locked狀态為false,在該處理隊列未被其他消費者鎖定之前,該消息隊列将暫停拉取消息。
@6.啟動MQClientInstance
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
if (null == this.clientConfig.getNamesrvAddr()) {
this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr());
}
this.mQClientAPIImpl.start();
this.startScheduledTask();
this.pullMessageService.start();
this.rebalanceService.start();
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId()
+ "] has been created before, and failed.", null);
default:
break;
}
}
}
- this.mQClientAPIImpl.start(); //遠端通信,NettyRemotingClient 啟動
- this.startScheduledTask(); //1.定時擷取Name Server位址 2.定時從Name Server擷取Topic路由資訊 3.定時清理下線的Broker,向所有Broker發送心跳資訊 4.定時持久化Consumer消費進度(廣播存儲到本地,叢集存儲到Broker)
-
this.pullMessageService.start(); //輪訓拉取消息
pullMessageService是一個線程的封裝,開啟長輪詢拉消息服務,單線程異步拉取,在MQClientInstance#start()方法中調用了pullMessageService#start()。pullAPIWrapper.pullKernelImpl方法是真正的拉取消息
-
this.rebalanceService.start(); //重平衡
RebalanceService其實是一個線程的封裝,RocketMQ 中通RebalanceService線程實作消費隊列負載。RebalanceService在DefaultMQPushConsumerImpl#start()方法中調用了MQClientInstance#start()方法,在MQClientInstance#start()方法中調用了RebalanceService#start()方法。這個線程就是隔一段時間執行以下MQClientInstance#doRebalance()方法,而具體的doRebalance方法實作是由RebalanceImpl去完成的,RebalanceImpl是DefaultMQPushConsumerImpl中的屬性