天天看點

Apche Kafka 的生與死 – failover 機制詳解

kafka 作為 high throughput 的消息中間件,以其性能,簡單和穩定性,成為目前實時流處理架構中的主流的基礎元件。

當然在使用 kafka 中也碰到不少問題,尤其是 failover 的問題,常常給大家帶來不少困擾和麻煩。 

是以在梳理完 kafka 源碼的基礎上,盡量用通俗易懂的方式,把 kafka 發生 failover 時的機制解釋清楚,讓大家在使用和運維中,做到心中有數。

這裡讨論 kafka 的 failover 的前提是在0.8版本後, kafka 提供了 replica 機制。 

對于0.7版本不存在 failover 的說法,因為任意一個 broker dead 都會導緻上面的資料不可讀,進而導緻服務中斷。

下面簡單的介紹一下 0.8中加入的 replica 機制和相應的元件,

基本思想大同小異,如下圖 (ref.2):

Apche Kafka 的生與死 – failover 機制詳解

圖中有4個 kafka brokers,并且topic1有四個 partition(用藍色表示)分布在4個 brokers 上,為 leader replica; 

且每個 partition 都有兩個 follower replicas(用橘色表示),分布在和 leader replica 不同的 brokers。 

這個配置設定算法很簡單,有興趣的可以參考kafka的design。

為了支援replica機制,主要增加的兩個元件是,replica manager和controller, 如下圖:

Apche Kafka 的生與死 – failover 機制詳解

每個 broker server 都會建立一個 replica manager,所有的資料的讀寫都需要經過它 , 

0.7版本,kafka 會直接從 logmanager 中讀資料,但在增加 replica 機制後,隻有 leader replica 可以響應資料的讀寫請求 。 

是以,replica manager 需要管理所有 partition 的 replica 狀态,并響應讀寫請求,以及其他和 replica 相關的操作。

大家可以看到,每個 partition 都有一個 leader replica,和若幹的 follower replica,那麼誰來決定誰是leader? 

你說有 zookeeper,但用 zk 為每個 partition 做 elect,效率太低,而且 zk 會不堪重負; 

是以現在的通用做法是,隻用 zk 選一個 master 節點,然後由這個 master 節點來做其他的所有仲裁工作。 

kafka 的做法就是在 brokers 中選出一個作為 controller,來做為 master 節點,進而仲裁所有的 partition 的 leader 選舉。

下面我們會從如下幾個方面來解釋 failover 機制, 

先從 client 的角度看看當 kafka 發生 failover 時,資料一緻性問題。 

然後從 kafka 的各個重要元件,zookeeper,broker, controller 發生 failover 會造成什麼樣的影響? 

最後給出一些判斷 kafka 狀态的 tips。

除了要打開 replica 機制,還取決于 produce 的 request.required.acks 的設定,

acks = 0,發就發了,不需要 ack,無論成功與否 ;

acks = 1,當寫 leader replica 成功後就傳回,其他的 replica 都是通過fetcher去異步更新的,當然這樣會有資料丢失的風險,如果leader的資料沒有來得及同步,leader挂了,那麼會丢失資料;

acks = –1, 要等待所有的replicas都成功後,才能傳回;這種純同步寫的延遲會比較高。

是以,一般的情況下,thoughput 優先,設成1,在極端情況下,是有可能丢失資料的; 

如果可以接受較長的寫延遲,可以選擇将 acks 設為 –1。

首先無論是 high-level 或 low-level consumer,我們要知道他是怎麼從 kafka 讀資料的?

Apche Kafka 的生與死 – failover 機制詳解

kafka 的 log patition 存在檔案中,并以 offset 作為索引,是以 consumer 需要對于每個 partition 記錄上次讀到的 offset (high-level和low-level的差別在于是 kafka 幫你記,還是你自己記);

是以如果 consumer dead,重新開機後隻需要繼續從上次的 offset 開始讀,那就不會有不一緻的問題。

但如果是 kafka broker dead,并發生 partition leader 切換,如何保證在新的 leader 上這個 offset 仍然有效?  

kafka 用一種機制,即 committed offset,來保證這種一緻性,如下圖(ref.2)

Apche Kafka 的生與死 – failover 機制詳解

log 除了有 log end offset 來表示 log 的末端,還有一個 committed offset, 表示有效的 offset; 

committed offset 隻有在所有 replica 都同步完該 offset 後,才會被置為該offset; 

是以圖中 committed 置為2, 因為 broker3 上的 replica 還沒有完成 offset 3 的同步; 

是以這時,offset 3 的 message 對 consumer 是不可見的,consumer最多隻能讀到 offset 2。 

如果此時,leader dead,無論哪個 follower 重新選舉成 leader,都不會影響資料的一緻性,因為consumer可見的offset最多為2,而這個offset在所有的replica上都是一緻的。

是以在一般正常情況下,當 kafka 發生 failover 的時候,consumer 是不會讀到不一緻資料的。特例的情況就是,目前 leader 是唯一有效的 replica,其他replica都處在完全不同步狀态,這樣發生 leader 切換,一定是會丢資料的,并會發生 offset 不一緻。

kafka 首先對于 zookeeper 是強依賴,是以 zookeeper 發生異常時,會對資料造成如何的影響?

如果 zookeeper dead,broker 是無法啟動的,報如下的異常:

Apche Kafka 的生與死 – failover 機制詳解

這種異常,有可能是 zookeeper dead,也有可能是網絡不通,總之就是連不上 zookeeper。 

這種 case,kafka完全不工作,直到可以連上 zookeeper 為止。

其實上面這種情況比較簡單,比較麻煩的是 zookeeper hang,可以說 kafka 的80%以上問題都是由于這個原因 

zookeeper hang 的原因有很多,主要是 zk 負載過重,zk 所在主機 cpu,memeory 或網絡資源不夠等

zookeeper hang 帶來的主要問題就是 session timeout,這樣會觸發如下的問題,

a. controller fail,controller 發生重新選舉和切換,具體過程參考下文。

b. broker fail,導緻partition的leader發生切換或partition offline,具體過程參考下文。

c. broker 被 hang 住 。 

這是一種比較特殊的 case,出現時在 server.log 會出現如下的log,

server.log: 

“info i wrote this conflicted ephemeral node [{"jmx_port":9999,"timestamp":"1444709  63049","host":"10.151.4.136","version":1,"port":9092}] at /brokers/ids/1 a while back in a different session, hence i will backoff for this node to be deleted by zookeeper and retry (kafka.utils.zkutils$)”

問題在于“the current behavior of zookeeper for ephemeral nodes is that session expiration and ephemeral node deletion is not an atomic operation.” 

即 zk 的 session 過期和 ephemeral node 删除并不是一個原子操作; 

出現的case如下:

在極端case下,zk 觸發了 session timeout,但還沒來得及完成 /brokers/ids/1 節點的删除,就被 hang 住了,比如是去做很耗時的 fsync 操作 。

但是 broker 1 收到 session timeout 事件後,會嘗試重新去 zk 上建立 /brokers/ids/1 節點,可這時舊的節點仍然存在,是以會得到 nodeexists,其實這個是不合理的,因為既然 session timeout,這個節點就應該不存在。

通常的做法,既然已經存在,我就不管了,該幹啥幹啥去;問題是一會 zk 從 fsync hang 中恢複了,他會記得還有一個節點沒有删除,這時會去把 /brokers/ids/1 節點删除。

結果就是對于client,雖然沒有再次收到 session 過期的事件,但是 /brokers/ids/1 節點卻不存在了。

是以這裡做的處理是,在前面發現 nodeexists 時,while true 等待,一直等到 zk 從 hang 中恢複删除該節點,然後建立新節點成功,才算完; 

這樣做的結果是這個broker也會被一直卡在這兒,等待該節點被成功建立。

broker 的 failover,可以分為兩個過程,一個是 broker failure, 一個是 broker startup。

在談failover之前,我們先看一個更簡單的過程,就是新加一個全新的 broker: 

首先明确,新加的 broker 對現存所有的 topic 和 partition,不會有任何影響; 

因為一個 topic 的 partition 的所有 replica 的 assignment 情況,在建立時就決定了,并不會自動發生變化,除非你手動的去做 reassignment。 

是以新加一個 broker,所需要做的隻是大家同步一下中繼資料,大家都知道來了一個新的 broker,當你建立新的 topic 或 partition 的時候,它會被用上。

首先明确,這裡的 broker failure,并不一定是 broker server 真正的 dead了, 隻是指該 broker 所對應的 zk ephemeral node ,比如/brokers/ids/1,發生 session timeout; 

當然發生這個的原因,除了server dead,還有很多,比如網絡不通;但是我們不關心,隻要出現 sessioin timeout,我們就認為這個 broker 不工作了; 

會出現如下的log,

controller.log: 

“info [brokerchangelistener on controller 1]: newly added brokers: 3, deleted brokers: 4, all live brokers: 3,2,1 (kafka.controller.replicastatemachine$brokerchangelistener)” 

“info [controller 1]: broker failure callback for 4 (kafka.controller.kafkacontroller)”

當一個 broker failure 會影響什麼,其實對于多 replicas 場景,一般對最終客戶沒啥影響。 

隻會影響哪些 leader replica 在該 broker 的 partitions; 需要重新做 leader election,如果無法選出一個新的 leader,會導緻 partition offline。 

因為如果隻是 follow replica failure,不會影響 partition 的狀态,還是可以服務的,隻是可用 replica 少了一個;需要注意的是,kafka 是不會自動補齊失敗的replica的,即壞一個少一個; 

但是對于 leader replica failure,就需要重新再 elect leader,前面已經讨論過,新選取出的 leader 是可以保證 offset 一緻性的;

note: 其實這裡的一緻性是有前提的,即除了 fail 的 leader,在 isr(in-sync replicas) 裡面還存在其他的 replica;顧名思義,isr,就是能 catch up with leader 的 replica。 

雖然 partition 在建立的時候,會配置設定一個 ar(assigned replicas),但是在運作的過程中,可能會有一些 replica 由于各種原因無法跟上 leader,這樣的 replica 會被從 isr 中去除。 

是以 isr <= ar; 

如果,isr 中 沒有其他的 replica,并且允許 unclean election,那麼可以從 ar 中選取一個 leader,但這樣一定是丢資料的,無法保證 offset 的一緻性。

這裡的 startup,就是指 failover 中的 startup,會出現如下的log,

“info [controller 1]: new broker startup callback for 3 (kafka.controller.kafkacontroller)”

過程也不複雜,先将該 broker 上的所有的 replica 設為 online,然後觸發 offline partition 或 new partition 的 state 轉變為 online; 

是以 broker startup,隻會影響 offline partition 或 new partition,讓他們有可能成為 online。 

那麼對于普通的已經 online partition,影響隻是多一個可用的 replica,那還是在它完成catch up,被加入 isr 後的事。

note: partition 的 leader 在 broker failover 後,不會馬上自動切換回來,這樣會産生的問題是,broker間負載不均衡,因為所有的讀寫都需要通過 leader。 

為了解決這個問題,在server的配置中有個配置,auto.leader.rebalance.enable,将其設為true; 

這樣 controller 會啟動一個 scheduler 線程,定期去為每個 broker 做 rebalance,即發現如果該 broker 上的 imbalance ratio 達到一定比例,就會将其中的某些 partition 的 leader,進行重新 elect 到原先的 broker 上。

前面說明過,某個 broker server 會被選出作為 controller,這個選舉的過程就是依賴于 zookeeper 的 ephemeral node,誰可以先在"/controller"目錄建立節點,誰就是 controller; 

是以反之,我們也是 watch 這個目錄來判斷 controller 是否發生 failover 或 變化。controller 發生 failover 時,會出現如下 log:

“info [sessionexpirationlistener on 1], zk expired; shut down all controller components and try to re-elect (kafka.controller.kafkacontroller$sessionexpirationlistener)”

controller 主要是作為 master 來仲裁 partition 的 leader 的,并維護 partition 和 replicas 的狀态機,以及相應的 zk 的 watcher 注冊;

controller 的 failover 過程如下:

試圖去在“/controller” 目錄搶占建立 ephemeral node;

如果已經有其他的 broker 先建立成功,那麼說明新的 controller 已經誕生,更新目前的中繼資料即可;

如果自己建立成功,說明我已經成為新的 controller,下面就要開始做初始化工作,

初始化主要就是建立和初始化 partition 和 replicas 的狀态機,并對 partitions 和 brokers 的目錄的變化設定 watcher。

可以看到,單純 controller 發生 failover,是不會影響正常資料讀寫的,隻是 partition 的 leader 無法被重新選舉,如果此時有 partition 的 leader fail,會導緻 partition offline; 

但是 controller 的 dead,往往是伴随着 broker 的 dead,是以在 controller 發生 failover 的過程中,往往會出現 partition offline, 導緻資料暫時不可用。

a, 驗證topic 是否work?

最簡單的方式,就是用 producer 和 consumer console 來測試

producer console,如下可以往 localhost 的 topic test,插入兩條 message,

consumer console,如下就可以把剛寫入的 message 讀出,

如果整個過程沒有報錯,ok,說明你的topic是可以工作的

b, 再看看topic是否健康?

這樣會列印出 topic test 的 detail 資訊,如圖,

Apche Kafka 的生與死 – failover 機制詳解

從這個圖可以說明幾個問題:

首先,topic 有幾個 partitions,并且 replicas factor 是多少,即有幾個 replica? 

圖中分别有32個 partitions,并且每個 partition 有兩個 replica。

再者,每個 partition 的 replicas 都被配置設定到哪些 brokers 上,并且該 partition 的 leader 是誰? 

比如,圖中的 partition0,replicas 被配置設定到 brokers 4和1上面,其中 leader replica 在 broker 1 上。

最後,是否健康? 

從以下幾個方面依次表明健康程度,

isr 為空,說明這個 partition 已經 offline 無法提供服務了,這種 case 在我們的圖中沒有出現;

isr 有資料,但是 isr < replicas,這種情況下對于使用者是沒有感覺的,但是說明有部分 replicas 已經出問題了,至少是暫時無法和 leader 同步;比如,圖中的 partition0,isr 隻有1,說明 replica 4 已經 offline

isr = replicas,但是 leader 不是 replicas 中的第一個 replica,這個說明 leader 是發生過重新選取的,這樣可能會導緻 brokers 負載不均衡;比如,圖中的 partition9,leader是2,而不是3,說明雖然目前它的所有 replica 都是正常的,但之前發生過重新選舉。

c,最後就是看kafka的日志,kafka/logs

主要是看 controller.log 和 server.log,分别記錄 controller 和 broker server 的日志。 

然後根據前面我給的每種異常的日志,你可以看出來到底是出現什麼問題。

本文章摘自部落格園,原文釋出日期:2015-11-17