天天看點

8.Kafka消費者API

環境準備

  • java環境
  • kafka環境
  • kafka-clients jar包

    或者依賴:

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>2.8.0</version>
</dependency>
           

Kafka API

Consumer 消費資料時的可靠性是很容易保證的,因為資料在 Kafka 中是持久化的,故不用擔心資料丢失問題。

由于 consumer 在消費過程中可能會出現斷電當機等故障,consumer 恢複後,需要從故障前的位置的繼續消費,是以 consumer 需要實時記錄自己消費到了哪個 offset,以便故障恢複後繼續消費。是以 offset 的維護是 Consumer 消費資料是必須考慮的問題。

8.Kafka消費者API

需要用到的類:

KafkaConsumer:需要建立一個消費者對象,用來消費資料。

ConsumerConfig:擷取所需的一系列配置參數。

ConsuemrRecord:每條資料都要封裝成一個 ConsumerRecord 對象。

小試牛刀

package com.huazai.kafka.example.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * @author pyh
 * @date 2021/8/2 23:18
 */
public class CustomConsumer {
    private final static String TOPIC = "test_topic";

    public static void main(String[] args) {
        Properties properties = new Properties();

        // kafka連接配接位址,多個位址用“,”隔開
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.64.132:9092,192.168.64.132:9093,192.168.64.132:9094");
        // 定義消費者組
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group");
        // 開啟自動送出事務
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // 自動送出offset間隔時間,機關毫秒(隻有開啟自動送出事務後此配置才生效)
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "500");
        // 反序列化key所用到的類
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // 反序列化value所用到的類
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // 訂閱主題,如果是第一次訂閱該主題,則該主題之前的消息消費不到
        kafkaConsumer.subscribe(Collections.singleton(TOPIC));
        while (true) {
            int consumedCount = 0;
            // 拉取消費記錄
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
            // 周遊解析消費記錄
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                int partition = consumerRecord.partition();
                String key = consumerRecord.key();
                String value = consumerRecord.value();
                System.out.println("正在消費" + partition + "分區記錄:" + value + ",key為:" + key);
                consumedCount++;
            }
            //kafkaConsumer.commitAsync();
            System.out.println("本次共消費記錄數:" + consumedCount);
        }
    }
}
           

啟動消費者程式後,向主題test_topic生産消息然後觀察程式消費情況。此處隻示範使用指令生成消息,java api生産消息參考:7.Kafka生産者API

8.Kafka消費者API

程式控制台輸出如下:

8.Kafka消費者API

自動送出offset

offset記錄每個消費者組上一次消費的位置,以便下一次消費的時候從該offset之後的資料繼續消費。

為了使我們能夠專注于自己的業務邏輯,Kafka 提供了自動送出 offset 的功能。自動送出 offset 的相關參數如下:

enable.auto.commit:是否開啟自動送出 offset 功能。

auto.commit.interval.ms:自動送出 offset 的時間間隔(開啟自動送出offset之後此選項才生效)。

手動送出offset

雖然自動送出 offset 十分簡介便利,但由于其是基于時間送出的,開發人員難以把握offset 送出的時機。是以 Kafka 還提供了手動送出 offset 的 API。

手動送出 offset 的方法有兩種:分别是 commitSync(同步送出)和 commitAsync(異步送出)。兩者的相同點是,都會将本次 poll 的一批資料最高的偏移量送出;不同點是,commitSync 阻塞目前線程,一直到送出成功,并且會自動失敗重試(由不可控因素導緻,也會出現送出失敗);而 commitAsync 則沒有失敗重試機制,故有可能送出失敗。

package com.huazai.kafka.example.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * @author pyh
 * @date 2021/8/2 23:18
 */
public class CustomConsumer {
    private final static String TOPIC = "test_topic";

    public static void main(String[] args) {
        Properties properties = new Properties();

        // kafka連接配接位址,多個位址用“,”隔開
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.64.132:9092,192.168.64.132:9093,192.168.64.132:9094");
        // 定義消費者組
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group");
        // 關閉自動送出事務
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // 自動送出offset間隔時間,機關毫秒(隻有開啟自動送出事務後此配置才生效)
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "500");
        // 反序列化key所用到的類
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // 反序列化value所用到的類
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // 訂閱主題,如果是第一次訂閱該主題,則該主題之前的消息消費不到
        kafkaConsumer.subscribe(Collections.singleton(TOPIC));
        while (true) {
            int consumedCount = 0;
            // 拉取消費記錄
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
            // 周遊解析消費記錄
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                int partition = consumerRecord.partition();
                String key = consumerRecord.key();
                String value = consumerRecord.value();
                System.out.println("正在消費" + partition + "分區記錄:" + value + ",key為:" + key);
                consumedCount++;
            }
            // 同步送出offset
            // kafkaConsumer.commitSync();
            //異步送出offset
            kafkaConsumer.commitAsync();
            System.out.println("本次共消費記錄數:" + consumedCount);
        }
    }
}
           

雖然在大部分情況下都使用異步送出offset,但是如果業務要求一定按照順序消費,則由于異步送出offset拉取第一批資料之後送出offset由于沒有阻塞,繼續拉取第二批資料,則會存在送出了消費第二批資料的offset之後才送出消費第一批資料的offset。而如果是同步送出offset,則第一批資料的offset沒送出完之前,則阻塞而不能繼續拉取下一批資料。

如果消費者組沒有開啟自動送出offset,也沒有手動送出offset,則下一次重新消費者,則會繼續消費到重複的資料(最新送出的一次offset到最後一條範圍内的資料)。

自動重置offset

自動重置offset配置:auto.offset.reset

有以下兩個可選的值:

latest(預設):自動重置到最新一條資料的offset

earliest:自動重置到最早一條還沒過期資料的offset

log.retention.hours指定消息過期時間,機關小時,預設為168小時,即7天

觸發自動重置offset需要滿足以下兩個條件:

1、指定消費者組第一次消費該topic的資料,即指定的消費者組對該topic還沒有初始化過offset。

2、目前消費者組的offset對應的資料已過期,即消費者組的offset對應的過期資料已不能消費。

面試題:如何重新消費到最新的資料?

将auto.offset.reset的值配置為earliest,并且重新換一個新的消費者組消費。