天天看點

RocketMQ源碼解析之Consumer啟動

閱讀須知

  • 文章中使用注釋的方法會做深入分析

正文

Consumer 的啟動流程和 Producer 的啟動流程有很多複用的部分,前面我們已經分析過 Producer 的啟動流程,複用部分這裡不再重複。

使用 RocketMQ Consumer 消費消息的簡單配置如下,:

<bean id="rocketmqConsumer" class="org.apache.rocketmq.client.consumer.DefaultMQPushConsumer" init-method="start"
      destroy-method="shutdown">
    <property name="consumerGroup" value="${rocketmq.consumer.group}"/>
    <property name="namesrvAddr" value="${rocketmq.nameserver.address}"/>
    <property name="messageListener" ref="orderMessageListener"/>
    <property name="subscription">
        <map>
            <entry key="${rocketmq.topic}" value="*"/>
        </map>
    </property>
</bean>
           

我們看到配置中 init-method 配置了 start 方法,是以在 bean 的初始化過程中會調用 DefaultMQPushConsumer 的 start 方法:

public void start() throws MQClientException {
    setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
    /* 啟動 */
    this.defaultMQPushConsumerImpl.start();
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}
           

DefaultMQPushConsumerImpl:

public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
            this.serviceState = ServiceState.START_FAILED;
            // 一些配置的正确性校驗
            this.checkConfig();
            // 複制訂閱資訊,複制消息監聽器
            this.copySubscription();
            // 預設為叢集模式
            if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPushConsumer.changeInstanceNameToPID();
            }
            // 擷取并建立 client,與 Producer 啟動複用同樣的方法
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
            this.pullAPIWrapper = new PullAPIWrapper(
                mQClientFactory,
                this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
            if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            } else {
                // 根據不同的消息模式建立不同的偏移量存儲服務
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                    case BROADCASTING:
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
            }
            this.offsetStore.load();
            // 判斷消息監聽器是順序消息|并發消息(普通消息)來建立對應的消息消費服務
            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                this.consumeOrderly = true;
                this.consumeMessageService =
                    new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
            } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                this.consumeOrderly = false;
                this.consumeMessageService =
                    new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
            }
            // 消費服務啟動,對于并發消息消費服務,會啟動定時清理垃圾消息服務;順序消息我們單獨分析
            this.consumeMessageService.start();
            // 添加 consumerGroup 和消費者執行個體的映射
            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                // 添加映射失敗停止消費服務,失敗的原因可能為同一個 consumerGroup 被多次添加,這是不允許的
                this.consumeMessageService.shutdown();
                throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }
            // 啟動用戶端執行個體,和 Producer 啟動複用相同的流程
            mQClientFactory.start();
            log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
            // 将狀态置為運作中
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
    // 更新topic路由資訊,主要通過向 NameServer 發送 GET_ROUTEINTO_BY_TOPIC 請求來擷取最新的 topic 路由資訊,然後更新内部消費者的訂閱資訊和生産者的釋出資訊
    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    // 向 Broker 發送 CHECK_CLIENT_CONFIG 指令檢查用戶端的配置,主要檢查訂閱表達式的正确性,Broker 處理對應的指令的處理器為 ClientManageProcessor
    this.mQClientFactory.checkClientInBroker();
    // 向所有 Broker 發送心跳請求
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    // 啟動完成立刻做一次 rebanlance
    this.mQClientFactory.rebalanceImmediately();
}
           

rebanlance 是消息消費過程中一個重要的流程,我們單獨分析。Consumer 啟動流程和 Producer 啟動流程複用了很多邏輯,需要結合兩篇文章一起來看。

繼續閱讀