天天看點

Spring Cloud Stream消費失敗後的處理政策(二):自定義錯誤處理邏輯

應用場景

上一篇

《Spring Cloud Stream消費失敗後的處理政策(一):自動重試》

介紹了預設就會生效的消息重試功能。對于一些因環境原因、網絡抖動等不穩定因素引發的問題可以起到比較好的作用。但是對于諸如代碼本身存在的邏輯錯誤等,無論重試多少次都不可能成功的問題,是無法修複的。對于這樣的情況,前文中說了可以利用日志記錄消息内容,配合告警來做補救,但是很顯然,這樣做非常原始,并且太過笨拙,處理複雜度過高。是以,我們需要需求更好的辦法,本文将介紹針對該類問題的一種處理方法:自定義錯誤處理邏輯。

https://blog.didispace.com/spring-cloud-starter-finchley-7-3/#%E5%8A%A8%E6%89%8B%E8%AF%95%E8%AF%95 動手試試

準備一個會消費失敗的例子,可以直接沿用前文的工程,也可以建立一個,然後建立如下代碼的邏輯:

@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {

    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }

    @RestController
    static class TestController {

        @Autowired
        private TestTopic testTopic;

        /**
         * 消息生産接口
         *
         * @param message
         * @return
         */
        @GetMapping("/sendMessage")
        public String messageWithMQ(@RequestParam String message) {
            testTopic.output().send(MessageBuilder.withPayload(message).build());
            return "ok";
        }

    }

    /**
     * 消息消費邏輯
     */
    @Slf4j
    @Component
    static class TestListener {

        @StreamListener(TestTopic.INPUT)
        public void receive(String payload) {
            log.info("Received payload : " + payload);
            throw new RuntimeException("Message consumer failed!");
        }

    }

    interface TestTopic {

        String OUTPUT = "example-topic-output";
        String INPUT = "example-topic-input";

        @Output(OUTPUT)
        MessageChannel output();

        @Input(INPUT)
        SubscribableChannel input();

    }

}      

内容很簡單,既包含了消息的生産,也包含了消息消費。消息消費的時候主動抛出了一個異常來模拟消息的消費失敗。

在啟動應用之前,還要記得配置一下輸入輸出通道對應的實體目标(exchange或topic名)、并設定一下分組,比如:

spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-input.group=stream-exception-handler
spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1

spring.cloud.stream.bindings.example-topic-output.destination=test-topic      

完成了上面配置之後,啟動應用并通路

localhost:8080/sendMessage?message=hello

接口來發送一個消息到MQ中了,此時可以看到消費失敗後抛出了異常,跟上一篇文章的結果一樣,消息消費失敗,記錄了日志,消息資訊丢棄。

下面,針對消息消費失敗,在

TestListener

中針對消息消費邏輯建立一段錯誤處理邏輯,比如:

@Slf4j
@Component
static class TestListener {

    @StreamListener(TestTopic.INPUT)
    public void receive(String payload) {
        log.info("Received payload : " + payload);
        throw new RuntimeException("Message consumer failed!");
    }

    /**
     * 消息消費失敗的降級處理邏輯
     *
     * @param message
     */
    @ServiceActivator(inputChannel = "test-topic.stream-exception-handler.errors")
    public void error(Message<?> message) {
        log.info("Message consumer failed, call fallback!");
    }

}      

通過使用

@ServiceActivator(inputChannel = "test-topic.stream-exception-handler.errors")

指定了某個通道的錯誤處理映射。其中,inputChannel的配置中對應關系如下:

  • test-topic

    :消息通道對應的目标(destination,即:

    spring.cloud.stream.bindings.example-topic-input.destination

    的配置)
  • stream-exception-handler

    :消息通道對應的消費組(group,即:

    spring.cloud.stream.bindings.example-topic-input.group

再啟動應用并通路

localhost:8080/sendMessage?message=hello

接口來發送一個消息到MQ中,此時可以看到日志如下:

2018-12-11 12:00:35.500  INFO 75269 --- [ctor-http-nio-3] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2018-12-11 12:00:35.512  INFO 75269 --- [ctor-http-nio-3] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory.publisher#311db1cb:0/SimpleConnection@40370d8c [delegate=amqp://[email protected]:5672/, localPort= 54391]
2018-12-11 12:00:35.527  INFO 75269 --- [ption-handler-1] c.d.stream.TestApplication$TestListener  : Received: hello,
2018-12-11 12:00:38.541  INFO 75269 --- [ption-handler-1] c.d.stream.TestApplication$TestListener  : Message consumer failed, call fallback!      

雖然消費邏輯中輸出了消息内容之後抛出了異常,但是會進入到error函數中,執行錯誤處理邏輯(這裡隻是答應了一句話),使用者可以根據需要讀取消息内容以及異常詳情做更進一步的細化處理。

https://blog.didispace.com/spring-cloud-starter-finchley-7-3/#%E6%B7%B1%E5%85%A5%E6%80%9D%E8%80%83 深入思考

由于error邏輯是通過編碼方式來實作的,是以這段邏輯相對來說比較死。通常,隻有業務上有明确的錯誤處理邏輯的時候,這種方法才可以比較好的被應用到。不然能做的可能也隻是将消息記錄下來,然後具體的分析原因後再去做補救措施。是以這種方法也不是萬能的,主要适用于有明确錯誤處理方案的方式來使用(這種場景并不多),另外。。。

注意:有坑! 這個方案在目前版本(2.0.x)其實還有一個坑,這種方式并不能很好的處理異常消息,會有部分消息得不到正确的處理,由于應用場景也不多,是以目前不推薦使用這種方法來做(完全可以用原始的異常捕獲機制來處理,隻是沒有這種方式那麼優雅)。目前看官方issue是在Spring Cloud Stream的2.1.0版本中會修複,後續釋出之後可以使用該功能,具體點選檢視: Issue #1357

而對于沒有特定的錯誤處理方案的,也隻能通過記錄和後續處理來解決,可能這樣的方式也隻是比從日志中抓去簡單那麼一些,并沒有得到很大的提升。但是,不要緊,因為下一篇我們将繼續介紹其他更好的處理方案。

https://blog.didispace.com/spring-cloud-starter-finchley-7-3/#%E4%BB%A3%E7%A0%81%E7%A4%BA%E4%BE%8B 代碼示例

本文示例讀者可以通過檢視下面倉庫的中的

stream-exception-handler-2

項目:

如果您對這些感興趣,歡迎star、follow、收藏、轉發給予支援!

https://blog.didispace.com/spring-cloud-starter-finchley-7-3/#%E4%BB%A5%E4%B8%8B%E4%B8%93%E9%A2%98%E6%95%99%E7%A8%8B%E4%B9%9F%E8%AE%B8%E6%82%A8%E4%BC%9A%E6%9C%89%E5%85%B4%E8%B6%A3 以下專題教程也許您會有興趣