Spring Boot整合Kafka
pom.xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
預設使用
配置
prop新增kafka配置 詳細配置可參考org.springframework.boot.autoconfigure.kafka.KafkaProperties中屬性 生産者,消費者預設序列化都是String格式message
#kafka預設消費者配置
spring.kafka.consumer.bootstrap-servers=10.111.7.124:9092
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-offset-reset=earliest
#kafka預設生産者配置
spring.kafka.producer.bootstrap-servers=10.111.7.124:9092
spring.kafka.producer.acks=-1
spring.kafka.client-id=kafka-producer
spring.kafka.producer.batch-size=5
使用
//生産者
@Resource
private KafkaTemplate kafkaTemplate;
public void send() {
HashMap<String, String> map = new HashMap<>();
map.put("sendType","send");
kafkaTemplate.send("test01", JSONUtil.toJsonStr(map));
}
//消費者
@Slf4j
@Component
public class KafkaConsumer {
@KafkaListener(topics = "test01",groupId = "group01")
public void listen(String message) {
log.info("message:{}",message);
}
}
使用Json格式化
自定義消費者工廠: 配置全部消費者參數,包含轉換器,使用此項可删除轉換器配置2
**轉換器配置2:**僅配置json轉換器,不需要配置全部消費者參數時可删除自定義消費者配置
addTrustedPackages("*"):預設Json轉換僅信任java.util,java.lang包下類,增加後不進行類型檢查
@Configuration
public class KafkaCustomConfig {
@Value("${spring.kafka.consumer.bootstrap-servers}")
private String bootstrapServers;
//自定義消費者工廠
@Bean("customContainerFactory")
public ConcurrentKafkaListenerContainerFactory customContainerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "customGroup01");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
//轉換器配置1
DefaultKafkaConsumerFactory consumerFactory = new DefaultKafkaConsumerFactory(props);
JsonDeserializer jsonDeserializer = new JsonDeserializer();
jsonDeserializer.getTypeMapper().addTrustedPackages("*");
consumerFactory.setValueDeserializer(jsonDeserializer);
//指定使用DefaultKafkaConsumerFactory
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
//設定可批量拉取消息消費,拉取數量一次3,看需求設定
factory.setConcurrency(3);
factory.setBatchListener(true);
return factory;
}
//轉換器配置2
@Bean
public RecordMessageConverter converter() {
ByteArrayJsonMessageConverter converter = new ByteArrayJsonMessageConverter();
DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
typeMapper.addTrustedPackages("*");
converter.setTypeMapper(typeMapper);
return converter;
}
/**
* 不使用spring boot的KafkaAutoConfiguration預設方式建立的KafkaTemplate,重新定義
* 與預設配置隻能存在一個
* @return
*/
@Bean("custiomKafkaTemplate")
public KafkaTemplate custiomKafkaTemplate() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
//0 producer不等待broker同步完成的确認,繼續發送下一條(批)資訊
//1 producer要等待leader成功收到資料并得到确認,才發送下一條message。
//-1 producer得到follwer确認,才發送下一條資料
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 5);
props.put(ProducerConfig.LINGER_MS_CONFIG, 500);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
DefaultKafkaProducerFactory produceFactory = new DefaultKafkaProducerFactory(props);
return new KafkaTemplate(produceFactory);
}
}
vo
@Data
public class TestVo {
private String key;
private String value;
}
生産者
//注入自定義KafkaTemplate
@Resource(name = "custiomKafkaTemplate")
private KafkaTemplate custiomKafkaTemplate;
public void customSend() {
TestVo vo = new TestVo();
vo.setKey("sendType");
vo.setValue("send");
custiomKafkaTemplate.send("custom04", vo);
}
消費者
containerFactory:指定自定義消費者工廠beanName
@Slf4j
@Component
public class KafkaConsumer {
//自定義消費者
@KafkaListener(topics = "custom04",containerFactory = "customContainerFactory")
public void customListen(TestVo message) {
log.info("message:{}",message.toString());
}
//僅自定義轉換器
@KafkaListener(topics = "custom04",groupId = "custom04Group")
public void customListen(TestVo message) {
log.info("message:{}",message.toString());
}
}