天天看點

Kafka offset管理

Kafka offset管理

消費者在消費的過程中需要記錄自己消費了多少資料,即消費 Offset。Kafka Offset 是Consumer Position,與 Broker 和 Producer 都無關。每個 Consumer Group、每個 Topic 的每個Partition 都有各自的 Offset,如下圖所示。

Kafka offset管理

通常由如下幾種 Kafka Offset 的管理方式:

  • Spark Checkpoint:在 Spark Streaming 執行Checkpoint 操作時,将 Kafka Offset 一并儲存到 HDFS 中。這種方式的問題在于:當 Spark Streaming 應用更新或更新時,以及當Spark 本身更新時,Checkpoint 可能無法恢複。因而,不推薦采用這種方式。
  • HBASE、Redis 等外部 NOSQL 資料庫:這一方式可以支援大吞吐量的 Offset 更新,但它最大的問題在于:使用者需要自行編寫 HBASE 或 Redis 的讀寫程式,并且需要維護一個額外的元件。
  • ZOOKEEPER:老版本的位移offset是送出到zookeeper中的,目錄結構是 :/consumers/<group.id>/offsets/ <topic>/<partitionId> ,但是由于 ZOOKEEPER 的寫入能力并不會随着 ZOOKEEPER 節點數量的增加而擴大,因而,當存在頻繁的 Offset 更新時,ZOOKEEPER 叢集本身可能成為瓶頸。因而,不推薦采用這種方式。
  • KAFKA 自身的一個特殊 Topic(__consumer_offsets)中:這種方式支援大吞吐量的Offset 更新,又不需要手動編寫 Offset 管理程式或者維護一套額外的叢集,因而是迄今為止最為理想的一種實作方式。
Kafka offset管理

另外幾個與 Kafka Offset 管理相關的要點如下:

  1. Kafka 預設是定期幫你自動送出位移的(enable.auto.commit=true)。有時候,我們需要采用自己來管理位移送出,這時候需要設定 enable.auto.commit=false。
  2. 屬性 auto.offset.reset 值含義解釋如下:
    1. earliest :當各分區下有已送出的 Offset 時,從“送出的 Offset”開始消費;無送出的Offset 時,從頭開始消費;
    2. latest : 當各分區下有已送出的 Offset 時,從送出的 Offset 開始消費;無送出的 Offset時,消費新産生的該分區下的數
    3. none : Topic 各分區都存在已送出的 Offset 時,從 Offset 後開始消費;隻要有一個分區不存在已送出的 Offset,則抛出異常。 

kafka-0.10.1.X版本之前: auto.offset.reset 的值為smallest,和,largest.(offest儲存在zk中);

kafka-0.10.1.X版本之後: auto.offset.reset 的值更改為:earliest,latest,和none (offest儲存在kafka的一個特殊的topic名為:__consumer_offsets裡面);

與 Kakfa Offset 管理相關的幾條 SHELL 指令示例如下:

顯示group.id為kline的consumer group的目前offset

kafka-consumer-groups –bootstrap-server host:9092 –group cgroup –describe

查将group-id為cgroup的consumer group的所有partition的目前offset指向第1000條消息

kafka-consumer-groups --bootstrap-server host:9092 --group cgroup --topic test --reset-offsets --to-offset 1000 –execute

參考:

http://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/

繼續閱讀