天天看點

Spring Cloud Stream同一通道根據消息内容分發不同的消費邏輯

應用場景

有的時候,我們對于同一通道中的消息處理,會通過判斷頭資訊或者消息内容來做一些差異化處理,比如:可能在消息頭資訊中帶入消息版本号,然後通過if判斷來執行不同的處理邏輯,其代碼結構可能是這樣的:

@StreamListener(value = TestTopic.INPUT)
public void receiveV1(String payload, @Header("version") String version) {
    if("1.0".equals(version)) {
        // Version 1.0
    }
    if("2.0".equals(version)) {
        // Version 2.0
    }
}      

那麼當消息處理邏輯複雜的時候,這段邏輯就會變得特别複雜。針對這個問題,在

@StreamListener

注解中提供了一個不錯的屬性

condition

,可以用來優化這樣的處理結構。

https://blog.didispace.com/spring-cloud-starter-finchley-7-6/#%E5%8A%A8%E6%89%8B%E8%AF%95%E8%AF%95 動手試試

下面通過編寫一個簡單的例子來具體體會一下這個屬性的用法:

@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {

    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }

    @RestController
    static class TestController {

        @Autowired
        private TestTopic testTopic;

        /**
         * 消息生産接口
         *
         * @param message
         * @return
         */
        @GetMapping("/sendMessage")
        public String messageWithMQ(@RequestParam String message) {
            testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "1.0").build());
            testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "2.0").build());
            return "ok";
        }

    }

    /**
     * 消息消費邏輯
     */
    @Slf4j
    @Component
    static class TestListener {

        @StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='1.0'")
        public void receiveV1(String payload, @Header("version") String version) {
            log.info("Received v1 : " + payload + ", " + version);
        }

        @StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='2.0'")
        public void receiveV2(String payload, @Header("version") String version) {
            log.info("Received v2 : " + payload + ", " + version);
        }

    }

    interface TestTopic {

        String OUTPUT = "example-topic-output";
        String INPUT = "example-topic-input";

        @Output(OUTPUT)
        MessageChannel output();

        @Input(INPUT)
        SubscribableChannel input();

    }

}      

内容很簡單,既包含了消息的生産,也包含了消息消費。在

/sendMessage

接口的定義中,發送了兩條消息,一條消息的頭資訊中包含version=1.0,另外一條消息的頭資訊中包含version=2.0。在消息監聽類

TestListener

中,對

TestTopic.INPUT

通道定義了兩個

@StreamListener

,這兩個監聽邏輯有不同的condition,這裡的表達式表示會根據消息頭資訊中的

version

值來做不同的處理邏輯分發。

在啟動應用之前,還要記得配置一下輸入輸出通道對應的實體目标(exchange或topic名),比如:

spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-input.group=stream-content-route
spring.cloud.stream.bindings.example-topic-output.destination=test-topic      

完成了上面配置之後,就可以啟動應用,并嘗試通路

localhost:8080/sendMessage?message=hello

接口來發送一個消息到MQ中了。此時可以看到類似下面的日志:

2018-12-24 15:50:33.361  INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener  : Received v1 : hello, 1.0
2018-12-24 15:50:33.363  INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener  : Received v2 : hello, 2.0      

從日志中可以看到,兩條帶有不同頭資訊的消息,分别通過不同的監聽處理邏輯輸出了對應的日志列印。

https://blog.didispace.com/spring-cloud-starter-finchley-7-6/#%E4%BB%A3%E7%A0%81%E7%A4%BA%E4%BE%8B 代碼示例

本文示例讀者可以通過檢視下面倉庫的中的

stream-content-route

項目:

如果您對這些感興趣,歡迎star、follow、收藏、轉發給予支援!

https://blog.didispace.com/spring-cloud-starter-finchley-7-6/#%E4%BB%A5%E4%B8%8B%E4%B8%93%E9%A2%98%E6%95%99%E7%A8%8B%E4%B9%9F%E8%AE%B8%E6%82%A8%E4%BC%9A%E6%9C%89%E5%85%B4%E8%B6%A3 以下專題教程也許您會有興趣