天天看點

Kafka系列之Kafka消費者:從Kafka中讀取資料

本系列文章為對《Kafka:The Definitive Guide》的學習整理,希望能夠幫助到大家

應用從Kafka中讀取資料需要使用KafkaConsumer訂閱主題,然後接收這些主題的消息。在我們深入這些API之前,先來看下幾個比較重要的概念。

Kafka消費者相關的概念

消費者與消費組

假設這麼個場景:我們從Kafka中讀取消息,并且進行檢查,最後産生結果資料。我們可以建立一個消費者執行個體去做這件事情,但如果生産者寫入消息的速度比消費者讀取的速度快怎麼辦呢?這樣随着時間增長,消息堆積越來越嚴重。對于這種場景,我們需要增加多個消費者來進行水準擴充。

Kafka消費者是消費組的一部分,當多個消費者形成一個消費組來消費主題時,每個消費者會收到不同分區的消息。假設有一個T1主題,該主題有4個分區;同時我們有一個消費組G1,這個消費組隻有一個消費者C1。那麼消費者C1将會收到這4個分區的消息,如下所示:

Kafka系列之Kafka消費者:從Kafka中讀取資料

如果我們增加新的消費者C2到消費組G1,那麼每個消費者将會分别收到兩個分區的消息,如下所示:

Kafka系列之Kafka消費者:從Kafka中讀取資料

如果增加到4個消費者,那麼每個消費者将會分别收到一個分區的消息,如下所示:

Kafka系列之Kafka消費者:從Kafka中讀取資料

但如果我們繼續增加消費者到這個消費組,剩餘的消費者将會空閑,不會收到任何消息:

Kafka系列之Kafka消費者:從Kafka中讀取資料

總而言之,我們可以通過增加消費組的消費者來進行水準擴充提升消費能力。這也是為什麼建議建立主題時使用比較多的分區數,這樣可以在消費負載高的情況下增加消費者來提升性能。另外,消費者的數量不應該比分區數多,因為多出來的消費者是空閑的,沒有任何幫助。

Kafka一個很重要的特性就是,隻需寫入一次消息,可以支援任意多的應用讀取這個消息。換句話說,每個應用都可以讀到全量的消息。為了使得每個應用都能讀到全量消息,應用需要有不同的消費組。對于上面的例子,假如我們新增了一個新的消費組G2,而這個消費組有兩個消費者,那麼會是這樣的:

Kafka系列之Kafka消費者:從Kafka中讀取資料

在這個場景中,消費組G1和消費組G2都能收到T1主題的全量消息,在邏輯意義上來說它們屬于不同的應用。

最後,總結起來就是:如果應用需要讀取全量消息,那麼請為該應用設定一個消費組;如果該應用消費能力不足,那麼可以考慮在這個消費組裡增加消費者。

消費組與分區重平衡

可以看到,當新的消費者加入消費組,它會消費一個或多個分區,而這些分區之前是由其他消費者負責的;另外,當消費者離開消費組(比如重新開機、當機等)時,它所消費的分區會配置設定給其他分區。這種現象稱為重平衡(rebalance)。重平衡是Kafka一個很重要的性質,這個性質保證了高可用和水準擴充。不過也需要注意到,在重平衡期間,所有消費者都不能消費消息,是以會造成整個消費組短暫的不可用。而且,将分區進行重平衡也會導緻原來的消費者狀态過期,進而導緻消費者需要重新更新狀态,這段期間也會降低消費性能。後面我們會讨論如何安全的進行重平衡以及如何盡可能避免。

消費者通過定期發送心跳(hearbeat)到一個作為組協調者(group coordinator)的broker來保持在消費組記憶體活。這個broker不是固定的,每個消費組都可能不同。當消費者拉取消息或者送出時,便會發送心跳。

如果消費者超過一定時間沒有發送心跳,那麼它的會話(session)就會過期,組協調者會認為該消費者已經當機,然後觸發重平衡。可以看到,從消費者當機到會話過期是有一定時間的,這段時間内該消費者的分區都不能進行消息消費;通常情況下,我們可以進行優雅關閉,這樣消費者會發送離開的消息到組協調者,這樣組協調者可以立即進行重平衡而不需要等待會話過期。

在0.10.1版本,Kafka對心跳機制進行了修改,将發送心跳與拉取消息進行分離,這樣使得發送心跳的頻率不受拉取的頻率影響。另外更高版本的Kafka支援配置一個消費者多長時間不拉取消息但仍然保持存活,這個配置可以避免活鎖(livelock)。活鎖,是指應用沒有故障但是由于某些原因不能進一步消費。

建立Kafka消費者

讀取Kafka消息隻需要建立一個kafkaConsumer,建立過程與KafkaProducer非常相像。我們需要使用四個基本屬性,bootstrap.servers、key.deserializer、value.deserializer和group.id。其中,bootstrap.servers與建立KafkaProducer的含義一樣;key.deserializer和value.deserializer是用來做反序列化的,也就是将位元組數組轉換成對象;group.id不是嚴格必須的,但通常都會指定,這個參數是消費者的消費組。

下面是一個代碼樣例:

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);
           

假如想要每次都需要從頭消費某個主題,則需要滿足兩個條件 1.消費者id是一個新的消費者id

2.将“auto.offset.reset“參數設定為“earliest”

訂閱主題

建立完消費者後我們便可以訂閱主題了,隻需要通過調用subscribe()方法即可,這個方法接收一個主題清單,非常簡單:

這個例子中隻訂閱了一個customerCountries主題。另外,我們也可以使用正規表達式來比對多個主題,而且訂閱之後如果又有比對的新主題,那麼這個消費組會立即對其進行消費。正規表達式在連接配接Kafka與其他系統時非常有用。比如訂閱所有的測試主題:

拉取循環

消費資料的API和處理方式很簡單,我們隻需要循環不斷拉取消息即可。Kafka對外暴露了一個非常簡潔的poll方法,其内部實作了協作、分區重平衡、心跳、資料拉取等功能,但使用時這些細節都被隐藏了,我們也不需要關注這些。下面是一個代碼樣例:

try {
   while (true) {  //1)
       ConsumerRecords<String, String> records = consumer.poll(100);  //2)
       for (ConsumerRecord<String, String> record : records)  //3)
       {
           log.debug("topic = %s, partition = %s, offset = %d,
              customer = %s, country = %s\n",
              record.topic(), record.partition(), record.offset(),
              record.key(), record.value());
           int updatedCount = 1;
           if (custCountryMap.countainsValue(record.value())) {
               updatedCount = custCountryMap.get(record.value()) + 1;
           }
           custCountryMap.put(record.value(), updatedCount)
           JSONObject json = new JSONObject(custCountryMap);
           System.out.println(json.toString(4))
       }
   }
} finally {
      consumer.close(); //4
}
           

其中,代碼中标注了幾點,說明如下:

  • 1)這個例子使用無限循環消費并處理資料,這也是使用Kafka最多的一個場景,後面我們會讨論如何更好的退出循環并關閉。
  • 2)這是上面代碼中最核心的一行代碼。我們不斷調用poll拉取資料,如果停止拉取,那麼Kafka會認為此消費者已經死亡并進行重平衡。參數值是一個逾時時間,指明線程如果沒有資料時等待多長時間,0表示不等待立即傳回。
  • 3)poll()方法傳回記錄的清單,每條記錄包含key/value以及主題、分區、位移資訊。
  • 4)主動關閉可以使得Kafka立即進行重平衡而不需要等待會話過期。

另外需要提醒的是,消費者對象不是線程安全的,也就是不能夠多個線程同時使用一個消費者對象;而且也不能夠一個線程有多個消費者對象。簡而言之,一個線程一個消費者,如果需要多個消費者那麼請使用多線程來進行一一對應。

消費者配置

上面的例子中隻設定了幾個最基本的消費者參數,bootstrap.servers,group.id,key.deserializer和value.deserializer,其他的參數可以看Kafka文檔。雖然我們很多情況下隻是使用預設設定就行,但了解一些比較重要的參數還是很有幫助的。

fetch.min.bytes

這個參數允許消費者指定從broker讀取消息時最小的資料量。當消費者從broker讀取消息時,如果資料量小于這個門檻值,broker會等待直到有足夠的資料,然後才傳回給消費者。對于寫入量不高的主題來說,這個參數可以減少broker和消費者的壓力,因為減少了往返的時間。而對于有大量消費者的主題來說,則可以明顯減輕broker壓力。

fetch.max.wait.ms

上面的fetch.min.bytes參數指定了消費者讀取的最小資料量,而這個參數則指定了消費者讀取時最長等待時間,進而避免長時間阻塞。這個參數預設為500ms。

max.partition.fetch.bytes

這個參數指定了每個分區傳回的最多位元組數,預設為1M。也就是說,KafkaConsumer.poll()傳回記錄清單時,每個分區的記錄位元組數最多為1M。如果一個主題有20個分區,同時有5個消費者,那麼每個消費者需要4M的空間來處理消息。實際情況中,我們需要設定更多的空間,這樣當存在消費者當機時,其他消費者可以承擔更多的分區。

需要注意的是,max.partition.fetch.bytes必須要比broker能夠接收的最大的消息(由max.message.size設定)大,否則會導緻消費者消費不了消息。另外,在上面的樣例可以看到,我們通常循環調用poll方法來讀取消息,如果max.partition.fetch.bytes設定過大,那麼消費者需要更長的時間來處理,可能會導緻沒有及時poll而會話過期。對于這種情況,要麼減小max.partition.fetch.bytes,要麼加長會話時間。

session.timeout.ms

這個參數設定消費者會話過期時間,預設為3秒。也就是說,如果消費者在這段時間内沒有發送心跳,那麼broker将會認為會話過期而進行分區重平衡。這個參數與heartbeat.interval.ms有關,heartbeat.interval.ms控制KafkaConsumer的poll()方法多長時間發送一次心跳,這個值需要比session.timeout.ms小,一般為1/3,也就是1秒。更小的session.timeout.ms可以讓Kafka快速發現故障進行重平衡,但也加大了誤判的機率(比如消費者可能隻是處理消息慢了而不是當機)。

auto.offset.reset

這個參數指定了當消費者第一次讀取分區或者上一次的位置太老(比如消費者下線時間太久)時的行為,可以取值為latest(從最新的消息開始消費)或者earliest(從最老的消息開始消費)。

enable.auto.commit

這個參數指定了消費者是否自動送出消費位移,預設為true。如果需要減少重複消費或者資料丢失,你可以設定為false。如果為true,你可能需要關注自動送出的時間間隔,該間隔由auto.commit.interval.ms設定。

partition.assignment.strategy

我們已經知道當消費組存在多個消費者時,主題的分區需要按照一定政策配置設定給消費者。這個政策由PartitionAssignor類決定,預設有兩種政策:

  • 範圍(Range):對于每個主題,每個消費者負責一定的連續範圍分區。假如消費者C1和消費者C2訂閱了兩個主題,這兩個主題都有3個分區,那麼使用這個政策會導緻消費者C1負責每個主題的分區0和分區1(下标基于0開始),消費者C2負責分區2。可以看到,如果消費者數量不能整除分區數,那麼第一個消費者會多出幾個分區(由主題數決定)。
  • 輪詢(RoundRobin):對于所有訂閱的主題分區,按順序一一的配置設定給消費者。用上面的例子來說,消費者C1負責第一個主題的分區0、分區2,以及第二個主題的分區1;其他分區則由消費者C2負責。可以看到,這種政策更加均衡,所有消費者之間的分區數的內插補點最多為1。

partition.assignment.strategy設定了配置設定政策,預設為org.apache.kafka.clients.consumer.RangeAssignor(使用範圍政策),你可以設定為org.apache.kafka.clients.consumer.RoundRobinAssignor(使用輪詢政策),或者自己實作一個配置設定政策然後将partition.assignment.strategy指向該實作類。

client.id

這個參數可以為任意值,用來指明消息從哪個用戶端發出,一般會在列印日志、衡量名額、配置設定配額時使用。

max.poll.records

這個參數控制一個poll()調用傳回的記錄數,這個可以用來控制應用在拉取循環中的處理資料量。

receive.buffer.bytes、send.buffer.bytes

這兩個參數控制讀寫資料時的TCP緩沖區,設定為-1則使用系統的預設值。如果消費者與broker在不同的資料中心,可以一定程度加大緩沖區,因為資料中心間一般的延遲都比較大。

送出(commit)與位移(offset)

當我們調用poll()時,該方法會傳回我們沒有消費的消息。當消息從broker傳回消費者時,broker并不跟蹤這些消息是否被消費者接收到;Kafka讓消費者自身來管理消費的位移,并向消費者提供更新位移的接口,這種更新位移方式稱為送出(commit)。

在正常情況下,消費者會發送分區的送出資訊到Kafka,Kafka進行記錄。當消費者當機或者新消費者加入時,Kafka會進行重平衡,這會導緻消費者負責之前并不屬于它的分區。重平衡完成後,消費者會重新擷取分區的位移,下面來看下兩種有意思的情況。

假如一個消費者在重平衡前後都負責某個分區,如果送出位移比之前實際處理的消息位移要小,那麼會導緻消息重複消費,如下所示:

Kafka系列之Kafka消費者:從Kafka中讀取資料

假如在重平衡前某個消費者拉取分區消息,在進行消息處理前送出了位移,但還沒完成處理當機了,然後Kafka進行重平衡,新的消費者負責此分區并讀取送出位移,此時會“丢失”消息,如下所示:

Kafka系列之Kafka消費者:從Kafka中讀取資料

是以,送出位移的方式會對應用有比較大的影響,下面來看下不同的送出方式。

自動送出

這種方式讓消費者來管理位移,應用本身不需要顯式操作。當我們将enable.auto.commit設定為true,那麼消費者會在poll方法調用後每隔5秒(由auto.commit.interval.ms指定)送出一次位移。和很多其他操作一樣,自動送出也是由poll()方法來驅動的;在調用poll()時,消費者判斷是否到達送出時間,如果是則送出上一次poll傳回的最大位移。

需要注意到,這種方式可能會導緻消息重複消費。假如,某個消費者poll消息後,應用正在處理消息,在3秒後Kafka進行了重平衡,那麼由于沒有更新位移導緻重平衡後這部分消息重複消費。

送出目前位移

為了減少消息重複消費或者避免消息丢失,很多應用選擇自己主動送出位移。設定auto.commit.offset為false,那麼應用需要自己通過調用commitSync()來主動送出位移,該方法會送出poll傳回的最後位移。

為了避免消息丢失,我們應當在完成業務邏輯後才送出位移。而如果在處理消息時發生了重平衡,那麼隻有目前poll的消息會重複消費。下面是一個自動送出的代碼樣例:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
    
    try {
        consumer.commitSync();
    } catch (CommitFailedException e) {
        log.error("commit failed", e)
    }
}
           

上面代碼poll消息,并進行簡單的列印(在實際中有更多的處理),最後完成處理後進行了位移送出。

異步送出

手動送出有一個缺點,那就是當發起送出調用時應用會阻塞。當然我們可以減少手動送出的頻率,但這個會增加消息重複的機率(和自動送出一樣)。另外一個解決辦法是,使用異步送出的API。以下為使用異步送出的方式,應用發了一個送出請求然後立即傳回:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        System.out.printf("topic = %s, partition = %s,
        offset = %d, customer = %s, country = %s\n",
        record.topic(), record.partition(), record.offset(),
        record.key(), record.value());
    }
    
    consumer.commitAsync();
}
           

但是異步送出也有個缺點,那就是如果伺服器傳回送出失敗,異步送出不會進行重試。相比較起來,同步送出會進行重試直到成功或者最後抛出異常給應用。異步送出沒有實作重試是因為,如果同時存在多個異步送出,進行重試可能會導緻位移覆寫。舉個例子,假如我們發起了一個異步送出commitA,此時的送出位移為2000,随後又發起了一個異步送出commitB且位移為3000;commitA送出失敗但commitB送出成功,此時commitA進行重試并成功的話,會将實際上将已經送出的位移從3000復原到2000,導緻消息重複消費。

是以,基于這種性質,一般情況下對于異步送出,我們可能會通過回調的方式記錄送出結果:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %s,
        offset = %d, customer = %s, country = %s\n",
        record.topic(), record.partition(), record.offset(),
        record.key(), record.value());
    }
    consumer.commitAsync(new OffsetCommitCallback() {
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (e != null)
                log.error("Commit failed for offsets {}", offsets, e);
        } 
    });
}
           

而如果想進行重試同時又保證送出順序的話,一種簡單的辦法是使用單調遞增的序号。每次發起異步送出時增加此序号,并且将此時的序号作為參數傳給回調方法;當消息送出失敗回調時,檢查參數中的序号值與全局的序号值,如果相等那麼可以進行重試送出,否則放棄(因為已經有更新的位移送出了)。

混合同步送出與異步送出

正常情況下,偶然的送出失敗并不是什麼大問題,因為後續的送出成功就可以了。但是在某些情況下(例如程式退出、重平衡),我們希望最後的送出成功,是以一種非常普遍的方式是混合異步送出和同步送出,如下所示:

try {
    while (true) {
       ConsumerRecords<String, String> records = consumer.poll(100);
       for (ConsumerRecord<String, String> record : records) {
           System.out.printf("topic = %s, partition = %s, offset = %d,
           customer = %s, country = %s\n",
           record.topic(), record.partition(),
           record.offset(), record.key(), record.value());
       }
       
       consumer.commitAsync();
    }
} catch (Exception e) {
    log.error("Unexpected error", e);
} finally {
    try {
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}
           

在正常處理流程中,我們使用異步送出來提高性能,但最後使用同步送出來保證位移送出成功。

送出特定位移

commitSync()和commitAsync()會送出上一次poll()的最大位移,但如果poll()傳回了批量消息,而且消息數量非常多,我們可能會希望在處理這些批量消息過程中送出位移,以免重平衡導緻從頭開始消費和處理。幸運的是,commitSync()和commitAsync()允許我們指定特定的位移參數,參數為一個分區與位移的map。由于一個消費者可能會消費多個分區,是以這種方式會增加一定的代碼複雜度,如下所示:

private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;

....

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());

        currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
        if (count % 1000 == 0)
            consumer.commitAsync(currentOffsets, null);
        count++;
} }
           

代碼中在處理poll()消息的過程中,不斷儲存分區與位移的關系,每處理1000條消息就會異步送出(也可以使用同步送出)。

重平衡監聽器(Rebalance Listener)

在分區重平衡前,如果消費者知道它即将不再負責某個分區,那麼它可能需要将已經處理過的消息位移進行送出。Kafka的API允許我們在消費者新增分區或者失去分區時進行處理,我們隻需要在調用subscribe()方法時傳入ConsumerRebalanceListener對象,該對象有兩個方法:

  • public void onPartitionRevoked(Collection partitions):此方法會在消費者停止消費消費後,在重平衡開始前調用。
  • public void onPartitionAssigned(Collection partitions):此方法在分區配置設定給消費者後,在消費者開始讀取消息前調用。

下面來看一個onPartitionRevoked9)的例子,該例子在消費者失去某個分區時送出位移(以便其他消費者可以接着消費消息并處理):

private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

private class HandleRebalance implements ConsumerRebalanceListener {
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    }
    
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Lost partitions in rebalance.
          Committing current
        offsets:" + currentOffsets);
        consumer.commitSync(currentOffsets);
    }
}

try {
    consumer.subscribe(topics, new HandleRebalance());
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
        {
             System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
             currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
        }
        consumer.commitAsync(currentOffsets, null);
    }
} catch (WakeupException e) {
    // ignore, we're closing
} catch (Exception e) {
   log.error("Unexpected error", e);
} finally {
   try {
       consumer.commitSync(currentOffsets);
   } finally {
       consumer.close();
       System.out.println("Closed consumer and we are done");
   }
}
           

代碼中實作了onPartitionsRevoked()方法,當消費者失去某個分區時,會送出已經處理的消息位移(而不是poll()的最大位移)。上面代碼會送出所有的分區位移,而不僅僅是失去分區的位移,但這種做法沒什麼壞處。

從指定位移開始消費

在此之前,我們使用poll()來從最後的送出位移開始消費,但我們也可以從一個指定的位移開始消費。

如果想從分區開始端重新開始消費,那麼可以使用seekToBeginning(TopicPartition tp);如果想從分區的最末端消費最新的消息,那麼可以使用seekToEnd(TopicPartition tp)。而且,Kafka還支援我們從指定位移開始消費。從指定位移開始消費的應用場景有很多,其中最典型的一個是:位移存在其他系統(例如資料庫)中,并且以其他系統的位移為準。

考慮這麼個場景:我們從Kafka中讀取消費,然後進行處理,最後把結果寫入資料庫;我們既不想丢失消息,也不想資料庫中存在重複的消息資料。對于這樣的場景,我們可能會按如下邏輯處理:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        currentOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset());
        processRecord(record);
        storeRecordInDB(record);
        consumer.commitAsync(currentOffsets);
    }
}
           

這個邏輯似乎沒什麼問題,但是要注意到這麼個事實,在持久化到資料庫成功後,送出位移到Kafka可能會失敗,那麼這可能會導緻消息會重複處理。對于這種情況,我們可以優化方案,将持久化到資料庫與送出位移實作為原子性操作,也就是要麼同時成功,要麼同時失敗。但這個是不可能的,是以我們可以在儲存記錄到資料庫的同時,也儲存位移,然後在消費者開始消費時使用資料庫的位移開始消費。這個方案是可行的,我們隻需要通過seek()來指定分區位移開始消費即可。下面是一個改進的樣例代碼:

public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        //在消費者負責的分區被回收前送出資料庫事務,儲存消費的記錄和位移
        commitDBTransaction();
    }
    
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        //在開始消費前,從資料庫中擷取分區的位移,并使用seek()來指定開始消費的位移
        for(TopicPartition partition: partitions)
            consumer.seek(partition, getOffsetFromDB(partition));
    } 
}

    consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer));
    //在subscribe()之後poll一次,并從資料庫中擷取分區的位移,使用seek()來指定開始消費的位移
    consumer.poll(0);
    for (TopicPartition partition: consumer.assignment())
        consumer.seek(partition, getOffsetFromDB(partition));

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
        {
            processRecord(record);
            //儲存記錄結果
            storeRecordInDB(record);
            //儲存位移
            storeOffsetInDB(record.topic(), record.partition(), record.offset());
        }
        //送出資料庫事務,儲存消費的記錄以及位移
        commitDBTransaction();
    }
           

具體邏輯見代碼注釋,此處不再贅述。另外注意的是,seek()隻是指定了poll()拉取的開始位移,這并不影響在Kafka中儲存的送出位移(當然我們可以在seek和poll之後送出位移覆寫)。

優雅退出

下面我們來讨論下消費者如何優雅退出。

在一般情況下,我們會在一個主線程中循環poll消息并進行處理。當需要退出poll循環時,我們可以使用另一個線程調用consumer.wakeup(),調用此方法會使得poll()抛出WakeupException。如果調用wakup時,主線程正在處理消息,那麼在下一次主線程調用poll時會抛出異常。主線程在抛出WakeUpException後,需要調用consumer.close(),此方法會送出位移,同時發送一個退出消費組的消息到Kafka的組協調者。組協調者收到消息後會立即進行重平衡(而無需等待此消費者會話過期)。

下面是一個優雅退出的樣例代碼:

//注冊JVM關閉時的回調鈎子,當JVM關閉時調用此鈎子。
Runtime.getRuntime().addShutdownHook(new Thread() {
          public void run() {
              System.out.println("Starting exit...");
              //調用消費者的wakeup方法通知主線程退出
              consumer.wakeup();
              try {
                  //等待主線程退出
                  mainThread.join();
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          } 
});

...

try {
    // looping until ctrl-c, the shutdown hook will cleanup on exit
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        System.out.println(System.currentTimeMillis() + "--  waiting for data...");
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s\n",record.offset(), record.key(), record.value());
        }
        for (TopicPartition tp: consumer.assignment())
            System.out.println("Committing offset at position:" + consumer.position(tp));
        consumer.commitSync();
    }
} catch (WakeupException e) {
    // ignore for shutdown
} finally {
    consumer.close();
    System.out.println("Closed consumer and we are done");
}
           

反序列化

如前所述,Kafka生産者負責将對象序列化成位元組數組并發送到Kafka。消費者則需要将位元組數組轉換成對象,這就是反序列化做的事情。序列化與反序列化需要比對,如果序列化使用IntegerSerializer,但使用StringDeserializer來反序列化,那麼會反序列化失敗。是以作為開發者,我們需要關注寫入到主題使用的是什麼序列化格式,并且保證寫入的資料能夠被消費者反序列化成功。如果使用Avro與模式注冊中心(Schema Registry)來序列化與反序列化,那麼事情會輕松許多,因為AvroSerializer會保證所有寫入的資料都是結構相容的,并且能夠被反序列化出來。

下面先來看下如何自定義反序列化,後面會進一步讨論如何使用Avro。

自定義反序列化

首先,假設序列化的對象為Customer:

public class Customer {
     private int customerID;
     private String customerName;
     public Customer(int ID, String name) {
         this.customerID = ID;
         this.customerName = name;
     }
     public int getID() {
         return customerID;
     }
     public String getName() {
         return customerName;
     } 
}
           

根據之前的序列化政策,我們的反序列化代碼如下:

import org.apache.kafka.common.errors.SerializationException;
import java.nio.ByteBuffer;
import java.util.Map;

public class CustomerDeserializer implements Deserializer<Customer> {
    @Override
    public void configure(Map configs, boolean isKey) {
     // nothing to configure
    }

    @Override
    public Customer deserialize(String topic, byte[] data) {
        int id;
        int nameSize;
        String name;
        try {
            if (data == null)
                return null;
            if (data.length < 8)
                throw new SerializationException("Size of data received by IntegerDeserializer is shorter than expected");
            ByteBuffer buffer = ByteBuffer.wrap(data);
            id = buffer.getInt();
            String nameSize = buffer.getInt();
            byte[] nameBytes = new Array[Byte](nameSize);
            buffer.get(nameBytes);
            name = new String(nameBytes, 'UTF-8');
            return new Customer(id, name);
        } catch (Exception e) {
            throw new SerializationException("Error when serializing Customer to byte[] " + e);
        }
    }
    @Override
    public void close() {
            // nothing to close
    } 
}
           

消費者使用這個反序列化的代碼如下:

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.CustomerDeserializer");

KafkaConsumer<String, Customer> consumer = new KafkaConsumer<>(props);
consumer.subscribe("customerCountries")
while (true) {
    ConsumerRecords<String, Customer> records = consumer.poll(100);
    for (ConsumerRecord<String, Customer> record : records)
    {
    System.out.println("current customer Id: " + record.value().getId() + " and current customer name: " + record.value().getName());
    } 
}
           

最後提醒下,我們并不推薦實作自定義的序列化與反序列化,因為往往這些方案并不成熟,難以維護和更新,而且容易出錯。我們可以使用JSON、Thrift、Protobuf或者Avro的成熟的解決方案。

使用Avro反序列化

假設我們使用之前生産者Avro序列化時使用的Customer,那麼使用Avro反序列化的話,我們的樣例代碼如下:

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//使用KafkaAvroDeserializer來反序列化Avro消息
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
//這裡增加了schema.registry.url參數,擷取生産者注冊的消息模式
props.put("schema.registry.url", schemaUrl);
String topic = "customerContacts"

KafkaConsumer consumer = new KafkaConsumer(createConsumerConfig(brokers, groupId, url));
consumer.subscribe(Collections.singletonList(topic));

System.out.println("Reading topic:" + topic);

while (true) {
    //這裡使用之前生産者使用的Avro生成的Customer類
    ConsumerRecords<String, Customer> records = consumer.poll(1000);
    for (ConsumerRecord<String, Customer> record: records) {
        System.out.println("Current customer name is: " + record.value().getName());
    }
    consumer.commitSync();
}
           

單個消費者

一般情況下我們都是使用消費組(即便隻有一個消費者)來消費消息的,因為這樣可以在增加或減少消費者時自動進行分區重平衡。這種方式是推薦的方式。在知道主題和分區的情況下,我們也可以使用單個消費者來進行消費。對于這種情況,我們需要自己給消費者配置設定消費分區,而不是讓消費者訂閱(成為消費組)主題。

下面是一個給單個消費者指定分區進行消費的代碼樣例:

List<PartitionInfo> partitionInfos = null;
//擷取主題下所有的分區。如果你知道所指定的分區,可以跳過這一步
partitionInfos = consumer.partitionsFor("topic");

if (partitionInfos != null) {
    for (PartitionInfo partition : partitionInfos)
        partitions.add(new TopicPartition(partition.topic(), partition.partition()));
    //為消費者指定分區
    consumer.assign(partitions);

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record: records) {
            System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
        }
        consumer.commitSync();
    }
}
           

除了需要主動擷取分區以及沒有分區重平衡,其他的處理邏輯都是一樣的。需要注意的是,如果添加了新的分區,這個消費者是感覺不到的,需要通過consumer.partitionsFor()來重新擷取分區。

繼續閱讀