天天看點

RocketMQ源碼分析之Broker快速失敗元件BrokerFastFailure

作者:程式員阿龍

#頭條創作挑戰賽#

一、FastFailure是啥

FastFailure從字面意思上就能猜出來,其實就是快速失敗,什麼是快速失敗呢,就是broker感覺自己不行(繁忙)的時候,接下來的一些請求就會直接傳回失敗,如果有用過dubbo的小夥伴,可能會知道dubbo服務提供者端有個快速失敗機制,就是線程池滿的時候,再來調用請求就會直接拒絕掉,為啥會有這個機制呢?其實是為了保障broker或者是服務的穩定性與可用性,不然broker或者是服務非常繁忙的時候,處理請求很慢,還嘩嘩的一堆堆請求不停的打過來,容易将broker或者是服務幹當機。

二、什麼時候會發生

我們上面說過,當broker感覺自己不行的時候會有快速失敗的機制,那什麼時候會感覺到不行呢,其實就是OSPageCacheBusy,我們都知道commitlog 是單線程順序追加寫的,要想實作單線程順序追加寫,就得在追加寫之前擷取這個鎖,這個OSPageCacheBusy就是按照某個線程持有鎖的時間算出來的,當一次寫入持有鎖時間1s以上,RocketMQ就會認為OSPageCacheBusy,這個時候就會開啟FastFailure機制,将來的請求給快速拒絕掉。

現在想想,什麼時候會造成追加寫入commitlog很慢?

下面是我總結出來的2個原因:

  1. 記憶體爆了的時候,jvm進行垃圾回收,stop the world ,然後會造成整個寫入過程慢。
  2. commitlog這個使用的mmap記憶體檔案映射,記憶體檔案映射你可以了解為将檔案映射到記憶體中,檔案中的每一個位元組都在記憶體中有對應,然後你操作記憶體就相當于操作檔案,也就是說你往mmap寫入的話,其實就是往作業系統的vfs層的pagecache裡面寫入;但是我們都知道你操作的記憶體是一層虛拟記憶體,會将記憶體劃分為4k一個個的記憶體頁,雖然你做了mmap記憶體映射,實際上作業系統可能并沒有為你某個位置配置設定一個真實的實體記憶體頁,這個時候你往裡面寫入資料的話,作業系統發現寫入的那個虛拟記憶體頁沒有對應的實體記憶體頁,這個時候就會請求調頁,給你配置設定一個記憶體頁,如果你記憶體不足了,或者是正在swap交換記憶體,這個時候寫入os pagecache就會緩慢,造成波動(這塊内容涉及到作業系統的一些知識);

三、源碼分析

在BrokerController這個類執行個體化的時候會建立許多元件,其中一個BrokerFastFailure 元件;

  1. 啟動清理過期請求線程;
  2. 判斷任務是否需要快速失敗;
  3. 清理逾時請求;

1、啟動清理過期請求線程

啟動BrokerController 的時候,就會啟動BrokerFastFailure 這個元件,我們一起來看下它的啟動方法:

// 啟動清理過期請求線程
public void start() {
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            // 預設開啟
            if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {
                // 清理過期請求
                cleanExpiredRequest();
            }
        }
    }, 1000, 10, TimeUnit.MILLISECONDS);
}           

可以看到建立一個定時任務,然後每10ms,也就是10毫秒執行一次,判斷如果開啟這個快速失敗的話,就執行cleanExpiredRequest方法進行清理,這個預設是開啟的;

2、判斷任務是否需要快速失敗

private void cleanExpiredRequest() {
    // 如果說存儲元件處于os pagecache busy狀态,pagecache是一個讀寫高并發的情況,導緻pagecache來不及處理
    while (this.brokerController.getMessageStore().isOSPageCacheBusy()) {
        try {
            // 判斷是否繁忙
            if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
                // 擷取task
                final Runnable runnable = this.brokerController.getSendThreadPoolQueue().poll(0, TimeUnit.SECONDS);
                if (null == runnable) {
                    break;
                }

                final RequestTask rt = castRunnable(runnable);
                // 快速失敗,把請求直接傳回一個響應,system busy,broker busy
                rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", System.currentTimeMillis() - rt.getCreateTimestamp(), this.brokerController.getSendThreadPoolQueue().size()));
            } else {
                break;
            }
        } catch (Throwable ignored) {
        }
    }

    // 清理逾時請求
    // 發送消息請求隊列
    cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue(),
        this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue());
    // 拉取消息請求隊列
    cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(),
        this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());
    // 心跳隊列
    cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),
        this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());
    // 結束事物請求隊列
    cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(), this
        .brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue());
}           

可以看到上面有個while循環,如果是os pagecache一直繁忙的話,就會一直執行這個循環裡面的代碼,如果sendThreadPoolQueue(這個就是發送消息的隊列,當broker收到消息生産者發送消息的請求後,會交給線程池,然後線程處理不過來的就放到這個隊列中排隊等待處理,這個是線程池的一個知識)不是空的話,就會從這個隊列裡面取出任務,然後直接傳回SYSTEM_BUSY系統繁忙,[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d這串錯誤資訊最好有個印象,最起碼看到它能知道是啥原因。

我們看下判斷os pagecache繁忙的代碼:

// 核心是基于mappedfile來讀寫資料,基于os pagecache記憶體區域來映射一塊磁盤檔案
// 寫入也是寫到os pagecache裡去,讀取也是從os pagecache裡讀取
// 可能會出現一個問題,os pagecache,broker才采取了一個機制,讀寫分離,寫入到堆外緩存裡去
// transient pool,我們可以往這個裡面去寫入,定時觸發一個commit,從transient pool裡送出到os pagecache裡去
// 讀取的時候還是去讀取pagecache裡,寫入和讀取分離開來解決一個os pagecahce busy
@Override
public boolean isOSPageCacheBusy() {
    // 擷取到commitlog裡面的起始時間
    long begin = this.getCommitLog().getBeginTimeInLock();
    // 把commitlog起始時間和目前時間做一個內插補點diff
    long diff = this.systemClock.now() - begin;

    // 如果說時間內插補點是小于10 000 000,同時這個內插補點大于了os pagecache busy時間門檻值
    return diff < 10000000
        && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();
}           

其實就是在鎖内的時間超多1s,目前時間減去擷取鎖的時間(這個擷取鎖的時間是某個寫入消息線程擷取鎖的開始時間),就會任務是os pagecache 繁忙,當這個鎖被釋放,就會正常,會走一堆清理隊列中逾時任務;

3、清理逾時請求

我們可以看到清理的隊列有SendThreadPoolQueue,逾時時間是200ms,發送消息請求就會被放到這個隊列中;PullThreadPoolQueue,逾時時間是5000ms,拉取消息請求會在這個隊列中;HeartbeatThreadPoolQueue,逾時時間是31s,心跳請求放會被放到這個隊列中;EndTransactionThreadPoolQueue,逾時時間是3s,送出事務,復原事務的請求就會被放到這個隊列中。

// 清理逾時請求
// 發送消息請求隊列
cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue(),
    this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue());
// 拉取消息請求隊列
cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(),
    this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());
// 心跳隊列
cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),
    this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());
// 結束事物請求隊列
cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(), this
    .brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue());           

我們來看下具體怎麼清理的

void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
    while (true) {
        try {
            if (!blockingQueue.isEmpty()) {
                final Runnable runnable = blockingQueue.peek();
                if (null == runnable) {
                    break;
                }
                final RequestTask rt = castRunnable(runnable);
                if (rt == null || rt.isStopRun()) {
                    break;
                }

                // 目前時間-任務建立時間戳
                final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
                // 判斷是否大于指定時間
                if (behind >= maxWaitTimeMillsInQueue) {
                    if (blockingQueue.remove(runnable)) {
                        // 設定狀态
                        rt.setStopRun(true);
                        // 系統繁忙
                        rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
                    }
                } else {
                    break;
                }
            } else {
                break;
            }
        } catch (Throwable ignored) {
        }
    }
}           

四、總結

本文主要介紹了一下RocketMQ broker 裡面的FastFailure機制是啥,以及造成這個原因,然後從源碼的角度看了一下這個FastFailure 的實作。

繼續閱讀