天天看点

Spring Cloud Stream 整合Kafka

Spring Cloud Stream是一个构建消息驱动微服务的框架,抽象了MQ的使用方式, 提供统一的API操作。Spring Cloud Stream通过

Binder(绑定器)

inputs/outputs

channel完成应用程序和MQ的解耦。Spring Cloud Stream的模型如下图:
Spring Cloud Stream 整合Kafka

  • Binder

负责绑定应用程序和MQ中间件,即指定应用程序是和KafKa交互还是和RabbitMQ交互或者和其他的MQ中间件交互

  • inputs/outputs channel

inputs/outputs channel抽象发布订阅消息的方式,即无论是什么类型的MQ应用程序都通过统一的方式发布订阅消息

Spring Cloud Stream主要配置

  • binder

绑定MQ中间件及配置

  • bindings

管理所有的Topic

  • des1tination

    指定发布订阅的Topic
  • contentType

    指定发布订阅消息的格式
  • group

    指定消费者组,(一条消息只能被一组消息者中的一个消息者消费)

示例

高版本的Spring Cloud Stream提供两种使用方式,一种是使用yml配置的方式绑定

生产/消费

者,另一种是通过

Function

的方式绑定

生产/消费

者。以下代码为使用

Function

生产/消费

  1. 引入依赖
    <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-kafka</artifactId>
                <version>3.1.0</version>
    </dependency>           
  2. 配置yml
    spring:
      cloud:
        stream:
          kafka:
            binder:
              auto-create-topics: true   # 自动创建topics
              brokers: ***.****.***.***:9092
          bindings:
            logP-out-0:    # 对用在ProducersConfig中的生产函数logP
              destination: log  # logP将数据发送的topic
              contentType: application/json
            logC-in-0:    # 对用在ConsumersConfig中的生产函数logC
              destination: log
              group: log_group
            addAgeC-in-0:
              destination: addAge
              group: addAge_group
          function:
            definition: logP;logC;addAgeC  # 指定对应的函数为Spring Cloud Stream中的生产消费通道           
  3. 编写生产者

    方式1

    @Configuration
    public class ProducersConfig {
    
        private BlockingQueue<Person> unbounded = new LinkedBlockingQueue<>();
        
        /**
         * 对应yml中配置的logP-out-0通道,即topic log
         * @return java.util.function.Supplier<com.example.kafka.entity.Person>
         * @Date 2020-12-27
         **/
        @Bean
        public Supplier<Person> logP(){
            return () -> unbounded.poll();
        }
    
        /**
         * 调用本方法向log topic发送消息 
         * 
         * @param person: 
         * @return void
         * @Date 2020-12-27
         **/
        public void log(Person person){
            unbounded.offer(person);
        }
        
    }           
    方式2
    @RestController
    public class UserController {
    
        @Autowired
        private StreamBridge streamBridge;
    
        @PostMapping("/addAge")
        public boolean addAge(@RequestBody Person person){
    
            person.setAge(RandomUtil.randomInt(10, 90));
            person.setSuccess(RandomUtil.randomBoolean());
            person.setBirthday(new Date());
    
            // 通过streamBridge直接对应的topic发送消息
            return streamBridge.send("addAge", person);
        }
        
    }           
  4. 编写消费者
    @Configuration
    public class ConsumersConfig {
    
        /**
         * 对应yml中配置的logC-in-0通道,即topic log。
         * 消费topic log中的消息
         *
         * @return java.util.function.Consumer<com.example.kafka.entity.Person>
         * @Date 2020-12-27
         **/
        @Bean
        public Consumer<Person> logC() {
    
            return person -> {
                System.out.println("Received: " + person);
            };
        }
    
        /**
         * 对应yml中配置的addAgeC-in-0通道,即topic addAge。
         * 消费topic addAge中的消息
         *
         * @return java.util.function.Consumer<com.example.kafka.entity.Person>
         * @Date 2020-12-27
         **/
        @Bean
        public Consumer<Person> addAgeC(){
    
            return person -> {
    
                person.setAge(person.getAge() + 10);
                System.out.println("Consumer addAge: " + person.toString());
            };
        }
    }           
  5. 发送消息
    @RestController
    public class UserController {
    
        @Autowired
        private StreamBridge streamBridge;
    
        @Autowired
        private ProducersConfig producersConfig;
    
        @PostMapping("/log")
        public void log(@RequestBody Person person){
    
            producersConfig.log(person);
        }
    
        @PostMapping("/addAge")
        public boolean addAge(@RequestBody Person person){
    
            person.setAge(RandomUtil.randomInt(10, 90));
            person.setSuccess(RandomUtil.randomBoolean());
            person.setBirthday(new Date());
    
            System.out.println("Producer addAge: " + person.toString());
            return streamBridge.send("addAge", person);
        }
    
    }