天天看點

【RocketMQ源碼分析】消費者

基本使用

首先看看consumer的簡單使用:

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        // 使用consumer group初始化DefaultMQPushConsumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

        // 指定namesrv
        consumer.setNamesrvAddr("localhost:9876");

        // 訂閱topic
        consumer.subscribe("Topic", "*");
        // 注冊消費消息的回調
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 啟動consumer
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}
           

基本概念

消息擷取模式

1.Consumer有兩種消費擷取模式:

  • DefaultMQPushConsumer:類似于Broker Push消息到Consumer方式,但實際仍然是Consumer内部背景從Broker Pull消息,采用長輪詢方式拉消息,實時性同push方式一緻,且不會無謂的拉消息導緻Broker、Consumer壓力增大
  • DefaultMQPullConsumer:主動拉取模式

2.消費模式

廣播消費:Rocketmq會将消息發送給Group中的每一個消費者

叢集消費:同一條消息,隻能被Group中的任意一個消費者消費

3.Topic

RocketMQ中都是通過Topic來發送和消費消息的,但是Topic也僅僅是邏輯上的一個概念,而1個Topic下又包含了若幹個邏輯隊列,即消息隊列,消息内容實際是存放在隊列中的(儲存的消息資料實際上不是真正的消息資料,而是指向commit log的消息索引),而隊列又存儲在Broker中的.

【RocketMQ源碼分析】消費者

4.OffsetStore管理消息的Offset

  • LocalFileOffsetStore:廣播模式下,每個消費者都會消費所有的Queue,是以他們隻需要儲存自己的offset就行了,是以本地就行;
  • RemoteBrokerOffsetStore:但是叢集模式下,可能會消費者當機,重新開機,擴容等情況,是以不能将offset儲存到本地,使用RemoteBrokerOffsetStore的話,就可以将offset交給Broker儲存,交給Broker儲存的方法是persistAll(),這樣當消費者正常啟動後,從Broker哪裡擷取這個offset就行了。

DefaultMQPushConsumer啟動流程

DefaultMQPushConsumer.start方法裡面主要做一些類初始化的工作,先看看DefaultMQPushConsumer啟動方法:

public void start() throws MQClientException {
        switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;

            this.checkConfig();

            // 複制訂閱關系
            this.copySubscription();

            if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPushConsumer.changeInstanceNameToPID();
            }

            this.mQClientFactory =
                    MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer,
                        this.rpcHook);  //@1

            // 初始化Rebalance變量
            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());   //@2
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer
                .getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

            this.pullAPIWrapper = ClientApisEnhancerFactory.getDefaultMQClientImpl(PullAPIWrapper.class, 
            		mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());  //@3
            // 每次拉消息之後,都會進行一次過濾。
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

            if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            }
            else {
                // 廣播消費/叢集消費
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                case BROADCASTING:
                    this.offsetStore =
                            new LocalFileOffsetStore(this.mQClientFactory,
                                this.defaultMQPushConsumer.getConsumerGroup());
                    break;
                case CLUSTERING:
                    this.offsetStore =
                            new RemoteBrokerOffsetStore(this.mQClientFactory,
                                this.defaultMQPushConsumer.getConsumerGroup());
                    break;
                default:
                    break;
                }
            }
            // 加載消費進度
            this.offsetStore.load();

            // 啟動消費消息服務
            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                this.consumeOrderly = true;
                this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this,
                            (MessageListenerOrderly) this.getMessageListenerInner());   //@4
            }
            else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                this.consumeOrderly = false;
                this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this,
                            (MessageListenerConcurrently) this.getMessageListenerInner());
            }

            this.consumeMessageService.start();   //@5

            boolean registerOK =
                    mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                this.consumeMessageService.shutdown();
                throw new MQClientException("The consumer group["
                        + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please."
                        + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
            }

            mQClientFactory.start();   //@6
            log.info("the consumer [{}] start OK", this.defaultMQPushConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PushConsumer service state not OK, maybe started once, "//
                    + this.serviceState//
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
        default:
            break;
        }

        this.updateTopicSubscribeInfoWhenSubscriptionChanged();

        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

        this.mQClientFactory.rebalanceImmediately();
    }
           

@1.執行個體化MQClientInstance

RocketMQ 在同一個 JVM 程序擁有一個 clientConfigId(用戶端ID)該JVM程序中不同的消息消費組的消息用戶端ID相同,因為在JVM程序中對于每一個 ClientConfig 隻會執行個體化一個 MQClientInstance。

@2.初始化rebalanceImpl

RebalanceService其實是一個線程的封裝,RocetMQ 中通過RebalanceService線程實作消費隊列負載。RebalanceService在DefaultMQPushConsumerImpl#start()方法中調用了MQClientInstance#start()方法,在MQClientInstance#start()方法中調用了RebalanceService#start()方法。這個線程就是隔一段時間執行以下MQClientInstance#doRebalance()方法,而具體的doRebalance方法實作是由RebalanceImpl去完成的,RebalanceImpl是DefaultMQPushConsumerImpl中的屬性

@3.初始化pullAPIWrapper、pullMessageService拉取消息

初始化pullAPIWrapper,pullAPIWrapper對Pull接口進行進一步的封裝,而pullMessageService是一個線程的封裝,開啟長輪詢拉消息服務,單線程異步拉取,在MQClientInstance#start()方法中調用了pullMessageService#start()。pullAPIWrapper.pullKernelImpl方法是真正的拉取消息

@4.初始化消息消費服務

if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    this.consumeOrderly = true;
    this.consumeMessageService =
            new ConsumeMessageOrderlyService(this,
                (MessageListenerOrderly) this.getMessageListenerInner());
}
else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
    this.consumeOrderly = false;
    this.consumeMessageService =
            new ConsumeMessageConcurrentlyService(this,
                (MessageListenerConcurrently) this.getMessageListenerInner());
}
           

Message儲存到ProcessQueue後,會使用ConsumeMessageService消費消息,ConsumeMessageService也是一個接口,它有2個實作,分别是ConsumeMessageOrderlyServiceConsumeMessageConcurrentlyService。根據監聽器MessageListener的類型,執行個體化對應的消息消費服務ConsumeMessageService:

  • ConsumeMessageOrderlyService

    同一隊列的消息同一時刻隻能一個線程消費,可保證消息在同一隊列嚴格有序消費

  • ConsumeMessageConcurrentlyService

    并發消費消息服務

@5.啟動消費消息服務

如果消息消費模式為叢集模式,啟動定時任務,預設每隔20s執行一次鎖定配置設定給自己的消息消費隊列。

1:根據目前負載的消息隊列,按照 Broker分類存儲在Map。負載的消息隊列在RebalanceService時根據目前消費者數量與消息消費隊列按照負載算法進行配置設定,然後嘗試對該消息隊列加鎖,如果申請鎖成功,則加入到待拉取任務中。

2:根據Broker擷取主節點的位址。

3:向Broker發送鎖定消息隊列請求,該方法會傳回本次成功鎖定的消息消費隊列,關于Broker端消息隊列鎖定實作見下文詳細分析。

4:周遊本次成功鎖定的隊列來更新對應的ProcessQueue的locked狀态,如果locked為false,則設定成true,并更新鎖定時間。

5:周遊mqs,如果消息隊列未成功鎖定,需要将ProceeQueue的locked狀态為false,在該處理隊列未被其他消費者鎖定之前,該消息隊列将暫停拉取消息。

@6.啟動MQClientInstance

public void start() throws MQClientException {
        synchronized (this) {
            switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr());
                }

                this.mQClientAPIImpl.start();
                this.startScheduledTask();
                this.pullMessageService.start();
                this.rebalanceService.start();

                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId()
                        + "] has been created before, and failed.", null);
            default:
                break;
            }
        }
    }
           
  • this.mQClientAPIImpl.start(); //遠端通信,NettyRemotingClient 啟動
  • this.startScheduledTask(); //1.定時擷取Name Server位址 2.定時從Name Server擷取Topic路由資訊 3.定時清理下線的Broker,向所有Broker發送心跳資訊 4.定時持久化Consumer消費進度(廣播存儲到本地,叢集存儲到Broker)
  • this.pullMessageService.start(); //輪訓拉取消息

    pullMessageService是一個線程的封裝,開啟長輪詢拉消息服務,單線程異步拉取,在MQClientInstance#start()方法中調用了pullMessageService#start()。pullAPIWrapper.pullKernelImpl方法是真正的拉取消息

  • this.rebalanceService.start(); //重平衡

    RebalanceService其實是一個線程的封裝,RocketMQ 中通RebalanceService線程實作消費隊列負載。RebalanceService在DefaultMQPushConsumerImpl#start()方法中調用了MQClientInstance#start()方法,在MQClientInstance#start()方法中調用了RebalanceService#start()方法。這個線程就是隔一段時間執行以下MQClientInstance#doRebalance()方法,而具體的doRebalance方法實作是由RebalanceImpl去完成的,RebalanceImpl是DefaultMQPushConsumerImpl中的屬性

繼續閱讀