天天看點

rocketmq之源碼分析consumer源碼解析注釋(十一)

consumer的正常訂閱消息的總體操作流程:

構造初始化-》注冊監聽-》啟動-》無限循環請求隊列-》長連接配接的資料拉取

一,構造初始化

    DefaultMQPushConsumer:正常的訂閱消息,需要制定唯一的分組名稱

        最終構造的對象是DefaultMQPushConsumerImpl:

            核心的功能實作,整合内部多個元件

    配置消息的消息開始位置:ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET

    配置namesrv位址:拉取對應的配置内容和關系結構

    訂閱内容:基于特定topic的訂閱,然後内置了表達式引擎(過濾内容)

    注冊監聽:該監聽是監聽到有消息時的自有業務邏輯處理

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wang-group-consmuer");

/*
 * Specify where to start in case the specified consumer group is a brand new one.
 */
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//指定nameserver的位址,所有的資料互動都是基于nameserve來進行的資訊擷取和更新及心跳
consumer.setNamesrvAddr("127.0.0.1:9876");

/*
 * Subscribe one more more topics to consume.
 */
consumer.subscribe("TopicTest", "*");

/*
 *  Register callback to execute on arrival of messages fetched from brokers.
 */
consumer.registerMessageListener(new MessageListenerConcurrently() {

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
        ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

/*
 *  Launch the consumer instance.
 */
consumer.start();      

構造函數的初始化

public DefaultMQPushConsumer(final String consumerGroup) {
    this(consumerGroup, null, new AllocateMessageQueueAveragely());
}      
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
    AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
    this.consumerGroup = consumerGroup;
    this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
    defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}      

實際實作服務的核心屬性

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    /**
     * Delay some time when exception occur
     */
    private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000;
    /**
     * Flow control interval
     */
    private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
    /**
     * Delay some time when suspend pull service
     */
    private static final long PULL_TIME_DELAY_MILLS_WHEN_SUSPEND = 1000;
    private static final long BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15;
    private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;
    private final InternalLogger log = ClientLogger.getLog();
    private final DefaultMQPushConsumer defaultMQPushConsumer;
    //負載的選擇器
    private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
    private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
    private final long consumerStartTimestamp = System.currentTimeMillis();
    private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
    private final RPCHook rpcHook;
    private volatile ServiceState serviceState = ServiceState.CREATE_JUST;

    //用戶端執行個體,複用
    private MQClientInstance mQClientFactory;
    //拉取服務請求的包裝
    private PullAPIWrapper pullAPIWrapper;
    private volatile boolean pause = false;
    private boolean consumeOrderly = false;
    //消息監聽
    private MessageListener messageListenerInner;
    //消息的偏移位置
    private OffsetStore offsetStore;
    //用戶端服務
    private ConsumeMessageService consumeMessageService;
    private long queueFlowControlTimes = 0;
    private long queueMaxSpanFlowControlTimes = 0;      

二,

啟動

    start:最終的核心啟動是DefaultMQPushConsumerImpl的啟動

        啟動和消息發送端的啟動類似,進行對應的初始化及啟動操作

            配置服務的執行狀态,内部拆分了四個狀态機制

            驗證關鍵配置,主要是影響到和新功能的配置的内容

            将配置的訂閱資訊拷貝到對應的監聽,對應的負載,消費等資料對象中

            生成執行個體id

            獲得MQClient的執行個體工程,和producer一緻,關鍵的核心操作實作,綜合體

            将核心的配置注入到負載服務中

            包裝拉取服務,包裝成獨立的綜合體

            獲得消息的開始消費偏移位置,用于消息拉取的請求參數

            根據消息的監聽類型,設定,分為兩類,順序監聽和并發監聽,并發監聽的效率更高

            注冊消費處理

            執行個體MQClient的工程方法啟動

            标示為目前服務為運作狀态

            更新訂閱的topic資訊

            檢查使用的broker資訊

            發送心跳内容給所有的broker

            馬上調用負載服務平衡

public void start() throws MQClientException {
    //消費端的核心啟動入口
    this.defaultMQPushConsumerImpl.start();
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}      

核心功能的操作啟動

//消費端主動推送的啟動核心入口
public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
            this.serviceState = ServiceState.START_FAILED;

            //驗證核心關鍵配置
            this.checkConfig();

            //将配置中的訂閱資訊拷貝到推送服務中
            this.copySubscription();

            //生成id
            if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPushConsumer.changeInstanceNameToPID();
            }

            //獲得用戶端工廠,共享的用戶端工程單例設計
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

            //将配置中的核心配置指派給負載中
            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

            //拉取消息的包裝api
            this.pullAPIWrapper = new PullAPIWrapper(
                mQClientFactory,
                this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

            //設定消息的開始位置
            if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            } else {
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                    case BROADCASTING:
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
            }
            //加載消息讀取位置資訊
            this.offsetStore.load();

            //消息接受器的監聽類型
            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                this.consumeOrderly = true;
                this.consumeMessageService =
                    new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
            } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                this.consumeOrderly = false;
                this.consumeMessageService =
                    new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
            }

            //消費者監聽服務啟動
            this.consumeMessageService.start();

            //注冊服務處理事件
            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                this.consumeMessageService.shutdown();
                throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }

            //連結工程的啟動
            mQClientFactory.start();
            log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }

    //更新topic資訊到目前的記憶體資料結構中,便于後期的直接使用
    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    //檢查需要連結的broker是否可用
    this.mQClientFactory.checkClientInBroker();
    //發送心跳資料給目前服務所連結的所有broker中,操作過程是基于安全鎖機制
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    //喚醒負載平衡操作
    this.mQClientFactory.rebalanceImmediately();
}      

三,監聽

    業務根據RocketMQ的規範,根據業務特定實作的接口

    接口分為兩類,順序消息,并行消費

//消息接受器的監聽類型
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    this.consumeOrderly = true;
    this.consumeMessageService =
            //有序消息
        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
    this.consumeOrderly = false;
    this.consumeMessageService =
            //并行消息
        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}      

并行消息

public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
    private static final InternalLogger log = ClientLogger.getLog();

    //消息的實際拉取操作
    private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
    //消費消息的頂層結構
    private final DefaultMQPushConsumer defaultMQPushConsumer;
    //并行消息接聽
    private final MessageListenerConcurrently messageListener;
    private final BlockingQueue<Runnable> consumeRequestQueue;
    //線程池執行配置
    private final ThreadPoolExecutor consumeExecutor;
    private final String consumerGroup;

    //任務調用
    private final ScheduledExecutorService scheduledExecutorService;
    //任務排程清除逾時
    private final ScheduledExecutorService cleanExpireMsgExecutors;

    public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
        MessageListenerConcurrently messageListener) {
        this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
        this.messageListener = messageListener;

        this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
        this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
        this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();

        this.consumeExecutor = new ThreadPoolExecutor(
            this.defaultMQPushConsumer.getConsumeThreadMin(),
            this.defaultMQPushConsumer.getConsumeThreadMax(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.consumeRequestQueue,
            new ThreadFactoryImpl("ConsumeMessageThread_"));

        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
        this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
    }

    public void start() {
        this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                //清除逾時消息
                cleanExpireMsg();
            }

        }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
    }      

四,長連接配接的拉取

    啟動入口在MQClient的連結工程的啟動中

        在啟動中指定負載服務的啟動中RebalanceService

            在操作中this.waitForRunning(waitInterval);預設是阻塞20000,但是在服務啟動後,執行了this.mQClientFactory.rebalanceImmediately();

            核心操作在重置負載中this.mqClientFactory.doRebalance();

            有一個前提,根據目前服務的模式,分為push自動領取,pull程式主動發起,還有一個條件就是是否是順序消息

            周遊訂閱資訊,重新負載topic資訊

            最核心的操作在更新queue到負載中,完成了初始化

            此時調用的是PullMessageService的執行拉取操作,該對象是核心

            後期的操作就是自動化請求,基于隊列的形式,持續的請求,完成後再放置下一次請求到隊列中,循環請求

//連結工程的啟動
mQClientFactory.start();      
//初始化用戶端請求執行個體
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
    //mq的核心配置資訊
    this.clientConfig = clientConfig;
    //目前程序内的唯一辨別,升序數值
    this.instanceIndex = instanceIndex;
    //netty通信的用戶端配置
    this.nettyClientConfig = new NettyClientConfig();
    this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
    this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());

    //解析用戶端請求,封裝的事件處理
    this.clientRemotingProcessor = new ClientRemotingProcessor(this);

    //用戶端執行個體的實際實作,網絡通信的核心,隻是初始化了通信架構,具體的連結後面根據不同的位址再進行連結操作
    this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);

    //設定核心的nameserv位址
    if (this.clientConfig.getNamesrvAddr() != null) {
        this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
        log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
    }

    this.clientId = clientId;

    //mq管理
    this.mQAdminImpl = new MQAdminImpl(this);

    //拉取消息的實作
    this.pullMessageService = new PullMessageService(this);

    //負載均衡的實作,可能有相關的機器增加删除,需要定期的進行重負載操作
    this.rebalanceService = new RebalanceService(this);

    //消息發送者的包裝,發送者的發送者,這個邏輯有點亂,并且在構造方法中重新初始化的
    //producer -> DefaultMQProducer -> DefaultMQProducerImpl -> MQClientInstance -> DefaultMQProducer
    this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
    this.defaultMQProducer.resetClientConfig(clientConfig);

    //消費用戶端的狀态管理
    this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);

    log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
        this.instanceIndex,
        this.clientId,
        this.clientConfig,
        MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
}
//啟動發送消息的核心,同時也是訂閱消息的核心
public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                //啟動netty的用戶端配置
                // Start request-response channel
                this.mQClientAPIImpl.start();

                //啟動任務,更新,驗證,心跳等操作
                // Start various schedule tasks
                this.startScheduledTask();

                //消費端
                // Start pull service
                this.pullMessageService.start();

                //消費端,重新負載設定,請求的初始化操作也在此方法内執行
                // Start rebalance service
                this.rebalanceService.start();

                //前面已經初始化過操作,該入參為false,隻需要初始化其他操作
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}      

初始化操作類RebalanceService

public class RebalanceService extends ServiceThread {
    private static long waitInterval =
        Long.parseLong(System.getProperty(
            "rocketmq.client.rebalance.waitInterval", "20000"));
    private final InternalLogger log = ClientLogger.getLog();
    //消息的網絡操作及功能
    private final MQClientInstance mqClientFactory;

    //負載服務的初始化,構造是連接配接服務的執行個體,目前是線程的子類
    public RebalanceService(MQClientInstance mqClientFactory) {
        this.mqClientFactory = mqClientFactory;
    }

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            //等待執行
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }

        log.info(this.getServiceName() + " service end");
    }

    @Override
    public String getServiceName() {
        return RebalanceService.class.getSimpleName();
    }
}      

表現的狀态是在waiting,然後再指定負載,執行的負載操作是

//負載平衡重置
public void doRebalance() {
    for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {
            try {
                impl.doRebalance();
            } catch (Throwable e) {
                log.error("doRebalance exception", e);
            }
        }
    }
}      

此時consumerTable中的資料在前面的初始化啟動中進行了注冊操作

//注冊服務處理事件
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
    this.serviceState = ServiceState.CREATE_JUST;
    this.consumeMessageService.shutdown();
    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
        null);
}      

執行平台操作

@Override
public void doRebalance() {
    if (!this.pause) {
        this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
    }
}      
public void doRebalance(final boolean isOrder) {
    //獲得訂閱的内容
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            try {
                //操作對topic的負載
                this.rebalanceByTopic(topic, isOrder);
            } catch (Throwable e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }

    //基于資料的統一處理
    this.truncateMessageQueueNotMyTopic();
}      
private void rebalanceByTopic(final String topic, final boolean isOrder) {
    switch (messageModel) {
        case BROADCASTING: {
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            if (mqSet != null) {
                boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                if (changed) {
                    this.messageQueueChanged(topic, mqSet, mqSet);
                    log.info("messageQueueChanged {} {} {} {}",
                        consumerGroup,
                        topic,
                        mqSet,
                        mqSet);
                }
            } else {
                log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
            }
            break;
        }
        //預設的叢集模式
        case CLUSTERING: {
            //獲得目前topic的訂閱隊列資訊
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            //請求獲得topic的cid請求
            List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
            if (null == mqSet) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
            }

            if (null == cidAll) {
                log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
            }

            if (mqSet != null && cidAll != null) {
                List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                mqAll.addAll(mqSet);

                //排序操作
                Collections.sort(mqAll);
                Collections.sort(cidAll);

                //選擇cid的政策
                AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                List<MessageQueue> allocateResult = null;
                try {
                    allocateResult = strategy.allocate(
                        this.consumerGroup,
                        this.mQClientFactory.getClientId(),
                        mqAll,
                        cidAll);
                } catch (Throwable e) {
                    log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                        e);
                    return;
                }

                Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                if (allocateResult != null) {
                    allocateResultSet.addAll(allocateResult);
                }

                //是否有變化
                boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                if (changed) {
                    log.info(
                        "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                        strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                        allocateResultSet.size(), allocateResultSet);
                    //消息隊列的修改
                    this.messageQueueChanged(topic, mqSet, allocateResultSet);
                }
            }
            break;
        }
        default:
            break;
    }
}      

主要的操作在修改處理消息隊列的變化boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
    final boolean isOrder) {
    boolean changed = false;

    //獲得處理隊列的資料
    Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
    //周遊隊列同時對消息對鞋進行修正
    while (it.hasNext()) {
        Entry<MessageQueue, ProcessQueue> next = it.next();
        MessageQueue mq = next.getKey();
        ProcessQueue pq = next.getValue();

        //針對相同的topic的處理
        if (mq.getTopic().equals(topic)) {
            if (!mqSet.contains(mq)) {
                pq.setDropped(true);
                if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                    it.remove();
                    changed = true;
                    log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                }
            } else if (pq.isPullExpired()) {
                switch (this.consumeType()) {
                    case CONSUME_ACTIVELY:
                        break;
                    case CONSUME_PASSIVELY:
                        pq.setDropped(true);
                        if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                            it.remove();
                            changed = true;
                            log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                                consumerGroup, mq);
                        }
                        break;
                    default:
                        break;
                }
            }
        }
    }

    //構造拉取消息的請求
    List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
    for (MessageQueue mq : mqSet) {
        if (!this.processQueueTable.containsKey(mq)) {
            if (isOrder && !this.lock(mq)) {
                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                continue;
            }

            this.removeDirtyOffset(mq);
            ProcessQueue pq = new ProcessQueue();
            long nextOffset = this.computePullFromWhere(mq);
            if (nextOffset >= 0) {
                ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                if (pre != null) {
                    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                } else {
                    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                    //封裝拉取的請求結構
                    PullRequest pullRequest = new PullRequest();
                    pullRequest.setConsumerGroup(consumerGroup);
                    pullRequest.setNextOffset(nextOffset);
                    pullRequest.setMessageQueue(mq);
                    pullRequest.setProcessQueue(pq);
                    pullRequestList.add(pullRequest);
                    changed = true;
                }
            } else {
                log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
            }
        }
    }

    //處理消息拉取請求
    this.dispatchPullRequest(pullRequestList);

    return changed;
}      

拉取的操作this.dispatchPullRequest(pullRequestList);是抽象的設計,需要根據目前的實作類進行實作

@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
    for (PullRequest pullRequest : pullRequestList) {
        //實行消息的拉取操作
        this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
        log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
    }
}      

逐層調用實作

public void executePullRequestImmediately(final PullRequest pullRequest) {
    this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
}      
//将請求直接添加到隊列中,這裡肯定是第一次初始化和後來無限次操作的入口
public void executePullRequestImmediately(final PullRequest pullRequest) {
    try {
        this.pullRequestQueue.put(pullRequest);
    } catch (InterruptedException e) {
        log.error("executePullRequestImmediately pullRequestQueue.put", e);
    }
}      

将最終的拉取請求添加的請求隊列中,等待請求隊列的掃描和執行

 最終的執行開始及操作,在啟動的最後一步執行this.mQClientFactory.rebalanceImmediately();

//喚醒負載
public void rebalanceImmediately() {
    this.rebalanceService.wakeup();
}      
public void wakeup() {
    if (hasNotified.compareAndSet(false, true)) {
        waitPoint.countDown(); // notify
    }
}      

該喚醒操作和簽名的waiting操作是一一對應的操作,待服務完全啟動後,執行拉取的喚醒操作

版權聲明:本文為CSDN部落客「weixin_34378767」的原創文章,遵循CC 4.0 BY-SA版權協定,轉載請附上原文出處連結及本聲明。

原文連結:https://blog.csdn.net/weixin_34378767/article/details/92641350