天天看點

Kafka的不同消費方式和消費者代碼示例一、前言二、各種消費方式

一、前言

本文針對 Kafka 的消費者代碼,均由 Kafka 原生的用戶端 API 構成,而非 spring-kafka 或 SpringCloudStream 等對于原生 API 進行封裝的示例。

Kafka 的消費方式,根據不同的需求可以做出多樣的選擇。

  • 可以根據是否手動指定分區,而分為 Subscribe 和 Assign 這兩種模式
  • 可以根據消費特定條件,而分為最新偏移量、指定偏移量和指定時間戳開始消費這三種方式

二、各種消費方式

2.1 Subscribe模式(訂閱主題)

對 Kafka 消費者來說,Subscribe 模式是最簡單的方式,往往也是最常用的方式,即僅需要訂閱一個主題即可。

public static void testSubscribe() {
    Properties properties = new Properties();
    // Kafka叢集位址
    properties.put("bootstrap.servers", "100.1.4.16:9092,100.1.4.17:9092,100.1.4.18:9092");
    // 消費者組,僅在subscribe模式下生效,用于分區自動再均衡,而assign模式直接指定分區
    properties.put("group.id", "test_group");
    // 反序列化器
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    // 訂閱topic
    String topic = "test_topic";
    consumer.subscribe(Pattern.compile(topic));
    while (true) {
        // 每1000ms輪詢一次
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
        log.info("本次輪詢到:{}條", records.count());
        for (ConsumerRecord<String, String> record : records) {
            log.info("-------消息來了:topic={}, partition={}, offset={}, value={}", record.topic(), record.partition(),
                    record.offset(), record.value());
        }
    }
}
           

2.1.1 解析

通過 Kafka 原生的消費者 API 來消費資料,主要分為三個步驟:

  1. 配置必要資訊以構造消費者執行個體
  2. 訂閱主題(或後文中的指定分區)
  3. 輪詢消息

2.1.2 注意事項

  1. 一般情況下,配置 group.id 僅在 Subscribe 模式下生效,一般認為消費者組的概念主要對消息分區自動再均衡起作用。
  2. 一個消費者可以消費多個主題和多個分區,但一個分區隻能同時被同一個消費者組裡的一個消費者消費。
  3. 通過 public void subscribe(Pattern pattern, ConsumerRebalanceListener listener),可監聽消費者組内的分區再均衡,進而實作自定義的業務。

2.2 Assign模式(手動指定分區)

上面說了,除了 Subscribe 模式,還有 Assign 模式,用來手動指定要消費的消息分區。

public static void testAssignOffset() {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", "100.1.4.16:9092,100.1.4.17:9092,100.1.4.18:9092");
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    // 預設每次輪詢最多取多少條消息,預設500
    properties.put("max.poll.records", 1);
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    String topic = "test_topic";
    TopicPartition tp = new TopicPartition(topic, 0);
    // 指定分區
    consumer.assign(Collections.singletonList(tp));
    log.info("本topic下所有的分區:{}", consumer.partitionsFor(topic));
    // 擷取消費者被配置設定到的分區(注意,assign模式會直接傳回手動指定的分區,而subscribe模式等到自動配置設定分區後才有傳回)
    log.info("本消費者配置設定到的分區:{}", consumer.assignment());
    // 為某個指定分區任意位置、起始位置、末尾位置為起始消費位置(offset預設從0開始)
    // 注意若配置設定的offset<分區最小的offset(可能kafka删除政策影響,比如預設删除超過7d的資料導緻最小offset值變化),将從最新offset處監聽消費
    // consumer.seek(tp, 5);
    // consumer.seekToBeginning(Arrays.asList(tp));
    // consumer.seekToEnd(Collections.singletonList(tp));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
        log.info("本次輪詢到:{}條", records.count());
        for (ConsumerRecord<String, String> record : records) {
            log.info("-------消息來了:topic={}, partition={}, offset={}, value={}", record.topic(), record.partition(),
                    record.offset(), record.value());
        }
    }
}
           

2.2.1 解析

  • 在 Assign 模式下,消費者不再需要消費者組的概念,是以 group.id 可以忽略配置。
  • 通過消費者配置 max.poll.records,可修改每次輪詢的最大消息數,預設值 500。
  • 與 Subscribe 模式不同,Assign 模式通過 public void assign(Collection<TopicPartition> partitions),來指定資料分區集合。
  • 通過 public List<PartitionInfo> partitionFor(String topic),可擷取某主題下的所有分區資訊。
  • 通過 public Set<TopicPartition> assignment(),可擷取消費者被配置設定到的分區集合。

2.2.2 注意事項

Assign 模式下,可通過 seek/seekToBeginning/seekToEnd 等 API 來指定偏移量 offset 開始消費。

2.3 指定偏移量消費

在 2.2 上一節的代碼基礎上,打開 seek/seekToBeginning/seekToEnd 等注釋,即可指定偏移量進行消費。

2.3.1 注意事項

  1. 若指定分區的偏移量已在分區上不存在(比如受到 Kafka 清除政策的影響),則将從最新 offset 處監聽消費。
  2. 因為不是 Subscribe 模式,是以不存在消費者組的概念,是以即便設定了消費者組,也不會觸發消費者組的分區再均衡操作。

2.4 指定時間戳消費

Kafka 不僅支援指定偏移量消費,也支援指定消息的時間戳進行消費。不過根本上也是通過偏移量的消費。

public static void testAssignTimeStamp() throws ParseException {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", "100.1.4.16:9092,100.1.4.17:9092,100.1.4.18:9092");
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    String topic = "test_topic";
    // 設定消費起始時間
    String startTime = "2020-05-19 15:52:41";
    Long startTimestamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(startTime).getTime();
    Map<TopicPartition, Long> timestampMap = new HashMap<>();
    // 擷取每一個分區資訊
    List<PartitionInfo> partitionInfoLst = consumer.partitionsFor(topic);
    for (PartitionInfo partitionInfo : partitionInfoLst) {
        // 設定每一個分區的起始消費時間為指定時間
        timestampMap.put(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()), startTimestamp);
    }
    // 通過時間戳查找給定分區的偏移量
    Map<TopicPartition, OffsetAndTimestamp> offsetMap = consumer.offsetsForTimes(timestampMap);
    // 指定分區
    consumer.assign(offsetMap.keySet());
    // 設定每一個分區的指定時間對應的消費偏移量
    for (TopicPartition topicPartition : offsetMap.keySet()) {
        consumer.seek(topicPartition, offsetMap.get(topicPartition).offset());
    }
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
        log.info("本次輪詢到:{}條", records.count());
        for (ConsumerRecord<String, String> record : records) {
            log.info("-------消息來了:topic={}, partition={}, offset={}, value={}", record.topic(), record.partition(),
                    record.offset(), record.value());
        }
    }
}
           

2.4.1 解析

指定時間戳消費的關鍵步驟如下:

  1. 配置必要資訊以構造消費者執行個體
  2. 對要消費的各個主題下的各個分區,設定開始消費的時間戳
  3. 根據各自分區的時間戳通過 offsetsForTimes 對應擷取各自的偏移量
  4. 指定消費的消息分區集合
  5. 指定各自分區開始消費的偏移量
  6. 輪詢消息

2.4.2 注意事項

  1. 若指定分區的時間戳對應的偏移量已在分區上不存在(比如受到 Kafka 清除政策的影響),則将從最新 offset 處監聽消費。
  2. 因為不是 Subscribe 模式,是以不存在消費者組的概念,是以即便設定了消費者組,也不會觸發消費者組的分區再均衡操作。

2.5 Subscribe模式下指定偏移量消費

上面的指定偏移量也好,指定時間戳的消費方式也罷,都是屬于 Assign 模式的。那 Subscribe 模式能否也可以指定偏移量消費呢?答案是可以的。

public static void testSubscribeOffset() {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", "100.1.4.16:9092,100.1.4.17:9092,100.1.4.18:9092");
    properties.put("group.id", "test_group");
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    String topic = "test_topic";
    Map<TopicPartition, OffsetAndMetadata> hashMaps = new HashMap<TopicPartition, OffsetAndMetadata>();
    hashMaps.put(new TopicPartition(topic, 0), new OffsetAndMetadata(5));
    // 手動送出指定offset作為起始消費offset
    consumer.commitSync(hashMaps);
    consumer.subscribe(Pattern.compile(topic));
    while (true) {
        // 每1000ms輪詢一次
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
        log.info("本次輪詢到:{}條", records.count());
        for (ConsumerRecord<String, String> record : records) {
            log.info("-------消息來了:topic={}, partition={}, offset={}, value={}", record.topic(), record.partition(),
                    record.offset(), record.value());
        }
    }
}
           

2.5.1 解析

實際上,我們是通過在開啟輪詢之前,手動送出一次偏移量資訊,然後再去輪詢消息的方式達到目的。

2.5.2 注意事項

同樣地,若指定分區的偏移量已在分區上不存在(比如受到 Kafka 清除政策的影響),則将從最新 offset 處監聽消費。