天天看點

Kafka重平衡機制一、什麼是 Rebalance二、Rebalance 的觸發時機三、Rebalance 的過程四、Rebalance 的問題五、避免 Rebalance六、重平衡的應用

一、什麼是 Rebalance

分區的所有權從一個消費者轉移到另一個消費者,這樣的行為被稱為重平衡(Rebalance)。

Rebalance 實作了消費者群組的高可用性和伸縮性。

消費者通過向被指派為群組協調器(Coordinator)的 broker 發送心跳來維持它們和群組的從屬關系以及它們對分區的所有權。

所謂協調者,在 Kafka 中對應的術語是 Coordinator,它專門為 Consumer Group 服務,負責為 Group 執行 Rebalance 以及提供位移管理群組成員管理等。具體來講,Consumer 端應用程式在送出位移時,其實是向 Coordinator 所在的 Broker 送出位移。同樣地,當 Consumer 應用啟動時,也是向 Coordinator 所在的 Broker 發送各種請求,然後由 Coordinator 負責執行消費者組的注冊、成員管理記錄等中繼資料管理操作。

當在群組裡面 新增/移除消費者 或者 新增/移除 kafka 叢集 broker 節點 時,群組協調器 Broker 會觸發再均衡,重新為每一個 Partition 配置設定消費者。Rebalance 期間,消費者無法讀取消息,造成整個消費者群組一小段時間的不可用。

Rebalance 本質上是一種協定,規定了一個 Consumer Group 下的所有 Consumer 如何達成一緻,來配置設定訂閱 Topic 的每個分區。比如某個 Group 下有 20 個 Consumer 執行個體,它訂閱了一個具有 100 個分區的 Topic。正常情況下,Kafka 平均會為每個 Consumer 配置設定 5 個分區。這個配置設定的過程就叫 Rebalance。

二、Rebalance 的觸發時機

  • 組成員數發生變更。比如有新的 Consumer 執行個體加入組或者離開組,抑或是有 Consumer 執行個體崩潰被“踢出”組。
    • 新增消費者。customer 訂閱主題之後,第一次執行 poll 方法
    • 移除消費者。執行 customer.close()操作或者消費用戶端當機,就不再通過 poll 向群組協調器發送心跳了,當群組協調器檢測次消費者沒有心跳,就會觸發再均衡。
  • 訂閱主題數發生變更。Consumer Group 可以使用正規表達式的方式訂閱主題,比如consumer.subscribe(Pattern.compile(“t.*c”)) 就表明該 Group 訂閱所有以字母 t 開頭、字母 c 結尾的主題。在 Consumer Group 的運作過程中,你新建立了一個滿足這樣條件的主題,那麼該 Group 就會發生 Rebalance。
  • 訂閱主題的分區數發生變更。Kafka 目前隻能允許增加一個主題的分區數。當分區數增加時,就會觸發訂閱該主題的所有 Group 開啟 Rebalance。
    • 新增 broker。如重新開機 broker 節點
    • 移除 broker。如 kill 掉 broker 節點。

三、Rebalance 的過程

Rebalance 是通過消費者群組中的稱為“群主”消費者用戶端進行的。什麼是群主呢?“群主”就是第一個加入群組的消費者。消費者第一次加入群組時,它會向群組協調器發送一個 JoinGroup 的請求,如果是第一個,則此消費者被指定為“群主”(群主是不是和 qq 群很想啊,就是那個第一個進群的人)。

Kafka重平衡機制一、什麼是 Rebalance二、Rebalance 的觸發時機三、Rebalance 的過程四、Rebalance 的問題五、避免 Rebalance六、重平衡的應用
  1. 群主從群組協調器擷取群組成員清單,然後給每一個消費者進行配置設定分區 Partition。有兩種配置設定政策:Range 和 RoundRobin。
  • Range 政策,就是把若幹個連續的分區配置設定給消費者,如存在分區 1-5,假設有 3 個消費者,則消費者 1 負責分區 1-2,消費者 2 負責分區 3-4,消費者 3 負責分區 5。
  • RoundRoin 政策,就是把所有分區逐個分給消費者,如存在分區 1-5,假設有 3 個消費者,則分區 1->消費 1,分區 2->消費者 2,分區 3>消費者 3,分區 4>消費者 1,分區 5->消費者 2。
  1. 群主配置設定完成之後,把配置設定情況發送給群組協調器。
  2. 群組協調器再把這些資訊發送給消費者。每一個消費者隻能看到自己的配置設定資訊,隻有群主知道所有消費者的配置設定資訊。

查找協調者

所有 Broker 在啟動時,都會建立和開啟相應的 Coordinator 元件。也就是說,所有 Broker 都有各自的 Coordinator 元件。那麼,Consumer Group 如何确定為它服務的 Coordinator 在哪台 Broker 上呢?答案就在我們之前說過的 Kafka 内部位移主題 __consumer_offsets身上。

目前,Kafka 為某個 Consumer Group 确定 Coordinator 所在的 Broker 的算法有 2 個步驟。

  1. 第 1 步:确定由位移主題的哪個分區來儲存該 Group 資料:partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)。
  2. 第 2 步:找出該分區 Leader 副本所在的 Broker,該 Broker 即為對應的 Coordinator。

四、Rebalance 的問題

  • 首先,在 Rebalance 過程中,所有 Consumer 執行個體都會停止消費,等待 Rebalance 完成。
  • 其次,目前 Rebalance 的設計是所有 Consumer 執行個體共同參與,全部重新配置設定所有分區。其實更高效的做法是盡量減少配置設定方案的變動。例如執行個體 A 之前負責消費分區 1、2、3,那麼 Rebalance 之後,如果可能的話,最好還是讓執行個體 A 繼續消費分區 1、2、3,而不是被重新配置設定其他的分區。這樣的話,執行個體 A 連接配接這些分區所在 Broker 的 TCP 連接配接就可以繼續用,不用重新建立連接配接其他 Broker 的 Socket 資源。
  • 最後,Rebalance 實在太慢了。曾經,有個國外使用者的 Group 内有幾百個 Consumer 執行個體,成功 Rebalance 一次要幾個小時!這完全是不能忍受的。最悲劇的是,目前社群對此無能為力,至少現在還沒有特别好的解決方案。

    所謂“本事大不如不攤上”,也許最好的解決方案就是避免 Rebalance 的發生吧。

Rebalance 整個過程中,所有執行個體都不能消費任何消息,是以它對 Consumer 的 TPS 影響很大。

五、避免 Rebalance

了解了 Rebalance 的問題,我們可以知道,如果減少 Rebalance,可以整體提高 Consumer 的 TPS。

前面介紹了,Rebalance 的觸發時機有三個。其中,增加 Consumer 執行個體的操作都是計劃内的,可能是出于增加 TPS 或提高伸縮性的需要。

未及時發送心跳

第一類非必要 Rebalance 是因為未能及時發送心跳,導緻 Consumer 被“踢出”Group 而引發的。是以,你需要仔細地設定 session.timeout.ms 和 heartbeat.interval.ms的值。我在這裡給出一些推薦數值,你可以“無腦”地應用在你的生産環境中。

  • 設定 session.timeout.ms = 6s。
  • 設定 heartbeat.interval.ms = 2s。
  • 要保證 Consumer 執行個體在被判定為“dead”之前,能夠發送至少 3 輪的心跳請求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。

将 session.timeout.ms 設定成 6s 主要是為了讓 Coordinator 能夠更快地定位已經挂掉的 Consumer。畢竟,我們還是希望能盡快揪出那些“屍位素餐”的 Consumer,早日把它們踢出 Group。希望這份配置能夠較好地幫助你規避第一類“不必要”的 Rebalance。

Consumer 消費時間過長

第二類非必要 Rebalance 是 Consumer 消費時間過長導緻的。我之前有一個客戶,在他們的場景中,Consumer 消費資料時需要将消息處理之後寫入到 MongoDB。顯然,這是一個很重的消費邏輯。MongoDB 的一丁點不穩定都會導緻 Consumer 程式消費時長的增加。此時,max.poll.interval.ms 參數值的設定顯得尤為關鍵。如果要避免非預期的 Rebalance,你最好将該參數值設定得大一點,比你的下遊最大處理時間稍長一點。就拿 MongoDB 這個例子來說,如果寫 MongoDB 的最長時間是 7 分鐘,那麼你可以将該參數設定為 8 分鐘左右。

如果你按照上面的推薦數值恰當地設定了這幾個參數,卻發現還是出現了 Rebalance,那麼我建議你去排查一下Consumer 端的 GC 表現,比如是否出現了頻繁的 Full GC 導緻的長時間停頓,進而引發了 Rebalance。為什麼特意說 GC?那是因為在實際場景中,我見過太多因為 GC 設定不合理導緻程式頻發 Full GC 而引發的非預期 Rebalance 了。

六、重平衡的應用

如果 Kafka 觸發了再均衡,我們需要在消費者失去對一個分區的所有權之前送出最後一個已處理記錄的偏移量。如果消費者準備了一個緩沖區用于處理偶發的事件,那麼在失去分區所有權之前,需要處理在緩沖區累積下來的記錄。可能還需要關閉檔案句柄、資料庫連接配接等。

在為消費者配置設定新分區或移除舊分區時,可以通過消費者 API 執行一些應用程式代碼,在調用 subscribe() 方法時傳進去一個 ConsumerRebalanceListener 執行個體就可以了。 ConsumerRebalanceListener 有兩個需要實作的方法。

  • public void onPartitionsRevoked(Collection partitions) 方法會在再均衡開始之前和消費者停止讀取消息之後被調用。如果在這裡送出偏移量,下一個接管分區的消費者就知道該從哪裡開始讀取了。
  • public void onPartitionsAssigned(Collection partitions) 方法會在重新配置設定分區之後和消費者開始讀取消息之前被調用。
private Map<TopicPartition, OffsetAndMetadata> currentOffsets=
  new HashMap<>();

private class HandleRebalance implements ConsumerRebalanceListener {
    public void onPartitionsAssigned(Collection<TopicPartition>
      partitions) {
    }

    public void onPartitionsRevoked(Collection<TopicPartition>
      partitions) {
        System.out.println("Lost partitions in rebalance.
          Committing current
        offsets:" + currentOffsets);
        consumer.commitSync(currentOffsets);
    }
}

try {
    consumer.subscribe(topics, new HandleRebalance());

    while (true) {
        ConsumerRecords<String, String> records =
          consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
        {
            System.out.println("topic = %s, partition = %s, offset = %d,
             customer = %s, country = %s\n",
             record.topic(), record.partition(), record.offset(),
             record.key(), record.value());
             currentOffsets.put(new TopicPartition(record.topic(),
             record.partition()), new
             OffsetAndMetadata(record.offset()+1, "no metadata"));
        }
        consumer.commitAsync(currentOffsets, null);
    }
} catch (WakeupException e) {
    // 忽略異常,正在關閉消費者
} catch (Exception e) {
    log.error("Unexpected error", e);
} finally {
    try {
        consumer.commitSync(currentOffsets);
    } finally {
        consumer.close();
        System.out.println("Closed consumer and we are done");
    }
}