天天看點

【Kafka】《Kafka權威指南》——寫資料

不管是把 Kafka 作為消息隊列、消息、總線還是資料存儲平台來使用 ,總是需要有一個可以往 Kafka 寫入資料的生産者和一個可以從 Kafka讀取資料的消費者,或者一個兼具兩種角 色的應用程式。

例如,在一個信用卡事務處理系統裡,有一個用戶端應用程式,它可能是一個線上商店, 每當有支付行為發生時,它負責把事務發送到 Kafka上。另一個應用程式根據規則引擎檢 查這個事務,決定是準許還是拒絕。 準許或拒絕的響應消息被寫回 Kafka,然後發送給發起事務的線上商店。第三個應用程式從 Kafka上讀取事務和稽核狀态,把它們儲存到資料 庫, 随後分析師可以對這些結果進行分析,或許還能借此改進規則引擎 。

開發者們可以使用 Kafka 内置的用戶端 API開發 Kafka應用程式。

在這一章,我們将從 Kafra生産者的設計群組件講起,學習如何使用 Kafka生産者。我們将展示如何建立 KafkaProducer和 ProducerRecords對象、如何将記錄發送給 Kafka,以及如何處理從 Kafka 傳回的錯誤,然後介紹用幹控制生産者行為的重要配置選項,最後深入 探讨如何使用不同的分區方法和序列化器,以及如何自定義序列化器和分區器 。

在下一章,我們将會介紹 Kafra的悄費者用戶端,以及如何從 Kafka讀取消息。

生産者概覽

一個應用程式在很多情況下需要往 Kafka 寫入消息 : 記錄使用者的活動(用于審計和分析 )、 記錄度量名額、儲存日志、消息、記錄智能家電的資訊、與其他應用程式進行異步通信、 緩沖即将寫入到資料庫的資料,等等。

多樣的使用場景意味着多樣的需求:是否每個消息都很重要?是否允許丢失 一 小部分消息?偶爾出現重複消息是否可以接受?是否有嚴格的延遲和吞吐量要求?

在之前提到的信用卡事務處理系統裡,消息丢失或消息重複是不允許的,可以接受的延遲最大為 500ms,對吞吐量要求較高,我們希望每秒鐘可以處理一百萬個消息。

儲存網站的點選資訊是另 一種使用場景。在這個場景裡,允許丢失少量的消息或出現少量 的消息重複,延遲可以高一些,隻要不影響使用者體驗就行。換句話說,隻要使用者點選連結 後可以馬上加載頁面,那麼我們并不介意消息要在幾秒鐘之後才能到達 Kafka 伺服器。 吞 吐量則取決于網站使用者使用網站的頻度。

不同的使用場景對生産者 API 的使用和配置會有直接的影響。

盡管生産者 API 使用起來很簡單 ,但消息的發送過程還是有點複雜的。下圖展示 了向Kafka 發送消息的主要步驟。

【Kafka】《Kafka權威指南》——寫資料

Kafka 生産者元件圖

我們從建立 一個 ProducerRecord 對象開始, ProducerRecord 對象需要包含目标主題和要發送的内容。我們還可以指定鍵或分區。在發送 ProducerRecord對象時,生産者要先把鍵和 值對象序列化成位元組數組,這樣它們才能夠在網絡上傳輸 。

接下來,資料被傳給分區器。如果之前在 ProducerRecord對象裡指定了分區,那麼分區器就不會再做任何事情,直接把指定的分區傳回。如果沒有指定分區 ,那麼分區器會根據 ProducerRecord對象的鍵來選擇一個分區 。選好分區以後 ,生産者就知道該往哪個主題和分區發送這條記錄了。緊接着,這條記錄被添加到一個記錄批次裡,這個批次裡的所有消息會被發送到相同的主題和分區上。有一個獨立的線程負責把這些記錄批次發送到相應的 broker 上。

伺服器在收到這些消息時會傳回一個響應。如果消息成功寫入 Kafka,就傳回 一 個 RecordMetaData 對象,它包含了主題和分區資訊,以及記錄在分區裡的偏移量。如果寫入 失敗, 就會傳回 一個錯誤 。生産者在收到錯誤之後會嘗試重新發送消息,幾次之後如果還是失敗,就傳回錯誤資訊。

建立Kafka生産者

要往 Kafka寫入消息,首先要建立一個生産者對象,井設定一些屬性。

下面的代碼片段展示了如何建立一個新的生産者,這裡隻指定了必要的屬性,其他使用預設設定。

private Properties kafkaProps = new Properties(); 

kafkaProps.put("bootstrap.servers","broker1:9092,broker2:9092");
 
kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
 
kafkaProps.put("value.seializer","org.apache.kafka.common.serialization.StringSerializer");
 
producer = new KafkaProducer<String, String>(kafkaProps);      

Kafka生産者有 3個必選的屬性

bootstrap.servers

該屬性指定 broker 的位址清單,位址的格式為 host:port。清單裡不需要包含所有的broker位址,生産者會從給定的 broker裡查找到其他 broker的資訊。不過建議至少要提供兩個 broker的資訊, 一旦其中一個當機,生産者仍然能夠連接配接到叢集上。

key.serializer

broker希望接收到的消息的鍵和值都是位元組數組。生産者接口允許使用參數化類型,是以可以把 Java對象作為鍵和值發送給 broker。這樣的代碼具有良好的可讀性,不過生産者需要知道如何把這些 Java對象轉換成位元組數組。 key.serializer必須被設定為一個實作了org.apache.kafka.common.serialization.Serializer接口的類,生産者會使用這個類把鍵對象序列化成位元組數組。 Kafka 用戶端預設提供了ByteArraySerializer(這個隻做很少的事情)、 StringSerializer和 IntegerSerializer,是以,如果你隻使用常見的幾種 Java對象類型,那麼就沒必要實作自己的序列化器 。要注意, key.serializer是必須設定的,就算你打算隻發送值内容。

value.serializer

與 key.serializer一樣, value.serializer指定的類會将值序列化。如果鍵和值都是字元串,可以使用與 key.serializer 一樣的序列化器。如果鍵是整數類型而值是字元扇 , 那麼需要使用不同的序列化器。

發送消息主要有3種方式

1、發送并忘記( fire-and-forget):我們把消息發送給伺服器,但井不關心它是否正常到達。大多數情況下,消息會正常到達,因為 Kafka是高可用的,而且生産者會自動嘗試重發。不過,使用這種方式有時候也會丢失一些消息。

2、同步發送:我們使用send()方怯發送消息, 它會傳回一個Future對象,調用get()方法進行等待, 就可以知道悄息是否發送成功。

3、異步發送:我們調用 send() 方怯,并指定一個回調函數, 伺服器在傳回響應時調用該函數。

在下面的幾個例子中 , 我們會介紹如何使用上述幾種方式來發送消息,以及如何處理可能 發生的異常情況。

本章的所有例子都使用單線程,但其實生産者是可以使用多線程來發送消息的。剛開始的 時候可以使用單個消費者和單個線程。如果需要更高的吞吐量,可以在生産者數量不變的 前提下增加線程數量。如果這樣做還不夠 , 可以增加生産者數量。

發送消息到Kafka

最簡單的同步發送消息方式如下所示 :

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try{
  producer.send(record);
} catch(Exception e) {
  e.printStack();
}      

生産者的 send() 方住将 ProducerRecord對象作為參數,它需要目标主題的名字和要發送的鍵和值對象,它們都是字元串。鍵和值對象的類型必須與序列化器和生産者對象相比對。

我們使用生産者的 send() 方越發送 ProducerRecord對象。從生産者的架構圖裡可以看到,消息先是被放進緩沖區,然後使用單獨的線程發送到伺服器端。 send() 方法會傳回一個包含 RecordMetadata 的 Future對象,不過因為我們會忽略傳回值,是以無法知道消息是否發送成功。如果不關心發送結果,那麼可以使用這種發送方式。比如,記錄 Twitter 消息日志,或記錄不太重要的應用程式日志。

我們可以忽略發送消息時可能發生的錯誤或在伺服器端可能發生的錯誤,但在發送消息之前,生産者還是有可能發生其他的異常。這些異常有可能是 SerializationException (說明序列化消息失敗)、 BufferExhaustedException 或 TimeoutException (說明緩沖區已滿),又或者是 InterruptException (說明發送線程被中斷)。

同步發送消息

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try{
    producer.send(record).get();
} catch(Exception e) {
    e.printStack();
}      

在這裡, producer.send() 方住先傳回一個 Future對象,然後調用 Future對象的 get() 方法等待 Kafka 響應。如果伺服器傳回錯誤, get()方怯會抛出異常。如果沒有發生錯誤,我們會得到一個 RecordMetadata對象,可以用它擷取消息的偏移量。如果在發送資料之前或者在發送過程中發生了任何錯誤 ,比如 broker傳回 了一個不允許重發消息的異常或者已經超過了重發的次數 ,那麼就會抛出異常。我們隻是簡單地把異常資訊列印出來。

如何處理從Kafka生産者傳回的錯誤

KafkaProducer一般會發生兩類錯誤。其中一類是可重試錯誤 ,這類錯誤可以通過重發消息來解決。比如對于連接配接錯誤,可以通過再次建立連接配接來解決,“無主(noleader)” 錯誤則可 以通過重新為分區選舉首領來解決。 KafkaProducer可以被配置成自動重試,如果在多次重試後仍無能解決問題,應用程式會收到一個重試異常。另一類錯誤無出通過重試解決 ,比如“消息太大”異常。對于這類錯誤, KafkaProducer不會進行任何重試,直接抛出異常。

異步發送消息

假設消息在應用程式和 Kafka叢集之間一個來回需要 10ms。如果在發送完每個消息後都等待回應,那麼發送 100個消息需要 1秒。但如果隻發送消息而不等待響應,那麼發送100個消息所需要的時間會少很多。大多數時候,我們并不需要等待響應——盡管 Kafka 會把目标主題、分區資訊和消息的偏移量發送回來,但對于發送端的應用程式來說不是必需的。不過在遇到消息發送失敗時,我們需要抛出異常、記錄錯誤日志,或者把消息寫入 “錯誤消息”檔案以便日後分析。

為了在異步發送消息的同時能夠對異常情況進行處理,生産者提供了回調支援 。下面是使用異步發送消息、回調的一個例子。

生産者的配置

到目前為止 , 我們隻介紹了生産者的幾個必要配置參數——bootstrap.servers API 以及序列化器。

生産者還有很多可配置的參數,在 Kafka文檔裡都有說明,它們大部分都有合理的預設值 , 是以沒有必要去修改它們 。不過有幾個參數在記憶體使用、性能和可靠性方面對生産者影響比較大,接下來我們會一一說明。

1. acks

acks 參數指定了必須要有多少個分區副本收到消息,生産者才會認為消息寫入是成功的。

這個參數對消息丢失的可能性有重要影響。 該參數有如下選項。

• 如果 acks=0, 生産者在成功寫入悄息之前不會等待任何來自伺服器的響應。也就是說, 如果當中出現了問題 , 導緻伺服器沒有收到消息,那麼生産者就無從得知,消息也就丢 失了。不過,因為生産者不需要等待伺服器的響應,是以它可以以網絡能夠支援的最大 速度發送消息,進而達到很高的吞吐量。

• 如果 acks=1,隻要叢集的首領節點收到消息,生産者就會收到 一個來自伺服器的成功 響應。如果消息無撞到達首領節點(比如首領節點崩憤,新的首領還沒有被選舉出來), 生産者會收到一個錯誤響應,為了避免資料丢失,生産者會重發消息。不過,如果一個 沒有收到消息的節點成為新首領,消息還是會丢失。這個時候的吞吐量取決于使用的是 同步發送還是異步發送。如果讓發送用戶端等待伺服器的響應(通過調用 Future對象 的 get()方法),顯然會增加延遲(在網絡上傳輸一個來回的延遲)。如果用戶端使用異步回調,延遲問題就可以得到緩解,不過吞吐量還是會受發送中消息數量的限制(比如,生 産者在收到伺服器響應之前可以發送多少個消息)。

• 如果 acks=all,隻有當所有參與複制的節點全部收到消息時,生産者才會收到一個來自伺服器的成功響應。這種模式是最安全的,它可以保證不止一個伺服器收到消息,就算有伺服器發生崩潰,整個叢集仍然可以運作(第 5 章将讨論更多的細節)。不過,它的延遲比 acks=1時更高,因為我們要等待不隻一個伺服器節點接收消息。

2. buffer.memory

該參數用來設定生産者記憶體緩沖區的大小,生産者用它緩沖要發送到伺服器的消息。如果 應用程式發送消息的速度超過發送到伺服器的速度,會導緻生産者空間不足。這個時候, send()方法調用要麼被阻塞,要麼抛出異常,取決于如何設定 block.on.buffe.full 參數 (在0.9.0.0版本裡被替換成了max.block.ms,表示在抛出異常之前可以阻塞一段時間)。

3. compression.type

預設情況下,消息發送時不會被壓縮。該參數可以設定為 snappy、 gzip 或 lz4,它指定了消息被發送給 broker之前使用哪一種壓縮算法進行壓縮。 snappy 壓縮算怯由 Google巳發明, 它占用較少 的 CPU,卻能提供較好的性能和相當可觀的壓縮比,如果比較關注性能和網絡帶寬,可以使用這種算法。 gzip壓縮算法一般會占用較多的 CPU,但會提供更高的壓縮比,是以如果網絡帶寬比較有限,可以使用這種算法。使用壓縮可以降低網絡傳輸開銷和存儲開銷,而這往往是向 Kafka發送消息的瓶頸所在。

4. retries

生産者從伺服器收到的錯誤有可能是臨時性的錯誤(比如分區找不到首領)。在這種情況下, retries參數的值決定了生産者可以重發消息的次數,如果達到這個次數,生産者會放棄重試并傳回錯誤。預設情況下,生産者會在每次重試之間等待 1OOms,不過可以通過 retries.backoff.ms 參數來改變這個時間間隔。建議在設定重試次數和重試時間間隔之前, 先測試一下恢複一個崩潰節點需要多少時間(比如所有分區選舉出首領需要多長時間), 讓總的重試時間比 Kafka叢集從崩潰中恢複的時間長,否則生産者會過早地放棄重試。不過有些錯誤不是臨時性錯誤,沒辦怯通過重試來解決(比如“悄息太大”錯誤)。一般情 況下,因為生産者會自動進行重試,是以就沒必要在代碼邏輯裡處理那些可重試的錯誤。 你隻需要處理那些不可重試的錯誤或重試次數超出上限的情況。

5. batch.size

當有多個消息需要被發送到同一個分區時,生産者會把它們放在放一個批次裡。該參數指定了一個批次可以使用的記憶體大小,按照位元組數計算(而不是消息個數)。當批次被填滿,批次裡的所有消息會被發送出去。不過生産者井不一定都會等到批次被填滿才發送,半捕 的批次,甚至隻包含一個消息的批次也有可能被發送。是以就算把批次大小設定得很大, 也不會造成延遲,隻是會占用更多的記憶體而已。但如果設定得太小,因為生産者需要更頻繁地發送消息,會增加一些額外的開銷。

6. linger.ms

該參數指定了生産者在發送批次之前等待更多消息加入批次的時間。 KafkaProducer 會在批次填滿或 linger.ms達到上限時把批次發送出去。預設情況下,隻要有可用的線程, 生産者就會把消息發送出去,就算批次裡隻有一個消息。把 linger.ms設定成比0大的數, 讓生産者在發送批次之前等待一會兒,使更多的消息加入到這個批次 。雖然這樣會增加延遲,但也會提升吞吐量(因為一次性發送更多的消息,每個消息的開銷就變小了)。

7. client.id

該參數可以是任意的字元串,伺服器會用它來識别消息的來源,還可以用在日志和配額名額裡。

8. max.in.flight.requests.per.connection

該參數指定了生産者在收到伺服器晌應之前可以發送多少個消息。它的值越高,就會占用越多的記憶體,不過也會提升吞吐量。 把它設為 1 可以保證消息是按照發送的順序寫入伺服器的,即使發生了重試。

9. timeout.ms、 request.timeout.ms 和 metadata.fetch.timeout.ms

request.timeout.ms指定了生産者在發送資料時等待伺服器傳回響應的時間,metadata.fetch.timeout.ms指定了生産者在擷取中繼資料(比如目标分區的首領是誰)時等待伺服器傳回響應的時間。如果等待響應逾時,那麼生産者要麼重試發送資料,要麼傳回 一個錯誤 (抛出異常或執行回調)。timeout.ms 指定了 broker 等待同步副本傳回消息确認的時間,與 asks 的配置相比對一一如果在指定時間内沒有收到同步副本的确認,那麼 broker就會傳回 一個錯誤 。

10. max.block.ms

該參數指定了在調用 send() 方法或使用 parttitionFor() 方能擷取中繼資料時生産者的阻塞 時間。當生産者的發送緩沖區已捕,或者沒有可用的中繼資料時,這些方屈就會阻塞。在阻塞時間達到 max.block.ms時,生産者會抛出逾時異常。

11 . max.request.size

該參數用于控制生産者發送的請求大小。它可以指能發送的單個消息的最大值,也可以指單個請求裡所有消息總的大小。例如,假設這個值為 1MB,那麼可以發送的單個最大消息為 1MB,或者生産者可以在單個請求裡發送一個批次,該批次包含了 1000個消息,每個消息大小為 1KB 。另外, broker對可接收的消息最大值也有自己的限制( message.max.bytes),是以兩邊的配置最好可以比對,避免生産者發送的消息被 broker拒絕 。

12. receive.buffer.bytes 和 send.buffer.bytes

這兩個參數分别指定了 TCP socket接收和發送資料包的緩沖區大小 。 如果它們被設為 -1 , 就使用作業系統的預設值。如果生産者或消費者與 broker處于不同的資料中心,那麼可以适當增大這些值,因為跨資料中心的網絡一般都有比較高的延遲和比較低的帶寬。

順序保證

Kafka可以保證同一個分區裡的消息是有序的。也就是說,如果生産者按照一定的順序發送消息, broker就會按照這個順序把它們寫入分區,消費者也會按照同樣的順序讀取它們。在某些情況下 , 順序是非常重要的。如果把retries 設為非零整數,同時把 max.in.flight.requests.per.connection 設為比 1大的數,那麼,如果第一個批次消息寫入失敗,而第二個批次寫入成功, broker會重試寫入第一個批次。如果此時第一個批次也寫入成功,那 麼兩個批次的順序就反過來了。

一般來說,如果某些場景要求消息是有序的,那麼消息是否寫入成功也是 很關鍵的,是以不建議把順序是非常重要的。如果把retries 設為 0。可以把 max.in.flight.requests.per.connection設為 1,這樣在生産者嘗試發送第一批消息時,就不會有其他的消息發送給 broker。不過這樣會嚴重影響生産者的吞吐量 ,是以隻有在 對消息的順序有嚴格要求的情況下才能這麼做。