在使用Kafka時,經常會遇到隊列中堆積了大批量消息的情況。如果一條一條地消費,效率太低,這時候就要用到批量消費消息。
批量監聽器
從Spring Kafka 1.1版本開始,@KafkaListener可以被配置為批量接收Kafka主題(topic)中的消息。要讓監聽器容器工廠創建批量監聽器,需要將其batchListener屬性設定為true,代碼如下:
/**
 * Creates the listener container factory for batch-style {@code @KafkaListener} methods.
 *
 * @return a {@link ConcurrentKafkaListenerContainerFactory} with batch listening enabled,
 *         whose consumers are built from {@link #consumerConfigs()}
 */
@Bean
KafkaListenerContainerFactory<?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> batchContainerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    // Enable batch listening.
    batchContainerFactory.setBatchListener(true);
    batchContainerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
    return batchContainerFactory;
}
/**
 * Assembles the Kafka consumer properties handed to the consumer factory.
 *
 * <p>{@code groupId}, {@code autoOffsetReset} and {@code bootstrapServers} are read from
 * members declared outside this method.
 *
 * @return a new mutable map of consumer configuration entries
 */
@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> config = new HashMap<>();

    // Broker connection and consumer-group identity.
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);

    // Number of messages received per poll, i.e. the batch size seen by a batch listener.
    config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

    // Commit interval and timeouts, all in milliseconds.
    config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
    config.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);

    // Keys and values are both deserialized as strings.
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    return config;
}
批量接收
在@KafkaListener注解中,通過containerFactory屬性指定使用上面定義的batchFactory容器工廠(按Bean名稱引用)。
/**
 * Receives a batch of records from topic {@code teemo} through the {@code batchFactory}
 * container factory and forwards the non-null values, as strings, to {@code updateES}.
 *
 * @param list the records delivered in one batch
 */
@KafkaListener(topics = "teemo", id = "consumer", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<?, ?>> list) {
    List<String> payloads = new ArrayList<>();
    for (ConsumerRecord<?, ?> entry : list) {
        // Extract the message; records without a value are skipped.
        Object value = entry.value();
        if (value != null) {
            payloads.add(value.toString());
        }
    }

    if (payloads.isEmpty()) {
        return;
    }
    // Update the index.
    updateES(payloads);
}
綜合示例
package org.fiend.kafka.config;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Optional;
/**
 * Example Kafka consumer demonstrating a single-record listener, a batch listener, and two
 * batch listeners that are each pinned to one partition of {@link Constants#TOPIC_NAME}.
 *
 * <p>All listeners only log what they receive.
 *
 * @author langpf 2020/2/25
 */
@Component
public class KafkaReceiver {
    private static final Logger log = LoggerFactory.getLogger(KafkaReceiver.class);

    /**
     * Bean name of the listener container factory that has batch listening enabled.
     * Methods that accept a {@code List<ConsumerRecord>} must use such a factory; the
     * single-record listener keeps using the default one.
     * NOTE(review): assumes a bean with this name is defined in the application
     * configuration - confirm.
     */
    private static final String BATCH_CONTAINER_FACTORY = "batchFactory";

    /**
     * Receives a single message and logs the record and its value when the value is not null.
     *
     * <p>The value is logged as-is; it is intentionally not cast to {@code String}, so a
     * non-String payload cannot cause a {@link ClassCastException} here.
     *
     * @param record the consumed record
     */
    // Alternative listener declarations, kept for reference:
    // @KafkaListener(id = "hades", autoStartup = "${listener.auto.startup}", topics = "oop, pui, que", concurrency = "2" )
    // @KafkaListener(id = "hades", autoStartup = "false", topics = "oop, pui, que", concurrency = "2" )
    // @KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = Constants.TOPIC_NAME, partitions = { "1" }) })
    @KafkaListener(topics = {Constants.TOPIC_NAME})
    public void listen(ConsumerRecord<?, ?> record) {
        logRecordAndMessage(record);
    }

    /**
     * Receives messages in batches and logs every record whose value is not null.
     *
     * @param records the records delivered in one batch
     */
    @KafkaListener(topics = {Constants.TOPIC_NAME}, containerFactory = BATCH_CONTAINER_FACTORY)
    public void batchListen(List<ConsumerRecord<?, ?>> records) {
        for (ConsumerRecord<?, ?> record : records) {
            logRecordAndMessage(record);
        }
    }

    /**
     * Receives Kafka messages in batches, consuming only partition 0.
     *
     * @param records the records delivered in one batch
     */
    @KafkaListener(id = "id0", containerFactory = BATCH_CONTAINER_FACTORY,
            topicPartitions = {@TopicPartition(topic = Constants.TOPIC_NAME, partitions = {"0"})})
    public void listenPartition0(List<ConsumerRecord<?, ?>> records) {
        logPartitionBatch("Id0", "p0", records);
    }

    /**
     * Receives Kafka messages in batches, consuming only partition 1.
     *
     * @param records the records delivered in one batch
     */
    @KafkaListener(id = "id1", containerFactory = BATCH_CONTAINER_FACTORY,
            topicPartitions = {@TopicPartition(topic = Constants.TOPIC_NAME, partitions = {"1"})})
    public void listenPartition1(List<ConsumerRecord<?, ?>> records) {
        logPartitionBatch("Id1", "p1", records);
    }

    /** Logs the record and its value; records with a null value are skipped entirely. */
    private static void logRecordAndMessage(ConsumerRecord<?, ?> record) {
        Object message = record.value();
        if (message == null) {
            return;
        }
        log.info("----------------- record ={}", record);
        log.info("------------------ message ={}", message);
    }

    /**
     * Logs the handling thread, the batch size, every record, and every non-null value of a
     * batch received by one of the partition-specific listeners.
     *
     * @param listenerLabel  prefix identifying the listener in the log output (e.g. "Id0")
     * @param partitionLabel prefix identifying the partition in the log output (e.g. "p0")
     * @param records        the records delivered in one batch
     */
    private static void logPartitionBatch(
            String listenerLabel, String partitionLabel, List<ConsumerRecord<?, ?>> records) {
        log.info("{} Listener, Thread ID: {}", listenerLabel, Thread.currentThread().getId());
        log.info("{} records size {}", listenerLabel, records.size());
        for (ConsumerRecord<?, ?> record : records) {
            // The record itself is always logged, even when its value is null.
            log.info("Received: {}", record);
            Object message = record.value();
            if (message != null) {
                log.info("{} Received message={}", partitionLabel, message);
            }
        }
    }
}