天天看點

kafka學習 -- 用戶端Consumer API學習

寫在前面

這裡我會總結一下,Springboot 內建 spring-kafka中,consumer 的相關配置,Api

這裡的東西,比 Producer 稍微多一些

內建相關配置

server:
  port: 9000
spring:
  kafka:
    bootstrap-servers: 192.168.1.74:9092
    consumer:
      group-id: group_id
      # 手動送出
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        session.timeout.ms: 60000
    listener:
      type: batch
      log-container-config: false
      concurrency: 3
      # 手動送出
      ack-mode: manual_immediate      

一、@KafkaListener

Spring Kafka 內建Kafka,消息的監聽者配置,使用特别簡單,隻有一個注解即可實作監聽

kafka學習 -- 用戶端Consumer API學習

雖然隻有一個注解配置,需要注意消息的序列化,以及相關監聽處理

二、代碼示例

@KafkaListener(topics = "users", groupId = "group_id")
    public void consume(String message, Acknowledgment acknowledgment) throws IOException {
        try {
            logger.info(String.format("#### -> Consumed message -> %s", message));
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            // 手動送出 offset
            acknowledgment.acknowledge();
        }
    }      
import com.common.Bar2;
import com.common.Foo2;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * 這裡定義了消費者組,和多個 topic 的對應
 *
 * @author Gary Russell
 * @since 5.1
 */
@Component
@KafkaListener(id = "multiGroup", topics = {"foos", "bars", "test"})
public class MultiMethods {

    private final Logger logger = LoggerFactory.getLogger(MultiMethods.class);

    @KafkaHandler
    public void foo(Foo2 foo) {
        System.out.println("Received: " + foo);
    }

    @KafkaHandler
    public void bar(Bar2 bar) {
        System.out.println("Received: " + bar);
    }

    @KafkaHandler(isDefault = true)
    public void unknown(Object object) {
        System.out.println("Received unknown: " + object);
    }


    @KafkaHandler
    public void tests(ConsumerRecord record) {
        logger.info("Received -> key :{} ", record.key());
    }

}      

三、更多Kafka,,用戶端、服務端,監控可參考下文

  • ​​kafka學習 – kafka connect​​
  • ​​kafka學習 – 用戶端Consumer API學習​​
  • ​​kafka學習 – 用戶端Producer API學習​​
  • ​​kafka學習 – 服務端指令學習​​
  • ​​Springboot內建Kafka​​