天天看點

Spring Cloud Stream 整合Kafka

Spring Cloud Stream是一個建構消息驅動微服務的架構,抽象了MQ的使用方式, 提供統一的API操作。Spring Cloud Stream通過

Binder(綁定器)

inputs/outputs

channel完成應用程式和MQ的解耦。Spring Cloud Stream的模型如下圖:
Spring Cloud Stream 整合Kafka

  • Binder

負責綁定應用程式和MQ中間件,即指定應用程式是和KafKa互動還是和RabbitMQ互動或者和其他的MQ中間件互動

  • inputs/outputs channel

inputs/outputs channel抽象釋出訂閱消息的方式,即無論是什麼類型的MQ應用程式都通過統一的方式釋出訂閱消息

Spring Cloud Stream主要配置

  • binder

綁定MQ中間件及配置

  • bindings

管理所有的Topic

  • des1tination

    指定釋出訂閱的Topic
  • contentType

    指定釋出訂閱消息的格式
  • group

    指定消費者組,(一條消息隻能被一組消息者中的一個消息者消費)

示例

高版本的Spring Cloud Stream提供兩種使用方式,一種是使用yml配置的方式綁定

生産/消費

者,另一種是通過

Function

的方式綁定

生産/消費

者。以下代碼為使用

Function

生産/消費

  1. 引入依賴
    <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-kafka</artifactId>
                <version>3.1.0</version>
    </dependency>           
  2. 配置yml
    spring:
      cloud:
        stream:
          kafka:
            binder:
              auto-create-topics: true   # 自動建立topics
              brokers: ***.****.***.***:9092
          bindings:
            logP-out-0:    # 對用在ProducersConfig中的生産函數logP
              destination: log  # logP将資料發送的topic
              contentType: application/json
            logC-in-0:    # 對用在ConsumersConfig中的生産函數logC
              destination: log
              group: log_group
            addAgeC-in-0:
              destination: addAge
              group: addAge_group
          function:
            definition: logP;logC;addAgeC  # 指定對應的函數為Spring Cloud Stream中的生産消費通道           
  3. 編寫生産者

    方式1

    @Configuration
    public class ProducersConfig {
    
        private BlockingQueue<Person> unbounded = new LinkedBlockingQueue<>();
        
        /**
         * 對應yml中配置的logP-out-0通道,即topic log
         * @return java.util.function.Supplier<com.example.kafka.entity.Person>
         * @Date 2020-12-27
         **/
        @Bean
        public Supplier<Person> logP(){
            return () -> unbounded.poll();
        }
    
        /**
         * 調用本方法向log topic發送消息 
         * 
         * @param person: 
         * @return void
         * @Date 2020-12-27
         **/
        public void log(Person person){
            unbounded.offer(person);
        }
        
    }           
    方式2
    @RestController
    public class UserController {
    
        @Autowired
        private StreamBridge streamBridge;
    
        @PostMapping("/addAge")
        public boolean addAge(@RequestBody Person person){
    
            person.setAge(RandomUtil.randomInt(10, 90));
            person.setSuccess(RandomUtil.randomBoolean());
            person.setBirthday(new Date());
    
            // 通過streamBridge直接對應的topic發送消息
            return streamBridge.send("addAge", person);
        }
        
    }           
  4. 編寫消費者
    @Configuration
    public class ConsumersConfig {
    
        /**
         * 對應yml中配置的logC-in-0通道,即topic log。
         * 消費topic log中的消息
         *
         * @return java.util.function.Consumer<com.example.kafka.entity.Person>
         * @Date 2020-12-27
         **/
        @Bean
        public Consumer<Person> logC() {
    
            return person -> {
                System.out.println("Received: " + person);
            };
        }
    
        /**
         * 對應yml中配置的addAgeC-in-0通道,即topic addAge。
         * 消費topic addAge中的消息
         *
         * @return java.util.function.Consumer<com.example.kafka.entity.Person>
         * @Date 2020-12-27
         **/
        @Bean
        public Consumer<Person> addAgeC(){
    
            return person -> {
    
                person.setAge(person.getAge() + 10);
                System.out.println("Consumer addAge: " + person.toString());
            };
        }
    }           
  5. 發送消息
    @RestController
    public class UserController {
    
        @Autowired
        private StreamBridge streamBridge;
    
        @Autowired
        private ProducersConfig producersConfig;
    
        @PostMapping("/log")
        public void log(@RequestBody Person person){
    
            producersConfig.log(person);
        }
    
        @PostMapping("/addAge")
        public boolean addAge(@RequestBody Person person){
    
            person.setAge(RandomUtil.randomInt(10, 90));
            person.setSuccess(RandomUtil.randomBoolean());
            person.setBirthday(new Date());
    
            System.out.println("Producer addAge: " + person.toString());
            return streamBridge.send("addAge", person);
        }
    
    }