Kafka高品質專欄請看 石臻臻的雜貨鋪的Kafka專欄
說明
- 從2.2.4版開始,您可以直接在注釋上指定Kafka使用者屬性,這些屬性将覆寫在使用者工廠中配置的具有相同名稱的所有屬性。您不能通過這種方式指定group.id和client.id屬性。他們将被忽略;
-
可以使用#{…}或屬性占位符(${…})在SpEL上配置注釋上的大多數屬性。
比如:
@KafkaListener(id = "consumer-id",topics = "SHI_TOPIC1",concurrency = "${listen.concurrency:3}",
clientIdPrefix = "myClientId")
屬性
concurrency
将會從容器中擷取
listen.concurrency
的值,如果不存在就預設用3
@KafkaListener詳解
id 監聽器的id
①. 消費者線程命名規則
填寫:
2020-11-19 14:24:15 c.d.b.k.KafkaListeners 120 [INFO] 線程:Thread[ consumer-id5-1-C-1
,5,main]-groupId:BASE-DEMO consumer-id5 消費
沒有填寫ID:
2020-11-19 10:41:26 c.d.b.k.KafkaListeners 137 [INFO] 線程:Thread[ org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1
,5,main] consumer-id7
②.在相同容器中的監聽器ID不能重複
否則會報錯
Caused by: java.lang.IllegalStateException: Another endpoint is already registered with id
③.會覆寫消費者工廠的消費組GroupId
假如配置檔案屬性配置了消費組
kafka.consumer.group-id=BASE-DEMO
正常情況它是該容器中的預設消費組
但是如果設定了
@KafkaListener(id = "consumer-id7", topics = {"SHI_TOPIC3"})
那麼目前消費者的消費組就是
consumer-id7
;
當然如果你不想要他作為groupId的話 可以設定屬性
idIsGroup = false
;那麼還是會使用預設的GroupId;
④. 如果配置了屬性groupId,則其優先級最高
@KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3",groupId = "groupId-test")
例如上面代碼中最終這個消費者的消費組
GroupId
是 “groupId-test”
該id屬性(如果存在)将用作Kafka消費者group.id屬性,并覆寫消費者工廠中的已配置屬性(如果存在)您還可以groupId顯式設定或将其設定idIsGroup為false,以恢複使用使用者工廠的先前行為group.id。
groupId 消費組名
指定該消費組的消費組名; 關于消費組名的配置可以看看上面的 id 監聽器的id
如何擷取消費者 group.id
在監聽器中調用
KafkaUtils.getConsumerGroupId()
可以獲得目前的groupId; 可以在日志中列印出來; 可以知道是哪個用戶端消費的;
topics 指定要監聽哪些topic(與topicPattern、topicPartitions 三選一)
可以同時監聽多個
topics = {"SHI_TOPIC3","SHI_TOPIC4"}
topicPattern 比對Topic進行監聽(與topics、topicPartitions 三選一)
topicPartitions 顯式分區配置設定
可以為監聽器配置明确的主題和分區(以及可選的初始偏移量)
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
上面例子意思是 監聽
topic1
的0,1分區;監聽
topic2
的第0分區,并且第1分區從offset為100的開始消費;
errorHandler 異常處理
實作
KafkaListenerErrorHandler
; 然後做一些異常處理;
@Component
public class KafkaDefaultListenerErrorHandler implements KafkaListenerErrorHandler {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
return null;
}
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
//do someting
return null;
}
}
調用的時候 填寫beanName;例如
errorHandler="kafkaDefaultListenerErrorHandler"
containerFactory 監聽器工廠
指定生成監聽器的工廠類;
例如我寫一個 批量消費的工廠類
/**
* 監聽器工廠 批量消費
* @return
*/
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumerFactory());
//設定為批量消費,每個批次數量在Kafka配置參數中設定ConsumerConfig.MAX_POLL_RECORDS_CONFIG
factory.setBatchListener(true);
return factory;
}
使用
containerFactory = "batchFactory"
clientIdPrefix 用戶端字首
會覆寫消費者工廠的屬性; 最為字首後面接
kafka.consumer.client-id
n是數字
-n
concurrency并發數
會覆寫消費者工廠中的concurrency ,這裡的并發數就是多線程消費; 比如說單機情況下,你設定了3; 相當于就是啟動了3個用戶端來配置設定消費分區;分布式情況 總線程數=concurrency*機器數量; 并不是設定越多越好,具體如何設定請看 屬性concurrency的作用及配置(RoundRobinAssignor 、RangeAssignor)
/**
* 監聽器工廠
* @return
*/
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> concurrencyFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumerFactory());
factory.setConcurrency(6);
return factory;
}
@KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1)
雖然使用的工廠是
concurrencyFactory
(concurrency配置了6); 但是他最終生成的監聽器數量 是1;
properties 配置其他屬性
kafka中的屬性看
org.apache.kafka.clients.consumer.ConsumerConfig
;
同名的都可以修改掉;
用法
@KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1"
, clientIdPrefix = "myClientId5",groupId = "groupId-test",
properties = {
"enable.auto.commit:false","max.poll.interval.ms:6000" },errorHandler="kafkaDefaultListenerErrorHandler")
@KafkaListener使用
KafkaListenerEndpointRegistry
@Autowired
private KafkaListenerEndpointRegistry registry;
//.... 擷取所有注冊的監聽器
registry.getAllListenerContainers();
設定入參驗證器
當您将Spring Boot與驗證啟動器一起使用時,将LocalValidatorFactoryBean自動配置:如下
@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {
@Autowired
private LocalValidatorFactoryBean validator;
...
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setValidator(this.validator);
}
}
使用
@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
...
}
@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
return (m, e) -> {
...
};
}
TODO…