天天看點

kafka auto.offset.reset latest earliest 詳解

一,latest和earliest差別

1,earliest 當各分區下有已送出的offset時,從送出的offset開始消費;無送出的offset時,從頭開始消費

2,latest 當各分區下有已送出的offset時,從送出的offset開始消費;無送出的offset時,消費新産生的該分區下的資料

送出過offset,latest和earliest沒有差別,但是在沒有送出offset情況下,用latest直接會導緻無法讀取舊資料。

二,建立topic

  1. # bin/kafka-topics.sh --create --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --replication-factor 2 --partitions 3 --topic tank  
  2. Created topic "tank".  
  3. # bin/kafka-topics.sh --describe --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --topic tank  
  4. Topic:tank PartitionCount:3 ReplicationFactor:2 Configs:  
  5.  Topic: tank Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0,2  
  6.  Topic: tank Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0  
  7.  Topic: tank Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1  

三,生産資料和接收生産資料

  1. [root@bigserver1 kafka]# bin/kafka-console-producer.sh --broker-list bigserver1:9092,bigserver2:9092,testing:9092 --topic tank  
  2. >1  
  3. >2  
  4. >3  
  5. >4  
  6. >5  
  7. >6  
  8. 。。。。。。。。。省略。。。。。。。。。  
  9. [root@bigserver1 kafka]# bin/kafka-console-consumer.sh --bootstrap-server bigserver1:9092,bigserver2:9092,testing:9092 --topic tank --from-beginning  
  10. 1  
  11. 2  
  12. 3  
  13. 4  
  14. 5  
  15. 6  
  16. 。。。。。。。。省略。。。。。。。。  

四,測試代碼

  1. object tank {  
  2.     def main(args: Array[String]): Unit = {  
  3.         val pros: Properties = new Properties  
  4.         pros.put("bootstrap.servers", "bigserver1:9092,bigserver2:9092,testing:9092")  
  5.         /*分組由消費者決定,完全自定義,沒有要求*/  
  6.         pros.put("group.id", "tank")  
  7.         //設定為true 表示offset自動托管到kafka内部的一個特定名稱為__consumer_offsets的topic  
  8.         pros.put("enable.auto.commit", "false")  
  9.         pros.put("auto.commit.interval.ms", "1000")  
  10.         pros.put("max.poll.records", "5")  
  11.         pros.put("session.timeout.ms", "30000")  
  12.         //隻有當offset不存在的時候,才用latest或者earliest  
  13.         pros.put("auto.offset.reset", "latest")  
  14.         pros.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")  
  15.         pros.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")  
  16.         val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](pros)  
  17.         /*這裡填寫主題名稱*/  
  18.         consumer.subscribe(util.Arrays.asList("tank"))  
  19.         val system = akka.actor.ActorSystem("system")  
  20.         system.scheduler.schedule(0 seconds, 30 seconds)(tankTest.saveData(args,consumer))  
  21.     }  
  22.     object tankTest {  
  23.         def saveData(args: Array[String],consumer: KafkaConsumer[String,String]): Unit = {  
  24.             val records: ConsumerRecords[String, String] = consumer.poll(Duration.ofSeconds(3))  
  25.             if (!records.isEmpty) {  
  26.                 for (record <- records) {  
  27.                     if (record.value != null && !record.value.equals("")) {  
  28.                         myLog.syncLog(record.value + "\t準備開啟消費者出列資料", "kafka", "get")  
  29.                     }  
  30.                 }  
  31.                 consumer.commitSync()  
  32.             }  
  33.         }  
  34. }  

五,測試1,過程如下

1,檢視offset

  1. # bin/kafka-consumer-groups.sh --bootstrap-server bigserver1:9092,bigserver2:9092,testing:9092 --group tank --describe  
  2. Error: Consumer group 'tank' does not exist.  

在沒有送出offset的情況,會報這個錯誤

2,latest模式運作,拉取不到資料

2019-04-28 16:22:55 INFO Fetcher:583 - [Consumer clientId=consumer-1, groupId=tank] Resetting offset for partition tank-1 to offset 11.

2019-04-28 16:22:55 INFO Fetcher:583 - [Consumer clientId=consumer-1, groupId=tank] Resetting offset for partition tank-0 to offset 11.

2019-04-28 16:22:55 INFO Fetcher:583 - [Consumer clientId=consumer-1, groupId=tank] Resetting offset for partition tank-2 to offset 11.

3,再用kafka-console-producer.sh生産資料,latest是可以拉到的,并且是拉取最新的資料(程式運作以後的資料),以前送出的資料是拉取不到的。

4,檢視offset不報錯了

  1. Consumer group 'tank' has no active members.  
  2. TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID  
  3. tank            1          12              14              2               -               -               -  
  4. tank            0          12              14              2               -               -               -  
  5. tank            2          13              15              2               -               -               -  

5,将auto.offset.reset設定成earliest,第一次生産的資料也取不到

在這裡要注意:如果kafka隻接收資料,從來沒來消費過,程式一開始不要用latest,不然以前的資料就接收不到了。應當先earliest,然後二都都可以。

六,測試2

1,重新建立topic,重複上面的第二,第三步

2,代碼端先earliest,最早送出的資料是可以擷取到的,再生産資料也是可以擷取到的。

3,将auto.offset.reset設定成latest,再生産資料也是可以擷取到的。

七,結論