天天看點

kafka 重複消費和資料丢失_第一個kafka程式,詳談生産者消費者,順序消費重複消費問題...1.第一個kafka程式2.Kafka 的生産者3.Kafka 的消費者

1.第一個kafka程式

1.1.建立我們的主題

kafka-topics.bat --zookeeper localhost:2181/kafka --create --topic hello-kafka --replication-factor 1 --partitions 4

(主題不建立,可能會造成程式報錯,也可在程式中配置如:spring.kafka.listener.missing-topics-fatal=false

)

1.2.生産者發送消息

引入jar:

org.apache.kafka

kafka-clients

2.3.0

生産者代碼示例:

kafka 重複消費和資料丢失_第一個kafka程式,詳談生産者消費者,順序消費重複消費問題...1.第一個kafka程式2.Kafka 的生産者3.Kafka 的消費者

1.2.1.必選屬性

建立生産者對象時有三個屬性必須指定。

bootstrap.servers

該屬性指定 broker 的位址清單,位址的格式為 host:port。清單裡不需要包含所有的 broker 位址,生産者會從給定的 broker 裡查詢其他 broker 的資訊。

不過最少提供 2 個 broker 的資訊(用逗号分隔,比如: 127.0.0.1:9092,192.168.0.13:9092),一旦其中一個當機,生産者仍能連接配接到叢集上。

key.serializer

生産者接口允許使用參數化類型,可以把 Java 對象作為鍵和值傳 broker,但是 broker 希望收到的消息的鍵和值都是位元組數組,是以,必須提供将對象序列化成位元組數組的序列化器。key.serializer 必須設定為實作org.apache.kafka.common.serialization.Serializer 的接口類,Kafka 的用戶端預設提供了ByteArraySerializer,IntegerSerializer, StringSerializer,也可以實作自定義的序列化器。

value.serializer

同 key.serializer。

1.3.消費者接受消息

代碼示例:

kafka 重複消費和資料丢失_第一個kafka程式,詳談生産者消費者,順序消費重複消費問題...1.第一個kafka程式2.Kafka 的生産者3.Kafka 的消費者

1.3.1.必選參數

bootstrap.servers、key.serializer、value.serializer 含義同生産者

group.id

并非完全必需,它指定了消費者屬于哪一個群組,但是建立不屬于任何一個群組的消費者并沒有問題。

新版本特點:poll(Duration)這個版本修改了這樣的設計,會把中繼資料擷取也計入整個逾時時間(更加的合理)

1.4.示範示例

1.預設建立主題,隻有一個分區時,示範生産者和消費者情況。

2.修改主題分區為 2(使用管理指令),再重新示範生産者和消費者情況。

2.Kafka 的生産者

2.1.生産者發送消息的基本流程

kafka 重複消費和資料丢失_第一個kafka程式,詳談生産者消費者,順序消費重複消費問題...1.第一個kafka程式2.Kafka 的生産者3.Kafka 的消費者

從建立一個 ProducerRecord 對象開始, Producer Record 對象需要包含目标主題和要發送的内容。我們還可以指定鍵或分區。在發送 ProducerRecord對象時,生産者要先把鍵和值對象序列化成位元組數組,這樣它們才能夠在網絡上傳輸。

接下來,資料被傳給分區器。如果之前在 Producer Record 對象裡指定了分區,那麼分區器就不會再做任何事情,直接把指定的分區傳回。如果沒有指定分區,那麼分區器會根據 Producer Record 對象的鍵來選擇一個分區。選好分區以後,生産者就知道該往哪個主題和分區發送這條記錄了。緊接着,這條記錄被添加到一個記錄批次裡(雙端隊列,尾部寫入),這個批次裡的所有消息會被發送到相同的主題和分區上。有一個獨立的線程負責把這些記錄批次發送到相應的 broker 上。

伺服器在收到這些消息時會傳回一個響應。如果消息成功寫入 Kafka ,就傳回一個 RecordMetaData 對象,它包含了主題和分區資訊,以及記錄在分區裡的偏移量。如果寫入失敗, 則會傳回一個錯誤。生産者在收到錯誤之後會嘗試重新發送消息,幾次之後如果還是失敗,就傳回錯誤資訊。

生産者發送消息一般會發生兩類錯誤:

一類是可重試錯誤,比如連接配接錯誤(可通過再次建立連接配接解決)、無主 no leader(可通過分區重新選舉首領解決)。

另一類是無法通過重試解決,比如“消息太大”異常,具體見 message.max.bytes,這類消息不會進行任何重試,直接抛出異常

2.2.Kafka 三種發送方式

我們通過生成者的 send 方法進行發送。send 方法會傳回一個包含 RecordMetadata 的 Future 對象。RecordMetadata 裡包含了目标主題,分區資訊和消息的偏移量。

2.2.1.發送并忘記

忽略 send 方法的傳回值,不做任何處理。大多數情況下,消息會正常到達,而且生産者會自動重試,但有時會丢失消息。

2.2.2.同步發送

獲得 send 方法傳回的 Future 對象,在合适的時候調用 Future 的 get 方法。

kafka 重複消費和資料丢失_第一個kafka程式,詳談生産者消費者,順序消費重複消費問題...1.第一個kafka程式2.Kafka 的生産者3.Kafka 的消費者

2.2.3.異步發送

實作接口 org.apache.kafka.clients.producer.Callback,然後将實作類的執行個體作為參數傳遞給 send 方法。

kafka 重複消費和資料丢失_第一個kafka程式,詳談生産者消費者,順序消費重複消費問題...1.第一個kafka程式2.Kafka 的生産者3.Kafka 的消費者

2.3.更多發送配置

生産者有很多屬性可以設定,大部分都有合理的預設值,無需調整。有些參數可能對記憶體使用,性能和可靠性方面有較大影響。可以參考org.apache.kafka.clients.producer 包下的 ProducerConfig 類。

kafka 重複消費和資料丢失_第一個kafka程式,詳談生産者消費者,順序消費重複消費問題...1.第一個kafka程式2.Kafka 的生産者3.Kafka 的消費者

acks:

Kafk 内部的複制機制是比較複雜的,這裡不談論内部機制(後續章節進行細講),我們隻讨論生産者發送消息時與副本的關系。

指定了必須要有多少個分區副本收到消息,生産者才會認為寫入消息是成功的,這個參數對消息丢失的可能性有重大影響。

acks=0:生産者在寫入消息之前不會等待任 何來自伺服器的響應,容易丢消息,但是吞吐量高。

acks=1:隻要叢集的首領節點收到消息,生産者會收到來自伺服器的成功響應。如果消息無法到達首領節點(比如首領節點崩潰,新首領沒有選舉出來),生産者會收到一個錯誤響應,為了避免資料丢失,生産者會重發消息。不過,如果一個沒有收到消息的節點成為新首領,消息還是會丢失。預設使用這個配置。

acks=all:隻有當所有參與複制的節點都收到消息,生産者才會收到一個來自伺服器的成功響應。延遲高。

金融業務,主備外加異地災備。是以很多高可用場景一般不是設定 2 個副本,有可能達到 5 個副本,不同機架上部署不同的副本,異地上也部署一套副本。

buffer.memory

設定生産者記憶體緩沖區的大小(結合生産者發送消息的基本流程),生産者用它緩沖要發送到伺服器的消息。如果資料産生速度大于向 broker 發送的速度,導緻生産者空間不足,producer 會阻塞或者抛出異常。預設 33554432 (32M)

max.block.ms

指定了在調用 send()方法或者使用 partitionsFor()方法擷取中繼資料時生産者的阻塞時間。當生産者的發送緩沖區已滿,或者沒有可用的中繼資料時,這些方法就會阻塞。在阻塞時間達到 max.block.ms 時,生産者會抛出逾時異常。預設 60000ms

retries

發送失敗時,指定生産者可以重發消息的次數(預設 Integer.MAX_VALUE)。預設情況下,生産者在每次重試之間等待 100ms,可以通過參數retry.backoff.ms 參數來改變這個時間間隔。

receive.buffer.bytes 和 send.buffer.bytes

指定 TCP socket 接受和發送資料包的緩存區大小。如果它們被設定為-1,則使用作業系統的預設值。如果生産者或消費者處在不同的資料中心,那麼可以适當增大這些值,因為跨資料中心的網絡一般都有比較高的延遲和比較低的帶寬。預設 102400

batch.size

當多個消息被發送同一個分區時,生産者會把它們放在同一個批次裡。該參數指定了一個批次可以使用的記憶體大小,按照位元組數計算。當批次記憶體被填滿後,批次裡的所有消息會被發送出去。但是生産者不一定都會等到批次被填滿才發送,半滿甚至隻包含一個消息的批次也有可能被發送。預設16384(16k) ,如果一條消息超過了批次的大小,會寫不進去。

linger.ms

指定了生産者在發送批次前等待更多消息加入批次的時間。它和 batch.size 以先到者為先。也就是說,一旦我們獲得消息的數量夠 batch.size 的數量了,他将會立即發送而不顧這項設定,然而如果我們獲得消息位元組數比 batch.size 設定要小的多,我們需要“linger”特定的時間以擷取更多的消息。這個設定預設為 0,即沒有延遲。設定 linger.ms=5,例如,将會減少請求數目,但是同時會增加 5ms 的延遲,但也會提升消息的吞吐量。

compression.type

producer 用于壓縮資料的壓縮類型。預設是無壓縮。正确的選項值是 none、gzip、snappy。壓縮最好用于批量處理,批量處理消息越多,壓縮性能越好。snappy 占用 cpu 少,提供較好的性能和可觀的壓縮比,如果比較關注性能和網絡帶寬,用這個。如果帶寬緊張,用 gzip,會占用較多的 cpu,但提供更高的壓縮比。

client.id

當向 server 送出請求時,這個字元串會發送給 server。目的是能夠追蹤請求源頭,以此來允許 ip/port 許可清單之外的一些應用可以發送資訊。這項應用可以設定任意字元串,因為沒有任何功能性的目的,除了記錄和跟蹤。

max.in.flight.requests.per.connection

指定了生産者在接收到伺服器響應之前可以發送多個消息,值越高,占用的記憶體越大,當然也可以提升吞吐量。發生錯誤時,可能會造成資料的發送順序改變,預設是 5 (修改)。

如果需要保證消息在一個分區上的嚴格順序,這個值應該設為 1。不過這樣會嚴重影響生産者的吞吐量。

request.timeout.ms

用戶端将等待請求的響應的最大時間,如果在這個時間内沒有收到響應,用戶端将重發請求;超過重試次數将抛異常,預設 30 秒。

metadata.fetch.timeout.ms

是指我們所擷取的一些中繼資料的第一個時間資料。中繼資料包含:topic,host,partitions。此項配置是指當等待中繼資料 fetch 成功完成所需要的時間,否則會跑出異常給用戶端

max.request.size

控制生産者發送請求最大大小。預設這個值為 1M,如果一個請求裡隻有一個消息,那這個消息不能大于 1M,如果一次請求是一個批次,該批次包含了 1000 條消息,那麼每個消息不能大于 1KB。注意:broker 具有自己對消息記錄尺寸的覆寫,如果這個尺寸小于生産者的這個設定,會導緻消息被拒絕。這個參數和 Kafka 主機的 message.max.bytes 參數有關系。如果生産者發送的消息超過 message.max.bytes 設定的大小,就會被 Kafka 伺服器拒絕。

以上參數不用全記住,一般來說,就記住 acks、batch.size、linger.ms、max.request.size 就行了,因為這 4 個參數重要些,其他參數一般沒有太大必要調整

2.4.順序保證

Kafka 可以保證同一個分區裡的消息是有序的。也就是說,發送消息時,主題隻有且隻有一個分區,同時生産者按照一定的順序發送消息, broker 就會按照這個順序把它們寫入分區,消費者也會按照同樣的順序讀取它們。在某些情況下, 順序是非常重要的。例如,往一個賬戶存入 100 元再取出來,這個與先取錢再存錢是截然不同的!不過,有些場景對順序不是很敏感。

kafka 重複消費和資料丢失_第一個kafka程式,詳談生産者消費者,順序消費重複消費問題...1.第一個kafka程式2.Kafka 的生産者3.Kafka 的消費者

如果把 retires 設為非零整數,同時把 max.in.flight.requests.per.connection 設為比 1 大的數,那麼,如果第一個批次消息寫入失敗,而第二個批次寫入成功, broker 會重試寫入第一個批次。如果此時第一個批次也寫入成功,那麼兩個批次的順序就反過來了。

一般來說,如果某些場景要求消息是有序的,那麼消息是否寫入成功也是很關鍵的,是以不建議把 retires 設為 0(不重試的話消息可能會因為連接配接關閉等原因會丢) 。是以還是需要重試,同時把 max.in.flight.request.per.connection 設為 1,這樣在生産者嘗試發送第一批消息時,就不會有其他的消息發

送給 broker 。不過這樣會嚴重影響生産者的吞吐量,是以隻有在對消息的順序有嚴格要求的情況下才能這麼做。

2.5.序列化

建立生産者對象必須指定序列化器,預設的序列化器并不能滿足我們所有的場景。我們完全可以自定義序列化器。隻要實作org.apache.kafka.common.serialization.Serializer 接口即可。

2.5.1.自定義序列化需要考慮的問題

自定義序列化容易導緻程式的脆弱性。舉例,在我們上面的實作裡,我們有多種類型的消費者,每個消費者對實體字段都有各自的需求,比如,有的将字段變更為 long 型,有的會增加字段,這樣會出現新舊消息的相容性問題。特别是在系統更新的時候,經常會出現一部分系統更新,其餘系統被迫跟着更新的情況。

解決這個問題,可以考慮使用自帶格式描述以及語言無關的序列化架構。比如 Protobuf,或者 Kafka 官方推薦的 Apache Avro。

Avro 會使用一個 JSON 檔案作為 schema 來描述資料,Avro 在讀寫時會用到這個 schema,可以把這個 schema 内嵌在資料檔案中。這樣,不管資料格式如何變動,消費者都知道如何處理資料。

但是内嵌的消息,自帶格式,會導緻消息的大小不必要的增大,消耗了資源。我們可以使用 schema 系統資料庫機制,将所有寫入的資料用到的 schema儲存在系統資料庫中,然後在消息中引用 schema 的辨別符,而讀取的資料的消費者程式使用這個辨別符從系統資料庫中拉取 schema 來反序列化記錄。

注意:Kafka 本身并不提供 schema 系統資料庫,需要借助第三方,現在已經有很多的開源實作,比如 Confluent Schema Registry,可以從 GitHub 上擷取。

如何使用參考如下網址:

https://cloud.tencent.com/developer/article/1336568

kafka 重複消費和資料丢失_第一個kafka程式,詳談生産者消費者,順序消費重複消費問題...1.第一個kafka程式2.Kafka 的生産者3.Kafka 的消費者

2.6.分區

我們在新增 ProducerRecord 對象中可以看到,ProducerRecord 包含了目标主題,鍵和值,Kafka 的消息都是一個個的鍵值對。鍵可以設定為預設的 null。

鍵的主要用途有兩個:一,用來決定消息被寫往主題的哪個分區,擁有相同鍵的消息将被寫往同一個分區,二,還可以作為消息的附加消息。

如果鍵值為 null,并且使用預設的分區器,分區器使用輪詢算法将消息均衡地分布到各個分區上。

如果鍵不為空,并且使用預設的分區器,Kafka 對鍵進行散列(Kafka 自定義的雜湊演算法,具體算法原理不知),然後根據散列值把消息映射到特定的分區上。很明顯,同一個鍵總是被映射到同一個分區。但是隻有不改變主題分區數量的情況下,鍵和分區之間的映射才能保持不變,一旦增加了新的分區,就無法保證了,是以如果要使用鍵來映射分區,那就要在建立主題的時候把分區規劃好,而且永遠不要增加新分區。

2.6.1.自定義分區器

某些情況下,資料特性決定了需要進行特殊分區,比如電商業務,北京的業務量明顯比較大,占據了總業務量的 20%,我們需要對北京的訂單進行單獨分區處理,預設的散列分區算法不合适了, 我們就可以自定義分區算法,對北京的訂單單獨處理,其他地區沿用散列分區算法。或者某些情況下,我們用 value 來進行分區。

3.Kafka 的消費者

3.1.消費者的入門

消費者的含義,同一般消息中間件中消費者的概念。在高并發的情況下,生産者産生消息的速度是遠大于消費者消費的速度,單個消費者很可能會負擔不起,此時有必要對消費者進行橫向伸縮,于是我們可以使用多個消費者從同一個主題讀取消息,對消息進行分流。(買單的故事,群組,消費者的一群人, 消費者:買單的,分區:一筆單,一筆單能被買單一次,當然一個消費者可以買多個單,如果有一個消費者挂掉了,另外的消費者接上)

3.2.消費者群組

Kafka 裡消費者從屬于消費者群組,一個群組裡的消費者訂閱的都是同一個主題,每個消費者接收主題一部分分區的消息。

kafka 重複消費和資料丢失_第一個kafka程式,詳談生産者消費者,順序消費重複消費問題...1.第一個kafka程式2.Kafka 的生産者3.Kafka 的消費者

上圖,主題 T 有 4 個分區,群組中隻有一個消費者,則該消費者将收到主題 T1 全部 4 個分區的消息。

kafka 重複消費和資料丢失_第一個kafka程式,詳談生産者消費者,順序消費重複消費問題...1.第一個kafka程式2.Kafka 的生産者3.Kafka 的消費者

如上圖,在群組中增加一個消費者 2,那麼每個消費者将分别從兩個分區接收消息,上圖中就表現為消費者 1 接收分區 1 和分區 3 的消息,消費者 2接收分區 2 和分區 4 的消息。

kafka 重複消費和資料丢失_第一個kafka程式,詳談生産者消費者,順序消費重複消費問題...1.第一個kafka程式2.Kafka 的生産者3.Kafka 的消費者

如上圖,在群組中有 4 個消費者,那麼每個消費者将分别從 1 個分區接收消息。

kafka 重複消費和資料丢失_第一個kafka程式,詳談生産者消費者,順序消費重複消費問題...1.第一個kafka程式2.Kafka 的生産者3.Kafka 的消費者

但是,當我們增加更多的消費者,超過了主題的分區數量,就會有一部分的消費者被閑置,不會接收到任何消息。

往消費者群組裡增加消費者是進行橫向伸縮能力的主要方式。是以我們有必要為主題設定合适規模的分區,在負載均衡的時候可以加入更多的消費者。但是要記住,一個群組裡消費者數量超過了主題的分區數量,多出來的消費者是沒有用處的。

如果是多個應用程式,需要從同一個主題中讀取資料,隻要保證每個應用程式有自己的消費者群組就行了。

kafka 重複消費和資料丢失_第一個kafka程式,詳談生産者消費者,順序消費重複消費問題...1.第一個kafka程式2.Kafka 的生産者3.Kafka 的消費者

3.3.消費者配置

消費者有很多屬性可以設定,大部分都有合理的預設值,無需調整。有些參數可能對記憶體使用,性能和可靠性方面有較大影響。可以參考

org.apache.kafka.clients.consumer 包下 ConsumerConfig 類。

auto.offset.reset

消費者在讀取一個沒有偏移量的分區或者偏移量無效的情況下,如何處理。預設值是 latest,從最新的記錄開始讀取,另一個值是 earliest,表示消費者從起始位置讀取分區的記錄。

注意:如果是消費者在讀取一個沒有偏移量的分區或者偏移量無效的情況(因消費者長時間失效,包含的偏移量記錄已經過時并被删除)下,預設值是 latest 的話,消費者将從最新的記錄開始讀取資料( 在消費者啟動之後生成的記錄),可以先啟動生産者,再啟動消費者,觀察到這種情況。觀察代

碼,在子產品 kafka-no-spring 下包 hellokafka 中。

enable .auto.commit

預設值 true,表明消費者是否自動送出偏移。為了盡量避免重複資料和資料丢失,可以改為 false,自行控制何時送出。

partition.assignment.strategy

分區配置設定給消費者的政策。系統提供兩種政策。預設為 Range。允許自定義政策。

Range

把主題的連續分區配置設定給消費者。(如果分區數量無法被消費者整除、第一個消費者會分到更多分區)

RoundRobin

把主題的分區循環配置設定給消費者。

kafka 重複消費和資料丢失_第一個kafka程式,詳談生産者消費者,順序消費重複消費問題...1.第一個kafka程式2.Kafka 的生産者3.Kafka 的消費者

自定義政策

extends 類 AbstractPartitionAssignor,然後在消費者端增加參數:

properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 類.class.getName());即可。

max.poll.records

控制每次 poll 方法傳回的的記錄數量。

fetch.min.bytes

每次 fetch 請求時,server 應該傳回的最小位元組數。如果沒有足夠的資料傳回,請求會等待,直到足夠的資料才會傳回。預設為 1 個位元組。多消費者下,可以設大這個值,以降低 broker 的工作負載

fetch.wait.max.ms

如果沒有足夠的資料能夠滿足 fetch.min.bytes,則此項配置是指在應答 fetch 請求之前,server 會阻塞的最大時間。預設為 500 個毫秒。和上面的fetch.min.bytes 結合起來,要麼滿足資料的大小,要麼滿足時間,就看哪個條件先滿足。

max.partition.fetch.bytes

指定了伺服器從每個分區裡傳回給消費者的最大位元組數,預設 1MB。假設一個主題有 20 個分區和 5 個消費者,那麼每個消費者至少要有 4MB 的可用記憶體來接收記錄,而且一旦有消費者崩潰,這個記憶體還需更大。注意,這個參數要比伺服器的 message.max.bytes 更大,否則消費者可能無法讀取消息。

session.timeout.ms

如果 consumer 在這段時間内沒有發送心跳資訊,則它會被認為挂掉了。預設 3 秒。

client.id

當向 server 送出請求時,這個字元串會發送給 server。目的是能夠追蹤請求源頭,以此來允許 ip/port 許可清單之外的一些應用可以發送資訊。這項應用可以設定任意字元串,因為沒有任何功能性的目的,除了記錄和跟蹤。

receive.buffer.bytes 和 send.buffer.bytes

指定 TCP socket 接受和發送資料包的緩存區大小。如果它們被設定為-1,則使用作業系統的預設值。如果生産者或消費者處在不同的資料中心,那麼可以适當增大這些值,因為跨資料中心的網絡一般都有比較高的延遲和比較低的帶寬。

3.4.消費者中的基礎概念

消費者的含義,同一般消息中間件中消費者的概念。在高并發的情況下,生産者産生消息的速度是遠大于消費者消費的速度,單個消費者很可能會負擔不起,此時有必要對消費者進行橫向伸縮,于是我們可以使用多個消費者從同一個主題讀取消息,對消息進行分流。

(買單的故事,群組,消費者的一群人, 消費者:買單的,分區:一筆單,一筆單能被買單一次,當然一個消費者可以買多個單,如果有一個消費者挂掉了,另外的消費者接上)

kafka 重複消費和資料丢失_第一個kafka程式,詳談生産者消費者,順序消費重複消費問題...1.第一個kafka程式2.Kafka 的生産者3.Kafka 的消費者

3.4.1.訂閱

建立消費者後,使用 subscribe()方法訂閱主題,這個方法接受一個主題清單為參數,也可以接受一個正規表達式為參數;正規表達式同樣也比對多個主題。如果新建立了新主題,并且主題名字和正規表達式比對,那麼會立即觸發一次再均衡,消費者就可以讀取新添加的主題。比如,要訂閱所有和 test相關的主題,可以 subscribe(“tets.*”)

3.4.2.輪詢

為了不斷的擷取消息,我們要在循環中不斷的進行輪詢,也就是不停調用 poll 方法。

poll 方法的參數為逾時時間,控制 poll 方法的阻塞時間,它會讓消費者在指定的毫秒數内一直等待 broker 傳回資料。poll 方法将會傳回一個記錄(消息)清單,每一條記錄都包含了記錄所屬的主題資訊,記錄所在分區資訊,記錄在分區裡的偏移量,以及記錄的鍵值對。

poll 方法不僅僅隻是擷取資料,在新消費者第一次調用時,它會負責查找群組,加入群組,接受配置設定的分區。如果發生了再均衡,整個過程也是在輪詢期間進行的。

3.4.3.送出和偏移量

當我們調用 poll 方法的時候,broker 傳回的是生産者寫入 Kafka 但是還沒有被消費者讀取過的記錄,消費者可以使用 Kafka 來追蹤消息在分區裡的位置,我們稱之為 偏移量。消費者更新自己讀取到哪個消息的操作,我們稱之為 送出。

消費者是如何送出偏移量的呢?消費者會往一個叫做_consumer_offset 的特殊主題發送一個消息,裡面會包括每個分區的偏移量。

3.5.消費者中的核心概念

3.5.1.多線程安全問題

KafkaConsumer 的實作不是線程安全的,是以我們在多線程的環境下,使用 KafkaConsumer 的執行個體要小心,應該每個消費資料的線程擁有自己的

3.5.2.群組協調

消費者要加入群組時,會向群組協調器發送一個 JoinGroup 請求,第一個加入群主的消費者成為群主,群主會獲得群組的成員清單,并負責給每一個消費者配置設定分區。配置設定完畢後,群主把配置設定情況發送給群組協調器,協調器再把這些資訊發送給所有的消費者,每個消費者隻能看到自己的配置設定資訊,隻有群主知道群組裡所有消費者的配置設定資訊。群組協調的工作會在消費者發生變化(新加入或者掉線),主題中分區發生了變化(增加)時發生。

kafka 重複消費和資料丢失_第一個kafka程式,詳談生産者消費者,順序消費重複消費問題...1.第一個kafka程式2.Kafka 的生産者3.Kafka 的消費者

3.5.3.分區再均衡

當消費者群組裡的消費者發生變化,或者主題裡的分區發生了變化,都會導緻再均衡現象的發生。從前面的知識中,我們知道,Kafka 中,存在着消費者對分區所有權的關系,這樣無論是消費者變化,比如增加了消費者,新消費者會讀取原本由其他消費者讀取的分區,消費者減少,原本由它負責的分區要由其他消費者來讀取,增加了分區,哪個消費者來讀取這個新增的分區,這些行為,都會導緻分區所有權的變化,這種變化就被稱為 再均衡。

再均衡對 Kafka 很重要,這是消費者群組帶來高可用性和伸縮性的關鍵所在。不過一般情況下,盡量減少再均衡,因為再均衡期間,消費者是無法讀取消息的,會造成整個群組一小段時間的不可用。

消費者通過向稱為群組協調器的 broker(不同的群組有不同的協調器)發送心跳來維持它和群組的從屬關系以及對分區的所有權關系。如果消費者長時間不發送心跳,群組協調器認為它已經死亡,就會觸發一次再均衡。

在 0.10.1 及以後的版本中,心跳由單獨的線程負責,相關的控制參數為 max.poll.interval.ms。

3.6.Kafka 中的消費安全

一般情況下,我們調用 poll 方法的時候,broker 傳回的是生産者寫入 Kafka 同時 kafka 的消費者送出偏移量,這樣可以確定消費者消息消費不丢失也不重複,是以一般情況下 Kafka 提供的原生的消費者是安全的,但是事情會這麼完美嗎?

3.7.消費者送出偏移量導緻的問題

當我們調用 poll 方法的時候,broker 傳回的是生産者寫入 Kafka 但是還沒有被消費者讀取過的記錄,消費者可以使用 Kafka 來追蹤消息在分區裡的位置,我們稱之為 偏移量。消費者更新自己讀取到哪個消息的操作,我們稱之為 送出。

消費者是如何送出偏移量的呢?消費者會往一個叫做_consumer_offset 的特殊主題發送一個消息,裡面會包括每個分區的偏移量。發生了再均衡之後,消費者可能會被配置設定新的分區,為了能夠繼續工作,消費者者需要讀取每個分區最後一次送出的偏移量,然後從指定的地方,繼續做處理。

分區再均衡的例子:某軟體公司,有一個項目,有兩塊的工作,有兩個碼農,一個負責一塊,幹得好好的。突然一天,小王桌子一拍不幹了,老子中了 5 百萬了,不跟你們玩了,立馬收拾完電腦就走了。然後你今天剛好入職,一個蘿蔔一個坑,你就入坑了。這個過程我們就好比我們的分區再均衡,分區就是一個項目中的不同塊的工作,消費者就是碼農,一個碼農不玩了,另一個碼農立馬頂上,這個過程就發生了分區再均衡

1)如果送出的偏移量小于消費者實際處理的最後一個消息的偏移量,處于兩個偏移量之間的消息會被重複處理,

2)如果送出的偏移量大于用戶端處理的最後一個消息的偏移量,那麼處于兩個偏移量之間的消息将會丢失

kafka 重複消費和資料丢失_第一個kafka程式,詳談生産者消費者,順序消費重複消費問題...1.第一個kafka程式2.Kafka 的生産者3.Kafka 的消費者

是以, 處理偏移量的方式對用戶端會有很大的影響 。KafkaConsumer API 提供了很多種方式來送出偏移量 。

3.7.1.自動送出(重複消費不可避免)

最簡單的送出方式是讓消費者自動送出偏移量。如果 enable.auto.comnit 被設為 true,消費者會自動把從 poll()方法接收到的最大偏移量送出上去。

送出時間間隔由 auto.commit.interval.ms 控制,預設值是 5s。自動送出是在輪詢裡進行的,消費者每次在進行輪詢時會檢査是否該送出偏移量了,如果是,那麼就會送出從上一次輪詢傳回的偏移量。

不過,在使用這種簡便的方式之前,需要知道它将會帶來怎樣的結果。

假設我們仍然使用預設的5s送出時間間隔, 在最近一次送出之後的3s發生了再均衡,再均衡之後,消費者從最後一次送出的偏移量位置開始讀取消息。

這個時候偏移量已經落後了 3s,是以在這 3s 内到達的消息會被重複處理。可以通過修改送出時間間隔來更頻繁地送出偏移量, 減小可能出現重複消息的時間窗, 不過這種情況是無法完全避免的 。

在使用自動送出時,每次調用輪詢方法都會把上一次調用傳回的最大偏移量送出上去,它并不知道具體哪些消息已經被處理了,是以在再次調用之前最好確定所有目前調用傳回的消息都已經處理完畢(enable.auto.comnit 被設為 true 時,在調用 close()方法之前也會進行自動送出)。一般情況下不會有什麼

問題,不過在處理異常或提前退出輪詢時要格外小心。

自動送出雖然友善,但是很明顯是一種基于時間送出的方式,不過并沒有為我們留有餘地來避免重複處理消息。

3.7.2.手動送出(同步)

我們通過控制偏移量送出時間來消除丢失消息的可能性,并在發生再均衡時減少重複消息的數量。消費者 API 提供了另一種送出偏移量的方式,開發者可以在必要的時候送出目前偏移量,而不是基于時間間隔。

把 auto.commit. offset 設為 false,自行決定何時送出偏移量。使用 commitsync()送出偏移量最簡單也最可靠。這個方法會送出由 poll()方法傳回的最新偏移量,送出成功後馬上傳回,如果送出失敗就抛出異常。

注意:commitsync()将會送出由 poll()傳回的最新偏移量,是以在處理完所有記錄後要確定調用了 commitsync(),否則還是會有丢失消息的風險。如果發生了再均衡,從最近批消息到發生再均衡之間的所有消息都将被重複處理。

隻要沒有發生不可恢複的錯誤,commitSync()方法會阻塞,會一直嘗試直至送出成功,如果失敗,也隻能記錄異常日志。

3.7.3.異步送出

手動送出時,在 broker 對送出請求作出回應之前,應用程式會一直阻塞。這時我們可以使用異步送出 API,我們隻管發送送出請求,無需等待 broker的響應。

在成功送出或碰到無法恢複的錯誤之前, commitsync()會一直重試,但是 commitAsync 不會。它之是以不進行重試,是因為在它收到伺服器響應的時候,可能有一個更大的偏移量已經送出成功。

假設我們發出一個請求用于送出偏移量 2000,,這個時候發生了短暫的通信問題,伺服器收不到請求,自然也不會作出任何響應。與此同時,我們處理了另外一批消息,并成功送出了偏移量 3000。如果 commitAsync()重新嘗試送出偏移量 2000,它有可能在偏移量 3000 之後送出成功。這個時候如果發生再均衡,

就會出現重複消息。

commitAsync()也支援回調,在 broker 作出響應時會執行回調。回調經常被用于記錄送出錯誤或生成度量名額。

3.7.4.同步和異步組合

因為同步送出一定會成功、異步可能會失敗,是以一般的場景是同步和異步一起來做。

一般情況下,針對偶爾出現的送出失敗,不進行重試不會有太大問題,因為如果送出失敗是因為臨時問題導緻的,那麼後續的送出總會有成功的。但如果這是發生在關閉消費者或 再均衡前的最後一次送出,就要確定能夠送出成功。

是以,在消費者關閉前一般會組合使用 commitAsync()和 commitsync()。具體使用,參見子產品 kafka-no-spring 下包 commit 包中代碼 SyncAndAsync。

3.7.5.特定送出

在我們前面的送出中,送出偏移量的頻率與處理消息批次的頻率是一樣的。但如果想要更頻繁地送出該怎麼辦?

如果 poll()方法傳回一大批資料,為了避免因再均衡引起的重複處理整批消息,想要在批次中間送出偏移量該怎麼辦?這種情況無法通過調用commitSync()或 commitAsync()來實作,因為它們隻會送出最後一個偏移量,而此時該批次裡的消息還沒有處理完。

消費者 API 允許在調用 commitsync()和 commitAsync()方法時傳進去希望送出的分區和偏移量的 map。假設我們處理了半個批次的消息,最後一個來自主題“customers”,分區 3 的消息的偏移量是 5000,你可以調用 commitsync()方法來送出它。不過,因為消費者可能不隻讀取一個分區,因為我們需要跟蹤所有分區的偏移量,是以在這個層面上控制偏移量的送出會讓代碼變複雜。

3.8.分區再均衡

kafka 重複消費和資料丢失_第一個kafka程式,詳談生産者消費者,順序消費重複消費問題...1.第一個kafka程式2.Kafka 的生産者3.Kafka 的消費者

3.8.1.再均衡監聽器

在送出偏移量一節中提到過,消費者在退出和進行分區再均衡之前,會做一些清理工作比如,送出偏移量、關閉檔案句柄、資料庫連接配接等。

在為消費者配置設定新分區或移除舊分區時,可以通過消費者 API 執行一些應用程式代碼,在調用 subscribe()方法時傳進去一個 ConsumerRebalancelistener執行個體就可以了。

ConsumerRebalancelistener 有兩個需要實作的方法。

1) public void onPartitionsRevoked( Collection< TopicPartition> partitions)方法會在再均衡開始之前和消費者停止讀取消息之後被調用。如果在這裡送出偏移量,下一個接管分區的消費者就知道該從哪裡開始讀取了

2) public void onPartitionsAssigned( Collection< TopicPartition> partitions)方法會在重新配置設定分區之後和消費者開始讀取消息之前被調用。

3.8.2.從特定偏移量處開始記錄

到目前為止,我們知道了如何使用 poll()方法從各個分區的最新偏移量處開始處理消息。不過,有時候我們也需要從特定的偏移量處開始讀取消息。

如果想從分區的起始位置開始讀取消息,或者直接跳到分區的末尾開始讀取消息,可以使 seekToBeginning(Collectiontp)和seekToEnd( Collectiontp)這兩個方法。

不過,Kaka 也為我們提供了用于查找特定偏移量的 API。它有很多用途,比如向後回退幾個消息或者向前跳過幾個消息(對時間比較敏感的應用程式在處理滞後的情況下希望能夠向前跳過若幹個消息)。在使用 Kafka 以外的系統來存儲偏移量時,它将給我們帶來更大的驚喜--讓消息的業務處理和偏移量的提

交變得一緻。試想一下這樣的場景:應用程式從 Kaka 讀取事件(可能是網站的使用者點選事件流),對它們進行處理(可能是使用自動程式清理點選操作并添加會話資訊),然後把結果儲存到資料庫。假設我們真的不想丢失任何資料,也不想在資料庫裡多次儲存相同的結果。

我們可能會,毎處理一條記錄就送出一次偏移量。盡管如此,在記錄被儲存到資料庫之後以及偏移量被送出之前,應用程式仍然有可能發生崩潰,導緻重複處理資料,資料庫裡就會出現重複記錄。

如果儲存記錄和偏移量可以在一個原子操作裡完成,就可以避免出現上述情況。記錄和偏移量要麼都被成功送出,要麼都不送出。如果記錄是儲存在資料庫裡而偏移量是送出到Kafka上,那麼就無法實作原子操作不過,如果在同一個事務裡把記錄和偏移量都寫到資料庫裡會怎樣呢?那麼我們就會知道記錄和偏移量要麼都成功送出,要麼都沒有,然後重新處理記錄。

現在的問題是:如果偏移量是儲存在資料庫裡而不是 Kafka 裡,那麼消費者在得到新分區時怎麼知道該從哪裡開始讀取?這個時候可以使用 seek()方法。在消費者啟動或配置設定到新分區時,可以使用 seck()方法查找儲存在資料庫裡的偏移量。我們可以使用使用 Consumer Rebalancelistener 和 seek()方法確定我們是從資料庫裡儲存的偏移量所指定的位置開始處理消息的。

3.9.優雅退出

如果确定要退出循環,需要通過另一個線程調用 consumer. wakeup()方法。如果循環運作在主線程裡,可以在 ShutdownHook 裡調用該方法。要記住,consumer. wakeup()是消費者唯一一個可以從其他線程裡安全調用的方法。調用 consumer. wakeup()可以退出 poll(),并抛出 WakeupException 異常。我們不需要處理 Wakeup Exception,因為它隻是用于跳出循環的一種方式。不過,在退出線程之前調用 consumer.close()是很有必要的,它會送出任何還沒有送出的東西,并向群組協調器發送消息,告知自己要離開群組,接下來就會觸發再均衡,而不需要等待會話逾時。

3.10.反序列化

不過就是序列化過程的一個反向,原理和實作可以參考生産者端的實作,同樣也可以自定義反序列化器。

3.11.獨立消費者

到目前為止,我們讨論了消費者群組,分區被自動配置設定給群組裡的消費者,在群組裡新增或移除消費者時自動觸發再均衡。不過有時候可能隻需要一個消費者從一個主題的所有分區或者某個特定的分區讀取資料。這個時候就不需要消費者群組和再均衡了,隻需要把主題或者分區配置設定給消費者,然後開始讀取消息并送出偏移量。

如果是這樣的話,就不需要訂閱主題,取而代之的是為自己配置設定分區。一個消費者可以訂閱主題(并加入消費者群組),或者為自己配置設定分區,但不能同時做這兩件事情。

獨立消費者相當于自己來配置設定分區,但是這樣做的好處是自己控制,但是就沒有動态的支援了,包括加入消費者(分區再均衡之類的),新增分區,這些都需要代碼中去解決,是以一般情況下不推薦使用。