天天看點

kafka消費者如何才能從頭開始消費某個topic的全量資料

消費者要從頭開始消費某個topic的全量資料,需要滿足2個條件(spring-kafka):

(1)使用一個全新的"group.id"(就是之前沒有被任何消費者使用過);

(2)指定"auto.offset.reset"參數的值為earliest;      

對應的spring-kafka消費者用戶端配置參數為:

<!-- 指定消費組名 -->
<entry key="group.id" value="fg11"/>
<!-- 從何處開始消費,latest 表示消費最新消息,earliest 表示從頭開始消費,none表示抛出異常,預設latest -->
<entry key="auto.offset.reset" value="earliest"/>      

注意:從kafka-0.9版本及以後,kafka的消費者組和offset資訊就不存zookeeper了,而是存到broker伺服器上,是以,如果你為某個消費者指定了一個消費者組名稱(group.id),那麼,一旦這個消費者啟動,這個消費者組名和它要消費的那個topic的offset資訊就會被記錄在broker伺服器上。

比如我們為消費者A指定了消費者組(group.id)為fg11,那麼可以使用如下指令檢視消費者組的消費情況:

bin/kafka-consumer-groups.sh --bootstrap-server 172.17.6.10:9092 --describe --group fg11      

顯示結果如下:

TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST              CLIENT-ID 6               6               0    consumer-1-08c856a3-ae39-4f73-a2da-4de1795c6ad4  /192.168.207.127  consumer-1
friend  1          2               2               0    consumer-1-08c856a3-ae39-4f73-a2da-4de1795c6ad4  /192.168.207.127  consumer-1
friend  2          4               4               0    consumer-1-08c856a3-ae39-4f73-a2da-4de1795c6ad4  /192.168.207.127  consumer-1      

其實friend這個topic共有3個分區,消息總數為12條,其實在消費者A啟動之前,這12條消息已經被其他某個組的消費者消費過了。而我們雖然為消費者A指定了一個全新的group.id為fg11,但是如果我們在啟動消費者A之前,指定的"auto.offset.reset"參數的值是latest而不是earliest的話(就算你停止消費者,然後改為earliest也是沒有用的),啟動之後它将不會消費以前的消息,除非friend這個topic的分區中有了新的消息它才會消費。

是以一定要在消費者啟動之前就保證group.id是全新的,而且要指定earliest而不是latest。

繼續閱讀