天天看點

【硬剛Kafka】KAFKA基礎(四):Kafka架構深入(2)Kafka 生産者

本文是對《【硬剛大資料之學習路線篇】從零到大資料專家的學習指南(全面更新版)》的Kafka部分補充。

1)分區的原因

(1)友善在叢集中擴充,每個 Partition 可以通過調整以适應它所在的機器,而一個 topic又可以有多個 Partition 組成,是以整個叢集就可以适應任意大小的資料了;

(2)可以提高并發,因為可以以 Partition 為機關讀寫了。

2)分區的原則

我們需要将 producer 發送的資料封裝成一個 ProducerRecord 對象。

【硬剛Kafka】KAFKA基礎(四):Kafka架構深入(2)Kafka 生産者

(1)指明 partition 的情況下,直接将指明的值直接作為 partiton 值;

(2)沒有指明 partition 值但有 key 的情況下,将 key 的 hash 值與 topic 的 partition數進行取餘得到 partition 值;

(3)既沒有 partition 值又沒有 key 值的情況下,第一次調用時随機生成一個整數(後面每次調用在這個整數上自增),将這個值與 topic 可用的 partition 總數取餘得到 partition值,也就是常說的 round-robin 算法。

為保證 producer 發送的資料,能可靠的發送到指定的 topic,topic 的每個 partition 收到producer 發送的資料後,都需要向 producer 發送 ack(acknowledgement 确認收到),如果producer 收到 ack,就會進行下一輪的發送,否則重新發送資料。

【硬剛Kafka】KAFKA基礎(四):Kafka架構深入(2)Kafka 生産者

1)副本資料同步政策

【硬剛Kafka】KAFKA基礎(四):Kafka架構深入(2)Kafka 生産者

Kafka 選擇了第二種方案,原因如下:

1.同樣為了容忍 n 台節點的故障,第一種方案需要 2n+1 個副本,而第二種方案隻需要 n+1個副本,而 Kafka 的每個分區都有大量的資料,第一種方案會造成大量資料的備援。

2.雖然第二種方案的網絡延遲會比較高,但網絡延遲對 Kafka 的影響較小。 

2)ISR

  采用第二種方案之後,設想以下情景:leader 收到資料,所有 follower 都開始同步資料,但有一個 follower,因為某種故障,遲遲不能與 leader 進行同步,那 leader 就要一直等下去,直到它完成同步,才能發送 ack。這個問題怎麼解決呢?

   Leader 維護了一個動态的 in-sync replica set (ISR),意為和 leader 保持同步的 follower 集合。當 ISR 中的 follower 完成資料的同步之後,leader 就會給 follower 發送 ack。如果 follower長時間 未 向 leader 同 步 數 據 , 則 該 follower 将 被 踢 出 ISR , 該時間門檻值由replica.lag.time.max.ms 參數設定。Leader 發生故障之後,就會從 ISR 中選舉新的 leader。

3)ack 應答機制

  對于某些不太重要的資料,對資料的可靠性要求不是很高,能夠容忍資料的少量丢失,是以沒必要等 ISR 中的 follower 全部接收成功。

  是以 Kafka 為使用者提供了三種可靠性級别,使用者根據對可靠性和延遲的要求進行權衡,選擇以下的配置。 

acks 參數配置:

acks:

  0:producer 不等待 broker 的 ack,這一操作提供了一個最低的延遲,broker 一接收到還沒有寫入磁盤就已經傳回,當 broker 故障時有可能丢失資料;

  1:producer 等待 broker 的 ack,partition 的 leader 落盤成功後傳回 ack,如果在 follower同步成功之前 leader 故障,那麼将會丢失資料;

acks = 1資料丢失案例 

【硬剛Kafka】KAFKA基礎(四):Kafka架構深入(2)Kafka 生産者

  -1(all):producer 等待 broker 的 ack,partition 的 leader 和 follower 全部落盤成功後才傳回 ack。但是如果在 follower 同步完成後,broker 發送 ack 之前,leader 發生故障,那麼會

造成資料重複。 

acks =-1資料重複案例 

【硬剛Kafka】KAFKA基礎(四):Kafka架構深入(2)Kafka 生産者

4)故障處理細節

Log檔案中的HW和LEO

【硬剛Kafka】KAFKA基礎(四):Kafka架構深入(2)Kafka 生産者

LEO:指的是每個副本最大的 offset;

HW:指的是消費者能見到的最大的 offset,ISR 隊列中最小的 LEO。 

(1)follower 故障

follower 發生故障後會被臨時踢出 ISR,待該 follower 恢複後,follower 會讀取本地磁盤記錄的上次的 HW,并将 log 檔案高于 HW 的部分截取掉,從 HW 開始向 leader 進行同步。

等該 follower 的 LEO 大于等于該 Partition 的 HW,即 follower 追上 leader 之後,就可以重新加入 ISR 了。

(2)leader 故障

leader 發生故障之後,會從 ISR 中選出一個新的 leader,之後,為保證多個副本之間的資料一緻性,其餘的 follower 會先将各自的 log 檔案高于 HW 的部分截掉,然後從新的 leader同步資料。

注意:這隻能保證副本之間的資料一緻性,并不能保證資料不丢失或者不重複。 

  将伺服器的 ACK 級别設定為-1,可以保證 Producer 到 Server 之間不會丢失資料,即 At Least Once 語義。相對的,将伺服器 ACK 級别設定為 0,可以保證生産者每條消息隻會被發送一次,即 At Most Once 語義。 

  At Least Once 可以保證資料不丢失,但是不能保證資料不重複;相對的,At Least Once可以保證資料不重複,但是不能保證資料不丢失。但是,對于一些非常重要的資訊,比如說交易資料,下遊資料消費者要求資料既不重複也不丢失,即 Exactly Once 語義。在 0.11 版本以前的 Kafka,對此是無能為力的,隻能保證資料不丢失,再在下遊消費者對資料做全局去重。對于多個下遊應用的情況,每個都需要單獨做全局去重,這就對性能造成了很大影響。

  0.11 版本的 Kafka,引入了一項重大特性:幂等性。所謂的幂等性就是指 Producer 不論向 Server 發送多少次重複資料,Server 端都隻會持久化一條。幂等性結合 At Least Once 語義,就構成了 Kafka 的 Exactly Once 語義。即: At Least Once + 幂等性 = Exactly Once 

        要啟用幂等性,隻需要将 Producer 的參數中 enable.idompotence 設定為 true 即可。Kafka的幂等性實作其實就是将原來下遊需要做的去重放在了資料上遊。開啟幂等性的 Producer 在初始化的時候會被配置設定一個 PID,發往同一 Partition 的消息會附帶 Sequence Number。而Broker 端會對<PID, Partition, SeqNumber>做緩存,當具有相同主鍵的消息送出時,Broker 隻會持久化一條。

  但是 PID 重新開機就會變化,同時不同的 Partition 也具有不同主鍵,是以幂等性無法保證跨分區跨會話的 Exactly Once。

這個參數用老指定分區中必須由多少個副本收到消息,之後生産者才會認為這條消息寫入是成功的。acks參數有三種類型的值(都是字元串類型)。

acks=1 預設值為1.生産者發送消息之後,隻要分區的leader副本成功的寫入消息,生産端就會收到來自服務端的成功響應,說明發送成功。如果消息無法寫入leader副本,比如在leader副本崩潰、重新選舉新的leader副本的過程中,生産者就會收到一個錯誤的響應,為了避免消息丢失,生産者就會選擇重發消息;如果消息寫入leader副本并成功響應給生産者,并且在其他follower副本拉取之前leader副本崩潰,此時消息還會丢失,因為新選舉的leader副本中并沒有這條對應的消息。acks設定為1,是消息可靠性和吞吐量之間的這種方案。

acks=0 生産者發送消息之後,不需要等待任何服務端的響應。如果在消息從發送到寫入kafka的過程中出現異常,導緻kafka并沒有收到消息,此時生産者是不知道的,消息也就丢失了。akcs設定為0時,kafka可以達到最大的吞吐量。

acks=-1或acks=all 生産者在消息發送之後,需要等待isr中所有的副本都成功寫入消息此案能夠收到服務端的成功響應。acks設定為-1,可以達到相對最強的可靠性。但這不一定是最可靠的,因為isr中可能就隻有leader副本,這樣就退化成了acks=1 的情況。

注意,acks參數是一個字元串類型,而不是一個整數類型。配置錯誤會報異常。

生産者用戶端能發送消息的最大值,預設值為1048576B,1MB。不建議盲目修改,這個參數涉及其他的一些參數的關聯,比如broker端的message.max.bytes參數,如果broker的message.max.bytes參數設定為10,而max.request.size設定為20,當發送一條大小為15B的消息時,生産者參數就會報錯。

生産者重試次數,預設值為0。消息在從生産者從發出到成功寫入broker之前可能發生一些臨時性異常,比如網絡抖動、leader副本選舉等,這些異常往往是可以自行恢複的,生産者可以配置retries的值,通過生産端的内部重試來恢複而不是一味的将異常抛給生産者;如果重試達到設定次數,生産者才會放棄重試并抛出異常。但是!并不是所有的異常都可以通過重試來解決,比如消息過大,超過max.request.size參數配置的數值。

重試還和參數retry.backoff.ms有關,預設值為100,用來設定兩次重試之間的時間間隔,避免無效的頻繁重試。在配置retries和retry.backoff.ms之前,最好先估算一下可能的異常恢複時間,這樣可以設定總的重試時間要大于異常恢複時間,避免生産者過早的放棄重試。

這個參數用來制動多久之後關閉限制的連接配接,預設值540000(ms),9分鐘、。

這個參數用來指定生産者發送ProducerBatch之前等待更多的消息(ProducerRecord)假如ProducerBatch的時間,預設值為0。ProducerBatch在被填滿或者時間超過linger.ms值時發送出去。增大這個參數的值回增加消息的延遲(消費端接收延遲),但能夠提升一定的吞吐量。

這個參數用來設定socket接收消息緩沖區的大小,預設值為32768(B),即32KB。如果設定為-1,則使用作業系統的預設值。如果Producer和Kafka處于不同的機房,則可以适當的調大這個參數值。

這個參數用來設定socket發送消息緩沖區的大小,預設值為131072(B),即128KB。與receive.buffer.bytes參數一樣,如果設定為-1,則使用作業系統的預設值。

這個參數用來配置Producer等待請求響應的最長時間,預設值為3000(ms)。請求逾時之後可以選擇進行重試。這個參數需要比broker端參數replica.lag.time.max.ms值要大,這樣可以介紹因用戶端重試引起的消息重複的機率。

幂等性開啟,預設為false。

broker叢集位址,可以設定一到多個,建議至少設定為2個,若在應用程式啟動的時候,一個broker節點當機,還可以對另一個已提供節點進行連接配接。