天天看點

python kafka offset_Kafka送出offset機制

在kafka的消費者中,有一個非常關鍵的機制,那就是offset機制。它使得Kafka在消費的過程中即使挂了或者引發再均衡問題重新配置設定Partation,當下次重新恢複消費時仍然可以知道從哪裡開始消費。它好比看一本書中的書簽标記,每次通過書簽标記(offset)就能快速找到該從哪裡開始看(消費)。

Kafka對于offset的處理有兩種送出方式:(1) 自動送出(預設的送出方式) (2) 手動送出(可以靈活地控制offset)

(1) 自動送出偏移量:

Kafka中偏移量的自動送出是由參數enable_auto_commit和auto_commit_interval_ms控制的,當enable_auto_commit=True時,Kafka在消費的過程中會以頻率為auto_commit_interval_ms向Kafka自帶的topic(__consumer_offsets)進行偏移量送出,具體送出到哪個Partation是以算法:partation=hash(group_id)%50來計算的。

如:group_id=test_group_1,則partation=hash("test_group_1")%50=28

自動送出偏移量示例:

1 importpickle2 importuuid3 from kafka importKafkaConsumer4

5 consumer =KafkaConsumer(6 bootstrap_servers=['192.168.33.11:9092'],7 group_id="test_group_1",8 client_id="{}".format(str(uuid.uuid4())),9 max_poll_records=500,10 enable_auto_commit=True, #預設為True 表示自動送出偏移量

11 auto_commit_interval_ms=100, #控制自動送出偏移量的頻率 機關ms 預設是5000ms

12 key_deserializer=lambdak: pickle.loads(k),13 value_deserializer=lambdav: pickle.loads(v)14 )15

16 #訂閱消費round_topic這個主題

17 consumer.subscribe(topics=('round_topic',))18

19 try:20 whileTrue:21 consumer_records_dict = consumer.poll(timeout_ms=1000)22

23 #consumer.assignment()可以擷取每個分區的offset

24 for partition inconsumer.assignment():25 print('主題:{} 分區:{},需要從下面的offset開始消費:{}'.format(26 str(partition.topic),27 str(partition.partition),28 consumer.position(partition)29 ))30

31 #處理邏輯.

32 for k, record_list inconsumer_records_dict.items():33 print(k)34 for record inrecord_list:35 print("topic = {},partition = {},offset = {},key = {},value = {}".format(36 record.topic, record.partition, record.offset, record.key, record.value)37 )38

39 finally:40 #調用close方法的時候會觸發偏移量的自動送出 close預設autocommit=True

41 consumer.close()

傳回結果:

python kafka offset_Kafka送出offset機制

在上述代碼中,最後調用consumer.close()時候也會觸發自動送出,因為它預設autocommit=True,源碼如下:

1 def close(self, autocommit=True):2 """Close the consumer, waiting indefinitely for any needed cleanup.3

4 Keyword Arguments:5 autocommit (bool): If auto-commit is configured for this consumer,6 this optional flag causes the consumer to attempt to commit any7 pending consumed offsets prior to close. Default: True8 """

9 ifself._closed:10 return

11 log.debug("Closing the KafkaConsumer.")12 self._closed =True13 self._coordinator.close(autocommit=autocommit)14 self._metrics.close()15 self._client.close()16 try:17 self.config['key_deserializer'].close()18 exceptAttributeError:19 pass

20 try:21 self.config['value_deserializer'].close()22 exceptAttributeError:23 pass

24 log.debug("The KafkaConsumer has closed.")

對于自動送出偏移量,如果auto_commit_interval_ms的值設定的過大,當消費者在自動送出偏移量之前異常退出,将導緻kafka未送出偏移量,進而出現重複消費的問題,是以建議auto_commit_interval_ms的值越小越好。

(2) 手動送出偏移量:

鑒于Kafka自動送出offset的不靈活性和不精确性(隻能是按指定頻率的送出),Kafka提供了手動送出offset政策。手動送出能對偏移量更加靈活精準地控制,以保證消息不被重複消費以及消息不被丢失。

對于手動送出offset主要有3種方式:1.同步送出 2.異步送出 3.異步+同步 組合的方式送出

1.同步手動送出偏移量

同步模式下送出失敗的時候一直嘗試送出,直到遇到無法重試的情況下才會結束,同時同步方式下消費者線程在拉取消息會被阻塞,在broker對送出的請求做出響應之前,會一直阻塞直到偏移量送出操作成功或者在送出過程中發生異常,限制了消息的吞吐量。

1 """

2 同步的方式10W條消息 4.58s3 """

4

5 importpickle6 importuuid7 importtime8 from kafka importKafkaConsumer9

10 consumer =KafkaConsumer(11 bootstrap_servers=['192.168.33.11:9092'],12 group_id="test_group_1",13 client_id="{}".format(str(uuid.uuid4())),14 enable_auto_commit=False, #設定為手動送出偏移量.

15 key_deserializer=lambdak: pickle.loads(k),16 value_deserializer=lambdav: pickle.loads(v)17 )18

19 #訂閱消費round_topic這個主題

20 consumer.subscribe(topics=('round_topic',))21

22 try:23 start_time =time.time()24 whileTrue:25 consumer_records_dict = consumer.poll(timeout_ms=100) #在輪詢中等待的毫秒數

26 print("擷取下一輪")27

28 record_num =029 for key, record_list inconsumer_records_dict.items():30 for record inrecord_list:31 record_num += 1

32 print("---->目前批次擷取到的消息個數是:{}<----".format(record_num))33 record_num =034

35 for k, record_list inconsumer_records_dict.items():36 for record inrecord_list:37 print("topic = {},partition = {},offset = {},key = {},value = {}".format(38 record.topic, record.partition, record.offset, record.key, record.value)39 )40

41 try:42 #輪詢一個batch 手動送出一次

43 consumer.commit() #送出目前批次最新的偏移量. 會阻塞 執行完後才會下一輪poll

44 end_time =time.time()45 time_counts = end_time -start_time46 print(time_counts)47 exceptException as e:48 print('commit failed', str(e))49

50 finally:51 consumer.close() #手動送出中close對偏移量送出沒有影響

python kafka offset_Kafka送出offset機制

從上述可以看出,每輪循一個批次,手動送出一次,隻有目前批次的消息送出完成時才會觸發poll來擷取下一輪的消息,經測試10W條消息耗時4.58s

2.異步手動送出偏移量+回調函數

異步手動送出offset時,消費者線程不會阻塞,送出失敗的時候也不會進行重試,并且可以配合回調函數在broker做出響應的時候記錄錯誤資訊。

1 """

2 異步的方式手動送出偏移量(異步+回調函數的模式) 10W條消息 3.09s3 """

4

5 importpickle6 importuuid7 importtime8 from kafka importKafkaConsumer9

10 consumer =KafkaConsumer(11 bootstrap_servers=['192.168.33.11:9092'],12 group_id="test_group_1",13 client_id="{}".format(str(uuid.uuid4())),14 enable_auto_commit=False, #設定為手動送出偏移量.

15 key_deserializer=lambdak: pickle.loads(k),16 value_deserializer=lambdav: pickle.loads(v)17 )18

19 #訂閱消費round_topic這個主題

20 consumer.subscribe(topics=('round_topic',))21

22

23 def _on_send_response(*args, **kwargs):24 """

25 送出偏移量涉及回調函數26 :param args: args[0] --> {TopicPartition:OffsetAndMetadata} args[1] --> Exception27 :param kwargs:28 :return:29 """

30 if isinstance(args[1], Exception):31 print('偏移量送出異常. {}'.format(args[1]))32 else:33 print('偏移量送出成功')34

35

36 try:37 start_time =time.time()38 whileTrue:39 consumer_records_dict = consumer.poll(timeout_ms=10)40

41 record_num =042 for key, record_list inconsumer_records_dict.items():43 for record inrecord_list:44 record_num += 1

45 print("目前批次擷取到的消息個數是:{}".format(record_num))46

47 for record_list inconsumer_records_dict.values():48 for record inrecord_list:49 print("topic = {},partition = {},offset = {},key = {},value = {}".format(50 record.topic, record.partition, record.offset, record.key, record.value))51

52 #避免頻繁送出

53 if record_num !=0:54 try:55 consumer.commit_async(callback=_on_send_response)56 exceptException as e:57 print('commit failed', str(e))58

59 record_num =060

61 finally:62 consumer.close()

python kafka offset_Kafka送出offset機制

對于args參數:args[0]是一個dict,key是TopicPartition,value是OffsetAndMetadata,表示該主題下的partition對應的offset;args[1]在送出成功是True,送出失敗時是一個Exception類。

對于異步送出,由于不會進行失敗重試,當消費者異常關閉或者觸發了再均衡前,如果偏移量還未送出就會造成偏移量丢失。

3.異步+同步 組合的方式送出偏移量

針對異步送出偏移量丢失的問題,通過對消費者進行異步批次送出并且在關閉時同步送出的方式,這樣即使上一次的異步送出失敗,通過同步送出還能夠進行補救,同步會一直重試,直到送出成功。

1 """

2 同步和異步組合的方式送出偏移量3 """

4

5 importpickle6 importuuid7 importtime8 from kafka importKafkaConsumer9

10 consumer =KafkaConsumer(11 bootstrap_servers=['192.168.33.11:9092'],12 group_id="test_group_1",13 client_id="{}".format(str(uuid.uuid4())),14 enable_auto_commit=False, #設定為手動送出偏移量.

15 key_deserializer=lambdak: pickle.loads(k),16 value_deserializer=lambdav: pickle.loads(v)17 )18

19 #訂閱消費round_topic這個主題

20 consumer.subscribe(topics=('round_topic',))21

22

23 def _on_send_response(*args, **kwargs):24 """

25 送出偏移量涉及的回調函數26 :param args:27 :param kwargs:28 :return:29 """

30 if isinstance(args[1], Exception):31 print('偏移量送出異常. {}'.format(args[1]))32 else:33 print('偏移量送出成功')34

35

36 try:37 start_time =time.time()38 whileTrue:39 consumer_records_dict = consumer.poll(timeout_ms=100)40

41 record_num =042 for key, record_list inconsumer_records_dict.items():43 for record inrecord_list:44 record_num += 1

45 print("---->目前批次擷取到的消息個數是:<----".format(record_num))46 record_num =047

48 for k, record_list inconsumer_records_dict.items():49 print(k)50 for record inrecord_list:51 print("topic = {},partition = {},offset = {},key = {},value = {}".format(52 record.topic, record.partition, record.offset, record.key, record.value)53 )54

55 try:56 #輪詢一個batch 手動送出一次

57 consumer.commit_async(callback=_on_send_response)58 end_time =time.time()59 time_counts = end_time -start_time60 print(time_counts)61 exceptException as e:62 print('commit failed', str(e))63

64 exceptException as e:65 print(str(e))66 finally:67 try:68 #同步送出偏移量,在消費者異常退出的時候再次送出偏移量,確定偏移量的送出.

69 consumer.commit()70 print("同步補救送出成功")71 exceptException as e:72 consumer.close()

通過finally在最後不管是否異常都會觸發consumer.commit()來同步補救一次,確定偏移量不會丢失