consumer的正常訂閱消息的總體操作流程:
構造初始化-》注冊監聽-》啟動-》無限循環請求隊列-》長連接配接的資料拉取
一,構造初始化
DefaultMQPushConsumer:正常的訂閱消息,需要制定唯一的分組名稱
最終構造的對象是DefaultMQPushConsumerImpl:
核心的功能實作,整合内部多個元件
配置消息的消息開始位置:ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET
配置namesrv位址:拉取對應的配置内容和關系結構
訂閱内容:基于特定topic的訂閱,然後内置了表達式引擎(過濾内容)
注冊監聽:該監聽是監聽到有消息時的自有業務邏輯處理
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wang-group-consmuer");
/*
* Specify where to start in case the specified consumer group is a brand new one.
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//指定nameserver的位址,所有的資料互動都是基于nameserve來進行的資訊擷取和更新及心跳
consumer.setNamesrvAddr("127.0.0.1:9876");
/*
* Subscribe one more more topics to consume.
*/
consumer.subscribe("TopicTest", "*");
/*
* Register callback to execute on arrival of messages fetched from brokers.
*/
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/*
* Launch the consumer instance.
*/
consumer.start();
構造函數的初始化
public DefaultMQPushConsumer(final String consumerGroup) {
this(consumerGroup, null, new AllocateMessageQueueAveragely());
}
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
實際實作服務的核心屬性
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
/**
* Delay some time when exception occur
*/
private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000;
/**
* Flow control interval
*/
private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
/**
* Delay some time when suspend pull service
*/
private static final long PULL_TIME_DELAY_MILLS_WHEN_SUSPEND = 1000;
private static final long BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15;
private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;
private final InternalLogger log = ClientLogger.getLog();
private final DefaultMQPushConsumer defaultMQPushConsumer;
//負載的選擇器
private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
private final long consumerStartTimestamp = System.currentTimeMillis();
private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
private final RPCHook rpcHook;
private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
//用戶端執行個體,複用
private MQClientInstance mQClientFactory;
//拉取服務請求的包裝
private PullAPIWrapper pullAPIWrapper;
private volatile boolean pause = false;
private boolean consumeOrderly = false;
//消息監聽
private MessageListener messageListenerInner;
//消息的偏移位置
private OffsetStore offsetStore;
//用戶端服務
private ConsumeMessageService consumeMessageService;
private long queueFlowControlTimes = 0;
private long queueMaxSpanFlowControlTimes = 0;
二,
啟動
start:最終的核心啟動是DefaultMQPushConsumerImpl的啟動
啟動和消息發送端的啟動類似,進行對應的初始化及啟動操作
配置服務的執行狀态,内部拆分了四個狀态機制
驗證關鍵配置,主要是影響到和新功能的配置的内容
将配置的訂閱資訊拷貝到對應的監聽,對應的負載,消費等資料對象中
生成執行個體id
獲得MQClient的執行個體工程,和producer一緻,關鍵的核心操作實作,綜合體
将核心的配置注入到負載服務中
包裝拉取服務,包裝成獨立的綜合體
獲得消息的開始消費偏移位置,用于消息拉取的請求參數
根據消息的監聽類型,設定,分為兩類,順序監聽和并發監聽,并發監聽的效率更高
注冊消費處理
執行個體MQClient的工程方法啟動
标示為目前服務為運作狀态
更新訂閱的topic資訊
檢查使用的broker資訊
發送心跳内容給所有的broker
馬上調用負載服務平衡
public void start() throws MQClientException {
//消費端的核心啟動入口
this.defaultMQPushConsumerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
核心功能的操作啟動
//消費端主動推送的啟動核心入口
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;
//驗證核心關鍵配置
this.checkConfig();
//将配置中的訂閱資訊拷貝到推送服務中
this.copySubscription();
//生成id
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
//獲得用戶端工廠,共享的用戶端工程單例設計
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
//将配置中的核心配置指派給負載中
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
//拉取消息的包裝api
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
//設定消息的開始位置
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
//加載消息讀取位置資訊
this.offsetStore.load();
//消息接受器的監聽類型
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
//消費者監聽服務啟動
this.consumeMessageService.start();
//注冊服務處理事件
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
//連結工程的啟動
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
//更新topic資訊到目前的記憶體資料結構中,便于後期的直接使用
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
//檢查需要連結的broker是否可用
this.mQClientFactory.checkClientInBroker();
//發送心跳資料給目前服務所連結的所有broker中,操作過程是基于安全鎖機制
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
//喚醒負載平衡操作
this.mQClientFactory.rebalanceImmediately();
}
三,監聽
業務根據RocketMQ的規範,根據業務特定實作的接口
接口分為兩類,順序消息,并行消費
//消息接受器的監聽類型
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
//有序消息
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
//并行消息
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
并行消息
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
private static final InternalLogger log = ClientLogger.getLog();
//消息的實際拉取操作
private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
//消費消息的頂層結構
private final DefaultMQPushConsumer defaultMQPushConsumer;
//并行消息接聽
private final MessageListenerConcurrently messageListener;
private final BlockingQueue<Runnable> consumeRequestQueue;
//線程池執行配置
private final ThreadPoolExecutor consumeExecutor;
private final String consumerGroup;
//任務調用
private final ScheduledExecutorService scheduledExecutorService;
//任務排程清除逾時
private final ScheduledExecutorService cleanExpireMsgExecutors;
public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
MessageListenerConcurrently messageListener) {
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
this.messageListener = messageListener;
this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_"));
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
}
public void start() {
this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
//清除逾時消息
cleanExpireMsg();
}
}, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
}
四,長連接配接的拉取
啟動入口在MQClient的連結工程的啟動中
在啟動中指定負載服務的啟動中RebalanceService
在操作中this.waitForRunning(waitInterval);預設是阻塞20000,但是在服務啟動後,執行了this.mQClientFactory.rebalanceImmediately();
核心操作在重置負載中this.mqClientFactory.doRebalance();
有一個前提,根據目前服務的模式,分為push自動領取,pull程式主動發起,還有一個條件就是是否是順序消息
周遊訂閱資訊,重新負載topic資訊
最核心的操作在更新queue到負載中,完成了初始化
此時調用的是PullMessageService的執行拉取操作,該對象是核心
後期的操作就是自動化請求,基于隊列的形式,持續的請求,完成後再放置下一次請求到隊列中,循環請求
//連結工程的啟動
mQClientFactory.start();
//初始化用戶端請求執行個體
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
//mq的核心配置資訊
this.clientConfig = clientConfig;
//目前程序内的唯一辨別,升序數值
this.instanceIndex = instanceIndex;
//netty通信的用戶端配置
this.nettyClientConfig = new NettyClientConfig();
this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
//解析用戶端請求,封裝的事件處理
this.clientRemotingProcessor = new ClientRemotingProcessor(this);
//用戶端執行個體的實際實作,網絡通信的核心,隻是初始化了通信架構,具體的連結後面根據不同的位址再進行連結操作
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
//設定核心的nameserv位址
if (this.clientConfig.getNamesrvAddr() != null) {
this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
}
this.clientId = clientId;
//mq管理
this.mQAdminImpl = new MQAdminImpl(this);
//拉取消息的實作
this.pullMessageService = new PullMessageService(this);
//負載均衡的實作,可能有相關的機器增加删除,需要定期的進行重負載操作
this.rebalanceService = new RebalanceService(this);
//消息發送者的包裝,發送者的發送者,這個邏輯有點亂,并且在構造方法中重新初始化的
//producer -> DefaultMQProducer -> DefaultMQProducerImpl -> MQClientInstance -> DefaultMQProducer
this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
this.defaultMQProducer.resetClientConfig(clientConfig);
//消費用戶端的狀态管理
this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
this.instanceIndex,
this.clientId,
this.clientConfig,
MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
}
//啟動發送消息的核心,同時也是訂閱消息的核心
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
//啟動netty的用戶端配置
// Start request-response channel
this.mQClientAPIImpl.start();
//啟動任務,更新,驗證,心跳等操作
// Start various schedule tasks
this.startScheduledTask();
//消費端
// Start pull service
this.pullMessageService.start();
//消費端,重新負載設定,請求的初始化操作也在此方法内執行
// Start rebalance service
this.rebalanceService.start();
//前面已經初始化過操作,該入參為false,隻需要初始化其他操作
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
初始化操作類RebalanceService
public class RebalanceService extends ServiceThread {
private static long waitInterval =
Long.parseLong(System.getProperty(
"rocketmq.client.rebalance.waitInterval", "20000"));
private final InternalLogger log = ClientLogger.getLog();
//消息的網絡操作及功能
private final MQClientInstance mqClientFactory;
//負載服務的初始化,構造是連接配接服務的執行個體,目前是線程的子類
public RebalanceService(MQClientInstance mqClientFactory) {
this.mqClientFactory = mqClientFactory;
}
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
//等待執行
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
@Override
public String getServiceName() {
return RebalanceService.class.getSimpleName();
}
}
表現的狀态是在waiting,然後再指定負載,執行的負載操作是
//負載平衡重置
public void doRebalance() {
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
impl.doRebalance();
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
}
此時consumerTable中的資料在前面的初始化啟動中進行了注冊操作
//注冊服務處理事件
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
執行平台操作
@Override
public void doRebalance() {
if (!this.pause) {
this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
}
}
public void doRebalance(final boolean isOrder) {
//獲得訂閱的内容
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
//操作對topic的負載
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
//基于資料的統一處理
this.truncateMessageQueueNotMyTopic();
}
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}",
consumerGroup,
topic,
mqSet,
mqSet);
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
//預設的叢集模式
case CLUSTERING: {
//獲得目前topic的訂閱隊列資訊
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
//請求獲得topic的cid請求
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}
if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
//排序操作
Collections.sort(mqAll);
Collections.sort(cidAll);
//選擇cid的政策
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
//是否有變化
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(
"rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
//消息隊列的修改
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}
主要的操作在修改處理消息隊列的變化boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;
//獲得處理隊列的資料
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
//周遊隊列同時對消息對鞋進行修正
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
//針對相同的topic的處理
if (mq.getTopic().equals(topic)) {
if (!mqSet.contains(mq)) {
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.isPullExpired()) {
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
consumerGroup, mq);
}
break;
default:
break;
}
}
}
}
//構造拉取消息的請求
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
//封裝拉取的請求結構
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
//處理消息拉取請求
this.dispatchPullRequest(pullRequestList);
return changed;
}
拉取的操作this.dispatchPullRequest(pullRequestList);是抽象的設計,需要根據目前的實作類進行實作
@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
for (PullRequest pullRequest : pullRequestList) {
//實行消息的拉取操作
this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
}
}
逐層調用實作
public void executePullRequestImmediately(final PullRequest pullRequest) {
this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
}
//将請求直接添加到隊列中,這裡肯定是第一次初始化和後來無限次操作的入口
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}
将最終的拉取請求添加的請求隊列中,等待請求隊列的掃描和執行
最終的執行開始及操作,在啟動的最後一步執行this.mQClientFactory.rebalanceImmediately();
//喚醒負載
public void rebalanceImmediately() {
this.rebalanceService.wakeup();
}
public void wakeup() {
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
}
}
該喚醒操作和簽名的waiting操作是一一對應的操作,待服務完全啟動後,執行拉取的喚醒操作
版權聲明:本文為CSDN部落客「weixin_34378767」的原創文章,遵循CC 4.0 BY-SA版權協定,轉載請附上原文出處連結及本聲明。
原文連結:https://blog.csdn.net/weixin_34378767/article/details/92641350