天天看點

【RocketMQ源碼分析】生産者

簡單使用

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        //使用ProducerGroup初始化Producer
        DefaultMQProducer producer = new
            DefaultMQProducer("group");
        // 指定namesrv
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        for (int i = 0; i < 100; i++) {
            //建立消息
            Message msg = new Message("Topic" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //發送消息
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //關閉Producer
        producer.shutdown();
    }
}
           

上面是同步發送消息的簡單使用,主要是初始化DefaultMQProducer,調用start()方法和send()方法.接下來具體分析生産者發送消息的具體過程。

DefaultMQProducer

DefaultMQProducer就是消息生産者的用戶端,DefaultMQProducer的定義如下:

public class DefaultMQProducer extends ClientConfig implements MQProducer {
  ...
}
           

可以看到DefaultMQProducer實作了MQProducer接口并繼承了ClientConfig類。

  • MQProducer接口

    在MQProducer接口定義了Producer的一些操作,并且繼承了MQAdmin,在MQAdmin主要是一些管理功能接口,MQProducer中定義了各種發送消息的結構,同步發送,異步發送,oneway發送,批量發送和事務消息的發送。

public interface MQProducer extends MQAdmin {
    void start() throws MQClientException;

    void shutdown();

    List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;

    SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,
        InterruptedException;

    // 省略一系列send方法

    // 失誤相關的發送消息
    TransactionSendResult sendMessageInTransaction(final Message msg,
        final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;

    TransactionSendResult sendMessageInTransaction(final Message msg,
        final Object arg) throws MQClientException;

    //for batch
    SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,
        InterruptedException;

    // 省略一系列的batch send的方法
}
           
public interface MQAdmin {

    void createTopic(final String key, final String newTopic, final int queueNum)
        throws MQClientException;

    void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
        throws MQClientException;

    long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException;

    long maxOffset(final MessageQueue mq) throws MQClientException;

    long minOffset(final MessageQueue mq) throws MQClientException;

    long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException;

    MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException,
        InterruptedException, MQClientException;

    QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
        final long end) throws MQClientException, InterruptedException;
  
    MessageExt viewMessage(String topic,
        String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;

}
           
  • ClientConfig類

    ClientConfig根據類名稱大概也能猜到,主要是負責一些配置資訊。

public class ClientConfig {
    public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel";
    private String namesrvAddr = NameServerAddressUtils.getNameServerAddresses();
    private String clientIP = RemotingUtil.getLocalAddress();
    private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
    private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
    protected String namespace;
    protected AccessChannel accessChannel = AccessChannel.LOCAL;

    private int pollNameServerInterval = 1000 * 30;
   
    private int heartbeatBrokerInterval = 1000 * 30;
   
    private int persistConsumerOffsetInterval = 1000 * 5;
    private boolean unitMode = false;
    private String unitName;
    private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "false"));

    private boolean useTLS = TlsSystemConfig.tlsEnable;

    private LanguageCode language = LanguageCode.JAVA;
}
           

大緻了解了一下DefaultMQProducer的結構,DefaultMQProducer有兩個核心方法start()、send()方法。

start()

DefaultMQProducer初始化完成後,會調用start()方法,start方法主要是負責建立網絡通信、初始化配置。它的具體實作最終是交給defaultMQProducerImpl#start()方法了。

public void start(final boolean startFactory) throws MQClientException {
    // 根據serviceState處理不同邏輯,初始化都是CREATE_JUST
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;
            // 檢查配置資訊,主要是ProducerGroup的,不重要
            this.checkConfig();
                        // 判斷ProducerGroup是否是CLIENT_INNER_PRODUCER_GROUP,如果是将 ClientConfig#instanceName 改為程序PID,什麼情況下是CLIENT_INNER_PRODUCER_GROUP呢?接下來會接觸到,由于我們都會指定自己的group,是以這裡會進入到if語句裡面修改ClientConfig#instanceName為程序PID
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }
                        // 建立一個MQClientInstance,這個是重點,就是它完成了和broker的通信
            // 建立完成後MQClientManager會儲存ClientId -> MQClientInstance的映射
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                        // 注冊Producer,就是将 ProducerGroup -> DefaultMQProducerImpl 的關系儲存到MQClientInstance中
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                // 如果注冊失敗,抛異常結束,終止啟動
            }
                        // topicPublishInfoTable 初始化一個topic的路由資訊,這個CreateTopicKey就是MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC,這個topic和Broker的AutoCreateTopicEnable進行配置使用,後面會詳細介紹
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
                        // startFactory這時是true,是以會調用MQClientInstance的start()方法
            if (startFactory) {
                mQClientFactory.start();
            }

            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            this.serviceState = ServiceState.RUNNING;
            break;
        // 這裡省略了一些 case 邏輯的處理,都是不重要的,邏輯都在CREATE_JUST這個case下面
    }
        // 發送心跳資料,這個後面再看吧
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
           

這裡會執行個體化一個核心的類MQClientInstance,RocketMQ 在同一個 JVM 程序擁有一個 clientConfigId(用戶端ID)該JVM程序中不同的消息消費組的消息用戶端ID相同,因為在JVM程序中對于每一個ClientConfig 隻會執行個體化一個 MQClientInstance。執行個體化完成之後會start啟動該執行個體,看一下具體的start方法:

public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    //遠端通信,NettyRemotingClient 啟動
                    this.mQClientAPIImpl.start();
                    //1.定時擷取Name Server位址 2.定時從Name Server擷取Topic路由資訊 3.定時清理下線的Broker,向所有Broker發送心跳資訊 4.定時持久化Consumer消費進度(廣播存儲到本地,叢集存儲到Broker)
                    this.startScheduledTask();
                    // 開啟輪訓拉取消息
                    this.pullMessageService.start();
                    // 開啟重平衡
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }
           
  • this.mQClientAPIImpl.start(); //遠端通信,NettyRemotingClient 啟動
//擷取NameServer位址
public List<String> getNameServerAddressList() {
        return this.remotingClient.getNameServerAddressList();
    }

    public RemotingClient getRemotingClient() {
        return remotingClient;
    }

    public String fetchNameServerAddr() {
        try {
            String addrs = this.topAddressing.fetchNSAddr();
            if (addrs != null) {
                if (!addrs.equals(this.nameSrvAddr)) {
                    log.info("name server address changed, old=" + this.nameSrvAddr + ", new=" + addrs);
                    this.updateNameServerAddressList(addrs);
                    this.nameSrvAddr = addrs;
                    return nameSrvAddr;
                }
            }
        } catch (Exception e) {
            log.error("fetchNameServerAddr Exception", e);
        }
        return nameSrvAddr;
    }

    //更新NameServer位址
    public void updateNameServerAddressList(final String addrs) {
        String[] addrArray = addrs.split(";");
        List<String> list = Arrays.asList(addrArray);
        this.remotingClient.updateNameServerAddressList(list);
    }

    public void start() {
        this.remotingClient.start();
    }

    public void shutdown() {
        this.remotingClient.shutdown();
    }

    
    //建立topic
    public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig,
        final long timeoutMillis)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    }

    //發送消息
    public SendResult sendMessage(
        final String addr,
        final String brokerName,
        final Message msg,
        final SendMessageRequestHeader requestHeader,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int retryTimesWhenSendFailed,
        final SendMessageContext context,
        final DefaultMQProducerImpl producer
    ) throws RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        RemotingCommand request = null;
        if (sendSmartMsg || msg instanceof MessageBatch) {
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
        } else {
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
        }

        request.setBody(msg.getBody());

        switch (communicationMode) {
            case ONEWAY:
                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
                return null;
            case ASYNC:
                final AtomicInteger times = new AtomicInteger();
                long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeAsync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                    retryTimesWhenSendFailed, times, context, producer);
                return null;
            case SYNC:
                long costTimeSync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeSync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
            default:
                assert false;
                break;
        }

        return null;
    }
    //異步發送消息
    private void sendMessageAsync(
        final String addr,
        final String brokerName,
        final Message msg,
        final long timeoutMillis,
        final RemotingCommand request,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int retryTimesWhenSendFailed,
        final AtomicInteger times,
        final SendMessageContext context,
        final DefaultMQProducerImpl producer
    ) throws InterruptedException, RemotingException {
    
    }
    //拉取消息
    public PullResult pullMessage(
        final String addr,
        final PullMessageRequestHeader requestHeader,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final PullCallback pullCallback
    ) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);

        switch (communicationMode) {
            case ONEWAY:
                assert false;
                return null;
            case ASYNC:
                this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
                return null;
            case SYNC:
                return this.pullMessageSync(addr, request, timeoutMillis);
            default:
                assert false;
                break;
        }

        return null;
    }
    //異步拉取消息
    private void pullMessageAsync(
        final String addr,
        final RemotingCommand request,
        final long timeoutMillis,
        final PullCallback pullCallback
    ) throws RemotingException, InterruptedException {
        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {

    }
    //檢查Broker是否正常
    public void checkClientInBroker(final String brokerAddr, final String consumerGroup,
        
    }
           

可以看到MQClientAPIImpl類封裝了所有與伺服器通信部分API,我們随便找一個發送請求的方法看看,可以發現,MQClientAPIImpl中的請求都是先構造RemotingCommand,然後通過NettyRemotingClient發送請求,最後處理響應資料。

  • this.startScheduledTask(); //1.定時擷取Name Server位址 2.定時從Name Server擷取Topic路由資訊 3.定時清理下線的Broker,向所有Broker發送心跳資訊 4.定時持久化Consumer消費進度(廣播存儲到本地,叢集存儲到Broker)
  • this.pullMessageService.start(); //輪訓拉取消息

    pullMessageService是一個線程的封裝,開啟長輪詢拉消息服務,單線程異步拉取,在MQClientInstance#start()方法中調用了pullMessageService#start()。pullAPIWrapper.pullKernelImpl方法是真正的拉取消息

  • this.rebalanceService.start(); //重平衡

    RebalanceService其實是一個線程的封裝,RocketMQ 中通過RebalanceService線程實作消費隊列負載。RebalanceService在DefaultMQPushConsumerImpl#start()方法中調用了MQClientInstance#start()方法,在MQClientInstance#start()方法中調用了RebalanceService#start()方法。這個線程就是隔一段時間執行以下MQClientInstance#doRebalance()方法,而具體的doRebalance方法實作是由RebalanceImpl去完成的,RebalanceImpl是DefaultMQPushConsumerImpl中的屬性

send()

send方法最終也是交給DefaultMQProducerImpl.sendDefaultImpl方法實作的,看看具體實作:

private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // 檢查狀态和消息
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);

    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    // 擷取topic的路由資料,這裡是重點,這裡就是MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC和Broker的AutoCreateTopicEnable進行配置如何配合的?的答案了
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());  //@1
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        // 如果是同步發送,會有重試機制
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            // 這裡是發送消息負載均衡的關鍵
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);   //@2
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    if (times > 0) {
                        //Reset topic with namespace during resend.
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }
                                        // 真正幹活的老弟在這
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);     //@3
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);   //@4
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }

                            return sendResult;
                        default:
                            break;
                    }
                } catch () {
                    // 這裡是一對catch Exception,省略了
                }
            } else {
                break;
            }
        }

                // 如果發送成功,傳回
        if (sendResult != null) {
            return sendResult;
        }

        // 這裡還有一堆處理異常的邏輯,也省略了

        throw mqClientException;
    }

    // 沒有路由資訊的一些異常處理
}
           

首先看一下參數

  • Message:這個就是我們要發送的消息資料,沒啥說的
  • CommunicationMode:就是發送消息的方式,同步?異步?還是Oneway?
  • SendCallback:這個是異步發送回調用的
  • timeout:逾時時間

@1.tryToFindTopicPublishInfo方法

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    // 根據topic從Map中直接擷取路由資料,Producer啟動時肯定是啥都沒有的
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        // 直接put進去一個空的TopicPublishInfo
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        // 從namesrv擷取這個topic的路由資訊
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }
        // 如果從namesrv中拿到路由資料了直接傳回
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else { // namesrv中也沒有再從namesrv中擷取?頭鐵?不撞南牆不回頭?
        // 這個updateTopicRouteInfoFromNameServer和上面的那個方法是不同的邏輯了,并不是重試
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}
           

這個方法主要是實作根據topic擷取路由資訊,而最終是交給MQClientInstance.updateTopicRouteInfoFromNameServer,再交給MQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer方法調用Name Server接口,根據Topic擷取路由資訊。前面我們分析過MQClientAPIImpl類,這個類封裝了所有網絡通訊的接口,這裡可以展現出來。

@2.selectOneMessageQueue

擷取到Topic的路由資訊後,會調用selectOneMessageQueue()方法擷取一個MessageQueue。由于叢集模式下Broker有多個,是以這裡Broker也會存儲多個queue,這裡就需要從多個queue中擷取一個MessageQueue,這就涉及到一個負載均衡。

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    // Broker故障延遲機制,預設是false,不開啟,後面再分析
    if (this.sendLatencyFaultEnable) {
        try {
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }

            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }

        return tpInfo.selectOneMessageQueue();
    }
        // 沒有開啟Broker故障延遲機制會走這裡
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
           

可以發現,普通情況下的負載均衡很簡單,就是通過自增的一個index和messageQueueList.size()求餘實作的,由于同步發送有重試機制,是以重試時會判斷是否是上次發送失敗的broker,如果是會跳過這個有問題的broker。

@3.sendKernelImpl

sendKernelImpl方法就是具體的發送消息的邏輯了

private SendResult sendKernelImpl(final Message msg,
                                  final MessageQueue mq,
                                  final CommunicationMode communicationMode,
                                  final SendCallback sendCallback,
                                  final TopicPublishInfo topicPublishInfo,
                                  final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    // 根據broker name擷取master的位址,釋出消息過程中,尋找Broker位址,一定是找Master
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        // 如果沒有擷取到master的位址,重新整理路由資訊
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }

    SendMessageContext context = null;
    if (brokerAddr != null) {
        // 判斷是不是VIP,如果配置了是VIP發送消息,name會向broker的另外一個端口發送消息
        // VIP走小紅帽通道
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

        byte[] prevBody = msg.getBody();
        try {
            // 省略了現在我們不關心的内容
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            requestHeader.setTopic(msg.getTopic());
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setSysFlag(sysFlag);
            requestHeader.setBornTimestamp(System.currentTimeMillis());
            requestHeader.setFlag(msg.getFlag());
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
            requestHeader.setReconsumeTimes(0);
            requestHeader.setUnitMode(this.isUnitMode());
            requestHeader.setBatch(msg instanceof MessageBatch);
           
            // 省略了現在我們不關心的内容
            SendResult sendResult = null;
            switch (communicationMode) {
                case ASYNC:
                    Message tmpMessage = msg;
                    
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        tmpMessage,
                        requestHeader,
                        timeout - costTimeAsync,
                        communicationMode,
                        sendCallback,
                        topicPublishInfo,
                        this.mQClientFactory,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                        context,
                        this);
                    break;
                case ONEWAY:
                case SYNC:
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        msg,
                        requestHeader,
                        timeout - costTimeSync,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            }

            return sendResult;
        } catch () {
            // 省略了現在我們不關心的内容
        } finally {
            msg.setBody(prevBody);
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
        }
    }

    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
           

最終是交給MQClientAPIImpl.sendMessage去發送消息,可以看到發送消息給broker都是給master發的,也就是消費者主要同broker的master建立連接配接。

@4.updateFaultItem-Broker故障延遲機制

我們看看RocketMQ的Broker故障延遲機制是怎麼減少前面提到的沒必要的調用的

if (this.sendLatencyFaultEnable) {
    try {
        int index = tpInfo.getSendWhichQueue().getAndIncrement();
        for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
            int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
            // latencyFaultTolerance這裡就是實作Broker故障延遲機制的關鍵了
            if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                    return mq;
            }
        }
                // 到這裡都還沒找到合适的MessageQueue,那麼也隻能随便挑一個了,總得給上級一個交代是吧
        // pickOneAtLeast()就不再細說了,大家可以自行了解,反正大概就是從差的裡面再挑一個好的出來
        final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
        int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
        if (writeQueueNums > 0) {
            final MessageQueue mq = tpInfo.selectOneMessageQueue();
            if (notBestBroker != null) {
                mq.setBrokerName(notBestBroker);
                mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
            }
            return mq;
        } else {
            latencyFaultTolerance.remove(notBestBroker);
        }
    } catch (Exception e) {
        log.error("Error occurred when selecting message queue", e);
    }

    return tpInfo.selectOneMessageQueue();
}
           

可以看到,進入if裡面後,和普通機制的處理差不多,會選擇一個MessageQueue,但是這裡調用了LatencyFaultTolerance對選擇的

MessageQueue的broker進行了判斷,如果可用的話,才會繼續使用,不可用的話繼續選擇下一個

那麼我們看看LatencyFaultTolerance#isAvailable()怎麼做的吧?

public boolean isAvailable(final String name) {
    final FaultItem faultItem = this.faultItemTable.get(name);
    if (faultItem != null) {
        return faultItem.isAvailable();
    }
    return true;
}
           

邏輯比較簡單,從faultItemTable中根據broker name擷取到FaultItem對象,如果FaultItem不存在說明這個broker是可用的,如果FaultItem不為空,根據FaultItem#isAvailable()判斷是否可用,如果消息發送成功會更新faultItemTable。

sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
           

發送失敗了也會更新

} catch (RemotingException e) {
    endTimestamp = System.currentTimeMillis();
    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
    ...
} catch (MQClientException e) {
    endTimestamp = System.currentTimeMillis();
    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
    ...
} catch (MQBrokerException e) {
    endTimestamp = System.currentTimeMillis();
    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
    ...
} catch (InterruptedException e) {
    endTimestamp = System.currentTimeMillis();
    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
    ...
}
           

參考:https://www.jianshu.com/p/4e63d2143351

繼續閱讀