天天看點

Consumer位移管理-Kafka從入門到精通(十一)

上篇文章說了,sesstion.time.out 、max.poll.interval.ms、max.poll.records和auto.offset.reset等參數。

​​KafkaConsumer-Kafka從入門到精通(十)

訂閱topic

訂閱consumer直接:

Consumer.subscribe(Arrays.asList(“topic1”,“topic2”));

如果使用獨立的consumer(standalone consumer),則可以手動訂閱,

TopicPartition p1 = new TopicPartition(“topic-name”,0);

Consumer.assign(Arrays.asList(p1));

不管用哪種方法,consumer訂閱都是延遲生效的,訂閱的消息隻有在下次poll調用的時候才會生效。

消息輪詢

Poll原理

consumer是用來讀取消息的,而且要能夠同時讀取多個topic的多個分區消息。若要實作并行讀取消息,一種方式使用多線程方式,為每個要讀取的分區都要建立一個專有線程去消費(這就是舊版本cousumer采用的方式),另一種方法采用linuxI/O模型的poll或者select等,使用一個線程同時管理多個socket連接配接,即同時與多個broker通信實作并行讀取。

一旦consumer訂閱了topic,所有的消費邏輯包括coordinator的協調,消費者組的rebalance以及資料的擷取會在主邏輯poll方法中一次調用中被執行,這樣使用者很容易使用一個線程來管理所有的cousumerIO。

對于問題,consumer到底是單線程還是多線程呢?

最新版的kafka是一個多線程或者雙線程的java程序,建立kafkaConsumer的稱為主線程,同時在背景建立一個心跳線程,該線程被稱呼為背景心跳線程。

kafkaConsumer的poll方法在使用者主線程中運作,這同時也表明:消費者組的rebalance、消息擷取、coordinator管理、異步任務結果的處理、位移送出等操作這些都在主線程中的,是以仔細調優參數至關重要。

Poll使用方法

Consumer訂閱topic之後通常以事件循環的方法來擷取消息讀取,poll方法根據目前consumer的消費位移傳回消息集合。當poll首次被調用的時候,新的消費者組會根據位移重設政策(auto.offset.reset)來設定消費者組的位移,一旦consumer開始送出位移,後續的rebalance完成後會将位置設定為上次已送出的位移。傳遞給poll方法的逾時設定參數用于控制consumer等待消息的最大阻塞時間,比如consumer至少需要1M的資料,那麼此刻consumer會以阻塞不斷累計資料到滿足1M,如果不想讓consumer一直阻塞,則可以給一個過期時間,一定時間内如果還沒有滿足,則傳回。

  1. 要麼等資料滿了。
  2. 要麼等待時間超過了指定時間。

前面我們說了consumer是單線程設計(其實還有一個心跳線程,輔助線程看主線程是否保持心跳,暫不考慮,不承擔邏輯),是以consumer應該運作在他的專屬線程中。新版本的java consumer不是線程安全的,如果沒有顯式的同步鎖機制保護,kafka會抛出kafkaConsumer is not safe for multi-threaded access

的異常,如果看到了這樣的報錯,那麼說明kafkaConsumer運用在多線程中,對于目前的kafka設計而言,是不被允許的。

我們可以在while條件指定一個布爾變量isRunning來辨別是否需要退出consumer消費循環并且結束consumer應用。具體應該是将isRunning辨別為volatile,然後其他線程用isRunning=false來控制線程結束。最後千萬不要忘記關閉consumer,這不僅會清楚consumer建立的各種socker資源,還會通知消費者coordinator主動離開進而更快的rebalance。比較推薦在finally代碼裡顯式關閉。

位移管理

Consumer位移

Consumer端要為每個它讀取的分區儲存消費進度,即分區中最新消費消息的位置,該位置就是offset。Consumer需要定期向kafka送出自己的位置資訊,實際上,這個資訊通常是下一條帶消費消息的位置。假設consumer已經讀取了某個分區第n條消息,那麼他應該送出位移為N,因為位移是從0開始,位移n的位子是n+1條消息。這樣conusmer重新開機時會從第n+1條開始消費。

Offset對于consumer非常重要,因為他們是實作消息傳遞語義保證(message delivery semantic)的基石,常見的3中消息傳遞語義保證。

  1. 最多一次(at most once)處理語義:消息可能丢失,但不會被重複處理。
  2. 最少一次(at least once)處理語義:消息不會丢失,但可能處理多次。
  3. 精确一次(exactly)處理語義:消息一定會被處理且隻會處理一次。

顯然,若consumer在消費之前就送出位移,那麼多在位移送出完成之後,消費還未消費就崩潰了,這時候consumer重新開機,則會從新的位移開始消費,則這個已送出的位移會丢失。相反的,若consumer在消費之後再送出,則可以實作at least once。好消息是這個出現多次處理的情況,已經在kafka0.11.0.0版本得到解決。

上次送出位移(last committed offset):consumer最近一次送出的offset值。

目前位置(current position):consumer已讀取但尚未送出的位置。

水位(watermark):也被稱為高水位(high watermark),嚴格來說他不屬于conusmer管理範圍,而屬于分區日志概念,consumer可以讀取水位之下的所有消息,水位之上的則不可以讀取。

日志終端位移(log end offset,leo):日志的最新位移,同樣不屬于consumer範疇,而屬于分區日志管轄。它表示了某個分區副本目前儲存消息對應最大的位移值。值得注意的是,正常情況下leo不會比水位值小。事實上,隻有分區所有副本都儲存某條消息,該分區leader副本才會向上移動水位值。

版本版consumer位移管理

consumer會在kafka叢集所有broker裡選一個broker作為consumer group的coordinator,用于實作組成員管理,消費配置設定方案,位移送出等。和普通的kafka topic相同,該topic配置多個分區,每個分區有多個副本。位移存在的目的就是儲存consumer送出的位移。

當消費者組首次啟動時,由于沒有初識位移資訊,coordinator必須為其确定初始位移值,這就是consumer參數auto.offset.reset的作用。通常consumer要麼從最早位移開始讀取,要麼從最新位移開始讀取。

Consumer送出位移主要機制通過向所屬coordinator發送位移送出請求實作的,每個位移送出都會往_consumer_offsets對應分區上追加一條消息。消息的key是groupid、topic等,而value就是位移值,如果consumer為同一個group的同一個topic分區送出多次位移,那麼就會存在多條key相同但value不同的消息,顯然我們隻關心最新一條。

自動送出和手動送出

位移送出政策對提供消費傳遞語義至關重要,預設情況下consumer自動送出間隔是5s、這就是說若不做特定設定,consumer可以通過參數auto.commit.interval.ms參數可以控制自動送出間隔。

自動送出位移的優勢是降低使用者開發成本使得使用者不比處理位移送出,劣勢使用者不能細顆粒度的處理位移送出,特别是強調精确一次處理語義時,這種情況下,使用者可以手動位移送出。

典型的consumer場景,使用者需要對poll方法傳回的消息集合中消息執行業務處理,使用者想要保證隻有消息被真正處理才去送出位移,如果自動送出則無法保證這種位移時序性,是以這種情況下必須手動送出位移。在建構kafkaConsumer時設定enable.auto.commit=false,然後調用conmmitSync或commitAsync方法即可。

自動送出:預設配置,enable.auto.commit=true。開發簡單,無法實作精确控制,位移送出失敗後不易處理。可能造成資料丢失,最多實作“最少一次”處理語義。能容忍一定消息丢失。

自動送出:設定enable.autocommit=false。手動調用consumer.commitSync或

consumer.commitAsync位移送出,可以實作“最少一次”處理,依賴外部可以實作“精确一次”處理語義。

ConsumerRecords<String, String> records= consumer.poll(lOOO); 
    for (TopicParti tion partition : records.partitions()) { 
    List<ConsumerRecord<String, String partit onRecords = 
                    records.records(partition); 
    for (ConsumerRecord<String, String> record : partitionRecords) { 
            System.out.println(record.offset ()+”:”+ record.value());
        }
    long lastOffset = parti tionRecords. get (partitionRecords. 
        size() - 1) . offset() ; 
        consumer.commitSync(Collections singletonMap(part tion, new 
        OffsetAndMetadata(lastOffset + 1)));