天天看點

SpringCloud - Stream 動态綁定消息通道

在之前的章節中,所有消費者和生産者均通過@EnableBinding定義,此方式能夠快速的建構生産消費關系,但仔細想想,如果我們需要根據一定的條件決策消息生産者将消息發往哪個通道,貌似目前簡單粗暴的方式無法滿足。如此常見的場景,springcloud必然會幫我們想到,通過BinderAwareChannelResolver的bean執行個體即可實作動态通道的選擇,其會伴随@EnableBinding注解自動完成注冊。

本章概要

1、BinderAwareChannelResolver的應用

2、ExpressionEvaluatingRouter的應用

BinderAwareChannelResolver的應用

  • 首先來看BinderAwareChannelResolver的直接應用,為了友善場景模拟,采用一個rest api方式觸發消息的生産發送。

消費者Receiver工程改造

1、在MySink中添加如下兩個動态接收通道,dynamic1-channel與dynamic1-channel

package com.cloud.shf.stream.sink;
public interface MySink {
    /*********************************動态通道選擇示例******************************/
    String DYNAMIC1_CHANNEL = "dynamic1-channel";
    String DYNAMIC2_CHANNEL = "dynamic2-channel";
 
    @Input(DYNAMIC1_CHANNEL)
    SubscribableChannel dynamic1Input();
 
    @Input(DYNAMIC2_CHANNEL)
    SubscribableChannel dynamic2Input();
}      

2、在SinkReceiver.class中添加對上述兩個通道的監聽,并列印接收内容

/*********************************動态通道選擇示例******************************/
@StreamListener(value = MySink.DYNAMIC1_CHANNEL)
public void dynamic1Receiver(@Payload User user) {
    LOGGER.info("Received-{} from {} channel age: {}", active, MySink.DYNAMIC1_CHANNEL, user.getAge());
}
 
@StreamListener(value = MySink.DYNAMIC2_CHANNEL)
public void dynamic2Receiver(@Payload User user) {
    LOGGER.info("Received-{} from {} channel age: {}", active, MySink.DYNAMIC2_CHANNEL, user.getAge());
}      

生産者Sender工程改造

添加一個DynamicDestinationController類,提供一個rest-api協助進行場景模拟

package com.cloud.shf.stream.controller;
@EnableBinding
@Controller
public class DynamicDestinationController {
 
    @Autowired
    private BinderAwareChannelResolver resolver;
 
    /************************************方式一************************************/
    @RequestMapping(path = "/{dest}", method = RequestMethod.POST, consumes = "*/*")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest(@PathVariable("dest") String dest,
                              @RequestBody String body,
                              @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
        sendMessage(body, dest, contentType);
    }
 
    private void sendMessage(String body, String dest, Object contentType) {
        resolver.resolveDestination(dest).send(MessageBuilder.createMessage(body,
                new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
    }
}      

Note

  • 直接注入BinderAwareChannelResolver的bean執行個體即可
  • 通過PathVariable屬性dest值模拟通道名稱
  • boby作為消息體
  • contentType作為消息的頭資訊

服務驗證

1、啟動receiver、sender兩個工程;

2、多次通過curl請求api如下(curl -H "Content-Type: application/json" -X POST -d "{\"username\":\"song\",\"age\":12}" http://localhost:9000/dynamic*-channel)

SpringCloud - Stream 動态綁定消息通道

此時可以看到receiver工程的控制台列印如下

SpringCloud - Stream 動态綁定消息通道

其列印的來源通道與請求中的占位符完全比對,繼續觀察sender控制台日志,由于原來并沒有在sernder中定義相關通道描述,故首次觸發指定通道即可看到如下日志記錄:

SpringCloud - Stream 動态綁定消息通道

小節,由此可以看到,根據占位符dest動态路由成功,準确的被發送至預期的消息通道。實際應用中,如果我們預先知道可能的動态路由通道名稱,則可以通過spring.cloud.stream.dynamicDestinations配置白名單,隻有預設定的通道名稱方會被動态綁定,避免建立大量無效的通道資訊,浪費資源。

ExpressionEvaluatingRouter的應用

  • 通過下圖可以BinderAwareChannelResolver類的定義
SpringCloud - Stream 動态綁定消息通道

其實作了Spring Integration的DestinationResolver接口,并且BinderAwareChannelResolver執行個體可以被注入在其他的components執行個體中,本小節将實作将BinderAwareChannelResolver執行個體注入在ExpressionEvaluatingRouter中實作消息通道的動态綁定。

1、在DynamicDestinationController類中添加如下實作

/************************************方式二************************************/
@RequestMapping(path = "/", method = RequestMethod.POST, consumes = "application/json")
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@RequestBody User body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType, @RequestHeader(name = "dest", required = false) String dest) {
    Map<String, Object> headers = new HashMap<>(2);
    headers.put(MessageHeaders.CONTENT_TYPE, contentType);
    if (!StringUtils.isEmpty(dest)) {
        headers.put("dest", dest);
    }
    sendMessage(body, headers);
}
 
private void sendMessage(User body, Map<String, Object> headers) {
    routerChannel().send(MessageBuilder.createMessage(body,
            new MessageHeaders(headers)));
}
 
@Bean(name = "router-channel")
public MessageChannel routerChannel() {
    return new DirectChannel();
}
 
@Bean
@ServiceActivator(inputChannel = "router-channel")
public ExpressionEvaluatingRouter router() {
    ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("headers[dest]"));
    //作用于通過spel表達式沒有擷取到對應的通道資訊
    router.setDefaultOutputChannelName("dynamic1-channel");
    router.setChannelResolver(resolver);
    return router;
}      

Note

  • 通過頭資訊中的dest屬性作為動态綁定的依據;如果未設定dest則采用預設dynamic1-channel作為消息通道
  • 通過Spel表達式擷取頭資訊中的dest屬性值(headers[dest])
  • 将BinderAwareChannelResolver注入至ExpressionEvaluatingRouter執行個體中
  • 其中org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor實作了對Message的處理,故可以通過此處看到我們消息包含的消息體、消息體具體資訊,進而更好的編寫Spel表達式,主要代碼如下:
    SpringCloud - Stream 動态綁定消息通道

2、再次通過curl多次請求如下(curl -H "Content-Type: application/json" -H "dest:dynamic1-channel" -X POST -d "{\"username\":\"song\",\"age\":12}" http://localhost:9000/)

SpringCloud - Stream 動态綁定消息通道

receiver工程控制台如下:

繼續閱讀