一、前言
本文針對 Kafka 的消費者代碼,均由 Kafka 原生的用戶端 API 構成,而非 spring-kafka 或 SpringCloudStream 等對于原生 API 進行封裝的示例。
Kafka 的消費方式,根據不同的需求可以做出多樣的選擇。
- 可以根據是否手動指定分區,而分為 Subscribe 和 Assign 這兩種模式
- 可以根據消費特定條件,而分為最新偏移量、指定偏移量和指定時間戳開始消費這三種方式
二、各種消費方式
2.1 Subscribe模式(訂閱主題)
對 Kafka 消費者來說,Subscribe 模式是最簡單的方式,往往也是最常用的方式,即僅需要訂閱一個主題即可。
public static void testSubscribe() {
Properties properties = new Properties();
// Kafka叢集位址
properties.put("bootstrap.servers", "100.1.4.16:9092,100.1.4.17:9092,100.1.4.18:9092");
// 消費者組,僅在subscribe模式下生效,用于分區自動再均衡,而assign模式直接指定分區
properties.put("group.id", "test_group");
// 反序列化器
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 訂閱topic
String topic = "test_topic";
consumer.subscribe(Pattern.compile(topic));
while (true) {
// 每1000ms輪詢一次
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
log.info("本次輪詢到:{}條", records.count());
for (ConsumerRecord<String, String> record : records) {
log.info("-------消息來了:topic={}, partition={}, offset={}, value={}", record.topic(), record.partition(),
record.offset(), record.value());
}
}
}
2.1.1 解析
通過 Kafka 原生的消費者 API 來消費資料,主要分為三個步驟:
- 配置必要資訊以構造消費者執行個體
- 訂閱主題(或後文中的指定分區)
- 輪詢消息
2.1.2 注意事項
- 一般情況下,配置 group.id 僅在 Subscribe 模式下生效,一般認為消費者組的概念主要對消息分區自動再均衡起作用。
- 一個消費者可以消費多個主題和多個分區,但一個分區隻能同時被同一個消費者組裡的一個消費者消費。
- 通過 public void subscribe(Pattern pattern, ConsumerRebalanceListener listener),可監聽消費者組内的分區再均衡,進而實作自定義的業務。
2.2 Assign模式(手動指定分區)
上面說了,除了 Subscribe 模式,還有 Assign 模式,用來手動指定要消費的消息分區。
public static void testAssignOffset() {
Properties properties = new Properties();
properties.put("bootstrap.servers", "100.1.4.16:9092,100.1.4.17:9092,100.1.4.18:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 預設每次輪詢最多取多少條消息,預設500
properties.put("max.poll.records", 1);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
String topic = "test_topic";
TopicPartition tp = new TopicPartition(topic, 0);
// 指定分區
consumer.assign(Collections.singletonList(tp));
log.info("本topic下所有的分區:{}", consumer.partitionsFor(topic));
// 擷取消費者被配置設定到的分區(注意,assign模式會直接傳回手動指定的分區,而subscribe模式等到自動配置設定分區後才有傳回)
log.info("本消費者配置設定到的分區:{}", consumer.assignment());
// 為某個指定分區任意位置、起始位置、末尾位置為起始消費位置(offset預設從0開始)
// 注意若配置設定的offset<分區最小的offset(可能kafka删除政策影響,比如預設删除超過7d的資料導緻最小offset值變化),将從最新offset處監聽消費
// consumer.seek(tp, 5);
// consumer.seekToBeginning(Arrays.asList(tp));
// consumer.seekToEnd(Collections.singletonList(tp));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
log.info("本次輪詢到:{}條", records.count());
for (ConsumerRecord<String, String> record : records) {
log.info("-------消息來了:topic={}, partition={}, offset={}, value={}", record.topic(), record.partition(),
record.offset(), record.value());
}
}
}
2.2.1 解析
- 在 Assign 模式下,消費者不再需要消費者組的概念,是以 group.id 可以忽略配置。
- 通過消費者配置 max.poll.records,可修改每次輪詢的最大消息數,預設值 500。
- 與 Subscribe 模式不同,Assign 模式通過 public void assign(Collection<TopicPartition> partitions),來指定資料分區集合。
- 通過 public List<PartitionInfo> partitionFor(String topic),可擷取某主題下的所有分區資訊。
- 通過 public Set<TopicPartition> assignment(),可擷取消費者被配置設定到的分區集合。
2.2.2 注意事項
Assign 模式下,可通過 seek/seekToBeginning/seekToEnd 等 API 來指定偏移量 offset 開始消費。
2.3 指定偏移量消費
在 2.2 上一節的代碼基礎上,打開 seek/seekToBeginning/seekToEnd 等注釋,即可指定偏移量進行消費。
2.3.1 注意事項
- 若指定分區的偏移量已在分區上不存在(比如受到 Kafka 清除政策的影響),則将從最新 offset 處監聽消費。
- 因為不是 Subscribe 模式,是以不存在消費者組的概念,是以即便設定了消費者組,也不會觸發消費者組的分區再均衡操作。
2.4 指定時間戳消費
Kafka 不僅支援指定偏移量消費,也支援指定消息的時間戳進行消費。不過根本上也是通過偏移量的消費。
public static void testAssignTimeStamp() throws ParseException {
Properties properties = new Properties();
properties.put("bootstrap.servers", "100.1.4.16:9092,100.1.4.17:9092,100.1.4.18:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
String topic = "test_topic";
// 設定消費起始時間
String startTime = "2020-05-19 15:52:41";
Long startTimestamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(startTime).getTime();
Map<TopicPartition, Long> timestampMap = new HashMap<>();
// 擷取每一個分區資訊
List<PartitionInfo> partitionInfoLst = consumer.partitionsFor(topic);
for (PartitionInfo partitionInfo : partitionInfoLst) {
// 設定每一個分區的起始消費時間為指定時間
timestampMap.put(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()), startTimestamp);
}
// 通過時間戳查找給定分區的偏移量
Map<TopicPartition, OffsetAndTimestamp> offsetMap = consumer.offsetsForTimes(timestampMap);
// 指定分區
consumer.assign(offsetMap.keySet());
// 設定每一個分區的指定時間對應的消費偏移量
for (TopicPartition topicPartition : offsetMap.keySet()) {
consumer.seek(topicPartition, offsetMap.get(topicPartition).offset());
}
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
log.info("本次輪詢到:{}條", records.count());
for (ConsumerRecord<String, String> record : records) {
log.info("-------消息來了:topic={}, partition={}, offset={}, value={}", record.topic(), record.partition(),
record.offset(), record.value());
}
}
}
2.4.1 解析
指定時間戳消費的關鍵步驟如下:
- 配置必要資訊以構造消費者執行個體
- 對要消費的各個主題下的各個分區,設定開始消費的時間戳
- 根據各自分區的時間戳通過 offsetsForTimes 對應擷取各自的偏移量
- 指定消費的消息分區集合
- 指定各自分區開始消費的偏移量
- 輪詢消息
2.4.2 注意事項
- 若指定分區的時間戳對應的偏移量已在分區上不存在(比如受到 Kafka 清除政策的影響),則将從最新 offset 處監聽消費。
- 因為不是 Subscribe 模式,是以不存在消費者組的概念,是以即便設定了消費者組,也不會觸發消費者組的分區再均衡操作。
2.5 Subscribe模式下指定偏移量消費
上面的指定偏移量也好,指定時間戳的消費方式也罷,都是屬于 Assign 模式的。那 Subscribe 模式能否也可以指定偏移量消費呢?答案是可以的。
public static void testSubscribeOffset() {
Properties properties = new Properties();
properties.put("bootstrap.servers", "100.1.4.16:9092,100.1.4.17:9092,100.1.4.18:9092");
properties.put("group.id", "test_group");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
String topic = "test_topic";
Map<TopicPartition, OffsetAndMetadata> hashMaps = new HashMap<TopicPartition, OffsetAndMetadata>();
hashMaps.put(new TopicPartition(topic, 0), new OffsetAndMetadata(5));
// 手動送出指定offset作為起始消費offset
consumer.commitSync(hashMaps);
consumer.subscribe(Pattern.compile(topic));
while (true) {
// 每1000ms輪詢一次
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
log.info("本次輪詢到:{}條", records.count());
for (ConsumerRecord<String, String> record : records) {
log.info("-------消息來了:topic={}, partition={}, offset={}, value={}", record.topic(), record.partition(),
record.offset(), record.value());
}
}
}
2.5.1 解析
實際上,我們是通過在開啟輪詢之前,手動送出一次偏移量資訊,然後再去輪詢消息的方式達到目的。
2.5.2 注意事項
同樣地,若指定分區的偏移量已在分區上不存在(比如受到 Kafka 清除政策的影響),則将從最新 offset 處監聽消費。