说在前面
前期回顾
sharding-jdbc源码解析 更新完毕
spring源码解析 更新完毕
spring-mvc源码解析 更新完毕
spring-tx源码解析 更新完毕
spring-boot源码解析 更新完毕
rocketmq源码解析 更新完毕
dubbbo源码解析 更新完毕
netty源码解析 更新完毕
spring源码架构更新完毕
spring-mvc源码架构更新完毕
springboot源码架构更新中
github https://github.com/tianheframe
sharding-jdbc源码解析 更新完毕
rocketmq源码解析 更新完毕
seata 源码解析 更新完毕
dubbo 源码解析 更新完毕
netty 源码解析 更新完毕
源码解析
org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
kafka自动配置
监测类KafkaTemplate,解析配置文件KafkaProperties,引入KafkaAnnotationDrivenConfiguration BeanDefinition
org.springframework.boot.autoconfigure.kafka.KafkaProperties
配置文件以spring.kafka开头
private List<String> bootstrapServers = new ArrayList<String>( Collections.singletonList("localhost:9092"));
逗号分隔的主机列表:用于建立到Kafka集群的初始连接的端口对。
private final Consumer consumer = new Consumer();
consumer
private final Producer producer = new Producer();
producer
private final Listener listener = new Listener();
listener
private final Template template = new Template();
template
org.springframework.boot.autoconfigure.kafka.KafkaProperties.Consumer
private Integer autoCommitInterval;
如果“enable.auto.commit”为真,则消费者偏移量自动提交到Kafka的频率(以毫秒为单位)。
private String autoOffsetReset;
如果Kafka中没有初始偏移量,或者当前偏移量在服务器上不存在,该怎么办?
private List<String> bootstrapServers;
逗号分隔的主机列表:用于建立到Kafka集群的初始连接的端口对。
private Boolean enableAutoCommit;
如果为真,使用者的偏移量将在后台定期提交。
private Integer fetchMaxWait;
如果没有足够的数据立即满足“fetch.min.bytes”给出的要求,服务器将在响应fetch请求之前阻塞的最长时间(以毫秒为单位)。
private Integer fetchMinSize;
服务器为获取请求返回的最小数据量(以字节为单位)。
private String groupId;
标识此使用者所属的使用者组的唯一字符串。
private Integer heartbeatInterval;
向消费者协调器发送心跳之间的预期时间(以毫秒为单位)。
private Integer maxPollRecords;
最大拉取的记录数
org.springframework.boot.autoconfigure.kafka.KafkaProperties.Producer
private String acks;
在考虑完成请求之前,生产者要求领导者收到的确认的次数。
private Integer batchSize;
默认批大小(以字节为单位)。较小的批大小将使批处理不那么常见,并可能降低吞吐量(批大小为零将完全禁用批处理)。
private List<String> bootstrapServers;
逗号分隔的主机列表:用于建立到Kafka集群的初始连接的端口对。
private Long bufferMemory;
生产者可以用来缓冲等待发送到服务器的记录的总内存字节数。
private Integer retries;
重试次数
org.springframework.boot.autoconfigure.kafka.KafkaProperties.Template
private String defaultTopic;
默认topic
org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener
private AckMode ackMode;
ack模式
private Long pollTimeout;
拉取超时时间
private Integer ackCount;
ack次数
private Long ackTime;
ack时间
org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration
kafka注解驱动支持
监测EnableKafka注解
private final KafkaProperties properties;
kafka配置文件
@Bean @ConditionalOnMissingBean public ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() { ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer(); configurer.setKafkaProperties(this.properties); return configurer; }
没初始化过ConcurrentKafkaListenerContainerFactoryConfigurer初始化
@Bean @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory") public ConcurrentKafkaListenerContainerFactory, ?> kafkaListenerContainerFactory( ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory<Object, Object> kafkaConsumerFactory) { ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>(); configurer.configure(factory, kafkaConsumerFactory); return factory; }
没初始化过kafkaListenerContainerFactory初始化
org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer#configure配置指定的Kafka侦听器容器工厂。可以进一步调优工厂,并覆盖默认设置。
public void configure( ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory, ConsumerFactory<Object, Object> consumerFactory) { listenerContainerFactory.setConsumerFactory(consumerFactory); Listener container = this.properties.getListener(); ContainerProperties containerProperties = listenerContainerFactory .getContainerProperties(); if (container.getAckMode() != null) {// ack模式 containerProperties.setAckMode(container.getAckMode()); } if (container.getAckCount() != null) {// ack次数 containerProperties.setAckCount(container.getAckCount()); } if (container.getAckTime() != null) {// ack时间 containerProperties.setAckTime(container.getAckTime()); } if (container.getPollTimeout() != null) {// 拉取时间 containerProperties.setPollTimeout(container.getPollTimeout()); } if (container.getConcurrency() != null) { listenerContainerFactory.setConcurrency(container.getConcurrency()); } }
@Configuration @EnableKafka @ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME) protected static class EnableKafkaConfiguration { }
开启kafka注解支持,没检测到internalKafkaListenerAnnotationProcessor加载配置
private final KafkaProperties properties;
kafka配置文件
@Bean @ConditionalOnMissingBean(KafkaTemplate.class) public KafkaTemplate, ?> kafkaTemplate( ProducerFactory<Object, Object> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener) { KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<Object, Object>( kafkaProducerFactory); kafkaTemplate.setProducerListener(kafkaProducerListener); kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic()); return kafkaTemplate; }
初始化KafkaTemplate
@Bean @ConditionalOnMissingBean(ProducerListener.class) public ProducerListener<Object, Object> kafkaProducerListener() { return new LoggingProducerListener<Object, Object>(); }
初始化ProducerListener
@Bean @ConditionalOnMissingBean(ConsumerFactory.class) public ConsumerFactory, ?> kafkaConsumerFactory() { return new DefaultKafkaConsumerFactory<Object, Object>( this.properties.buildConsumerProperties()); }
初始化ConsumerFactory
@Bean @ConditionalOnMissingBean(ProducerFactory.class) public ProducerFactory, ?> kafkaProducerFactory() { return new DefaultKafkaProducerFactory<Object, Object>( this.properties.buildProducerProperties()); }
初始化ProducerFactory
说在最后
本次解析仅代表个人观点,仅供参考。
扫码进入技术微信群
钉钉技术群
qq技术群