天天看点

springboot连接kafka超时处理_springboot源码架构解析KafkaAutoConfiguration

说在前面

前期回顾

sharding-jdbc源码解析 更新完毕

spring源码解析 更新完毕

spring-mvc源码解析 更新完毕

spring-tx源码解析 更新完毕

spring-boot源码解析 更新完毕

rocketmq源码解析 更新完毕

dubbbo源码解析 更新完毕

netty源码解析 更新完毕

spring源码架构更新完毕

spring-mvc源码架构更新完毕

springboot源码架构更新中

github https://github.com/tianheframe

sharding-jdbc源码解析 更新完毕

rocketmq源码解析 更新完毕

seata 源码解析 更新完毕

dubbo 源码解析 更新完毕

netty 源码解析 更新完毕

源码解析

org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration

kafka自动配置

监测类KafkaTemplate,解析配置文件KafkaProperties,引入KafkaAnnotationDrivenConfiguration BeanDefinition

org.springframework.boot.autoconfigure.kafka.KafkaProperties

配置文件以spring.kafka开头

private List<String> bootstrapServers = new ArrayList<String>(      Collections.singletonList("localhost:9092"));
           

逗号分隔的主机列表:用于建立到Kafka集群的初始连接的端口对。

private final Consumer consumer = new Consumer();
           

consumer

private final Producer producer = new Producer();
           

producer

private final Listener listener = new Listener();
           

listener

private final Template template = new Template();
           

template

org.springframework.boot.autoconfigure.kafka.KafkaProperties.Consumer

private Integer autoCommitInterval;
           

如果“enable.auto.commit”为真,则消费者偏移量自动提交到Kafka的频率(以毫秒为单位)。

private String autoOffsetReset;
           

如果Kafka中没有初始偏移量,或者当前偏移量在服务器上不存在,该怎么办?

private List<String> bootstrapServers;
           

逗号分隔的主机列表:用于建立到Kafka集群的初始连接的端口对。

private Boolean enableAutoCommit;
           

如果为真,使用者的偏移量将在后台定期提交。

private Integer fetchMaxWait;
           

如果没有足够的数据立即满足“fetch.min.bytes”给出的要求,服务器将在响应fetch请求之前阻塞的最长时间(以毫秒为单位)。

private Integer fetchMinSize;
           

服务器为获取请求返回的最小数据量(以字节为单位)。

private String groupId;
           

标识此使用者所属的使用者组的唯一字符串。

private Integer heartbeatInterval;
           

向消费者协调器发送心跳之间的预期时间(以毫秒为单位)。

private Integer maxPollRecords;
           

最大拉取的记录数

org.springframework.boot.autoconfigure.kafka.KafkaProperties.Producer

private String acks;
           

在考虑完成请求之前,生产者要求领导者收到的确认的次数。

private Integer batchSize;
           

默认批大小(以字节为单位)。较小的批大小将使批处理不那么常见,并可能降低吞吐量(批大小为零将完全禁用批处理)。

private List<String> bootstrapServers;
           

逗号分隔的主机列表:用于建立到Kafka集群的初始连接的端口对。

private Long bufferMemory;
           

生产者可以用来缓冲等待发送到服务器的记录的总内存字节数。

private Integer retries;
           

重试次数

org.springframework.boot.autoconfigure.kafka.KafkaProperties.Template

private String defaultTopic;
           

默认topic

org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener

private AckMode ackMode;
           

ack模式

private Long pollTimeout;
           

拉取超时时间

private Integer ackCount;
           

ack次数

private Long ackTime;
           

ack时间

org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration

kafka注解驱动支持

监测EnableKafka注解

private final KafkaProperties properties;
           

kafka配置文件

@Bean  @ConditionalOnMissingBean  public ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {    ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();    configurer.setKafkaProperties(this.properties);    return configurer;  }
           

没初始化过ConcurrentKafkaListenerContainerFactoryConfigurer初始化

@Bean  @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")  public ConcurrentKafkaListenerContainerFactory, ?> kafkaListenerContainerFactory(      ConcurrentKafkaListenerContainerFactoryConfigurer configurer,      ConsumerFactory<Object, Object> kafkaConsumerFactory) {    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();    configurer.configure(factory, kafkaConsumerFactory);    return factory;  }
           

没初始化过kafkaListenerContainerFactory初始化

org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer#configure配置指定的Kafka侦听器容器工厂。可以进一步调优工厂,并覆盖默认设置。

public void configure(      ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory,      ConsumerFactory<Object, Object> consumerFactory) {    listenerContainerFactory.setConsumerFactory(consumerFactory);    Listener container = this.properties.getListener();    ContainerProperties containerProperties = listenerContainerFactory        .getContainerProperties();    if (container.getAckMode() != null) {//      ack模式      containerProperties.setAckMode(container.getAckMode());    }    if (container.getAckCount() != null) {//      ack次数      containerProperties.setAckCount(container.getAckCount());    }    if (container.getAckTime() != null) {//      ack时间      containerProperties.setAckTime(container.getAckTime());    }    if (container.getPollTimeout() != null) {//      拉取时间      containerProperties.setPollTimeout(container.getPollTimeout());    }    if (container.getConcurrency() != null) {      listenerContainerFactory.setConcurrency(container.getConcurrency());    }  }
           
@Configuration  @EnableKafka  @ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)  protected static class EnableKafkaConfiguration {  }
           

开启kafka注解支持,没检测到internalKafkaListenerAnnotationProcessor加载配置

private final KafkaProperties properties;
           

kafka配置文件

@Bean  @ConditionalOnMissingBean(KafkaTemplate.class)  public KafkaTemplate, ?> kafkaTemplate(      ProducerFactory<Object, Object> kafkaProducerFactory,      ProducerListener<Object, Object> kafkaProducerListener) {    KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<Object, Object>(        kafkaProducerFactory);    kafkaTemplate.setProducerListener(kafkaProducerListener);    kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());    return kafkaTemplate;  }
           

初始化KafkaTemplate

@Bean  @ConditionalOnMissingBean(ProducerListener.class)  public ProducerListener<Object, Object> kafkaProducerListener() {    return new LoggingProducerListener<Object, Object>();  }
           

初始化ProducerListener

@Bean  @ConditionalOnMissingBean(ConsumerFactory.class)  public ConsumerFactory, ?> kafkaConsumerFactory() {    return new DefaultKafkaConsumerFactory<Object, Object>(        this.properties.buildConsumerProperties());  }
           

初始化ConsumerFactory

@Bean  @ConditionalOnMissingBean(ProducerFactory.class)  public ProducerFactory, ?> kafkaProducerFactory() {    return new DefaultKafkaProducerFactory<Object, Object>(        this.properties.buildProducerProperties());  }
           

初始化ProducerFactory

说在最后

本次解析仅代表个人观点,仅供参考。

springboot连接kafka超时处理_springboot源码架构解析KafkaAutoConfiguration

扫码进入技术微信群

springboot连接kafka超时处理_springboot源码架构解析KafkaAutoConfiguration
springboot连接kafka超时处理_springboot源码架构解析KafkaAutoConfiguration
springboot连接kafka超时处理_springboot源码架构解析KafkaAutoConfiguration

钉钉技术群

springboot连接kafka超时处理_springboot源码架构解析KafkaAutoConfiguration

qq技术群

springboot连接kafka超时处理_springboot源码架构解析KafkaAutoConfiguration