在之前的章節中,所有消費者和生産者均通過@EnableBinding定義,此方式能夠快速的建構生産消費關系,但仔細想想,如果我們需要根據一定的條件決策消息生産者将消息發往哪個通道,貌似目前簡單粗暴的方式無法滿足。如此常見的場景,springcloud必然會幫我們想到,通過BinderAwareChannelResolver的bean執行個體即可實作動态通道的選擇,其會伴随@EnableBinding注解自動完成注冊。
本章概要
1、BinderAwareChannelResolver的應用
2、ExpressionEvaluatingRouter的應用
BinderAwareChannelResolver的應用
- 首先來看BinderAwareChannelResolver的直接應用,為了友善場景模拟,采用一個rest api方式觸發消息的生産發送。
消費者Receiver工程改造
1、在MySink中添加如下兩個動态接收通道,dynamic1-channel與dynamic1-channel
package com.cloud.shf.stream.sink;
public interface MySink {
/*********************************動态通道選擇示例******************************/
String DYNAMIC1_CHANNEL = "dynamic1-channel";
String DYNAMIC2_CHANNEL = "dynamic2-channel";
@Input(DYNAMIC1_CHANNEL)
SubscribableChannel dynamic1Input();
@Input(DYNAMIC2_CHANNEL)
SubscribableChannel dynamic2Input();
}
2、在SinkReceiver.class中添加對上述兩個通道的監聽,并列印接收内容
/*********************************動态通道選擇示例******************************/
@StreamListener(value = MySink.DYNAMIC1_CHANNEL)
public void dynamic1Receiver(@Payload User user) {
LOGGER.info("Received-{} from {} channel age: {}", active, MySink.DYNAMIC1_CHANNEL, user.getAge());
}
@StreamListener(value = MySink.DYNAMIC2_CHANNEL)
public void dynamic2Receiver(@Payload User user) {
LOGGER.info("Received-{} from {} channel age: {}", active, MySink.DYNAMIC2_CHANNEL, user.getAge());
}
生産者Sender工程改造
添加一個DynamicDestinationController類,提供一個rest-api協助進行場景模拟
package com.cloud.shf.stream.controller;
@EnableBinding
@Controller
public class DynamicDestinationController {
@Autowired
private BinderAwareChannelResolver resolver;
/************************************方式一************************************/
@RequestMapping(path = "/{dest}", method = RequestMethod.POST, consumes = "*/*")
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@PathVariable("dest") String dest,
@RequestBody String body,
@RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
sendMessage(body, dest, contentType);
}
private void sendMessage(String body, String dest, Object contentType) {
resolver.resolveDestination(dest).send(MessageBuilder.createMessage(body,
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
}
}
Note
- 直接注入BinderAwareChannelResolver的bean執行個體即可
- 通過PathVariable屬性dest值模拟通道名稱
- boby作為消息體
- contentType作為消息的頭資訊
服務驗證
1、啟動receiver、sender兩個工程;
2、多次通過curl請求api如下(curl -H "Content-Type: application/json" -X POST -d "{\"username\":\"song\",\"age\":12}" http://localhost:9000/dynamic*-channel)
此時可以看到receiver工程的控制台列印如下
其列印的來源通道與請求中的占位符完全比對,繼續觀察sender控制台日志,由于原來并沒有在sernder中定義相關通道描述,故首次觸發指定通道即可看到如下日志記錄:
小節,由此可以看到,根據占位符dest動态路由成功,準确的被發送至預期的消息通道。實際應用中,如果我們預先知道可能的動态路由通道名稱,則可以通過spring.cloud.stream.dynamicDestinations配置白名單,隻有預設定的通道名稱方會被動态綁定,避免建立大量無效的通道資訊,浪費資源。
ExpressionEvaluatingRouter的應用
- 通過下圖可以BinderAwareChannelResolver類的定義
其實作了Spring Integration的DestinationResolver接口,并且BinderAwareChannelResolver執行個體可以被注入在其他的components執行個體中,本小節将實作将BinderAwareChannelResolver執行個體注入在ExpressionEvaluatingRouter中實作消息通道的動态綁定。
1、在DynamicDestinationController類中添加如下實作
/************************************方式二************************************/
@RequestMapping(path = "/", method = RequestMethod.POST, consumes = "application/json")
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@RequestBody User body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType, @RequestHeader(name = "dest", required = false) String dest) {
Map<String, Object> headers = new HashMap<>(2);
headers.put(MessageHeaders.CONTENT_TYPE, contentType);
if (!StringUtils.isEmpty(dest)) {
headers.put("dest", dest);
}
sendMessage(body, headers);
}
private void sendMessage(User body, Map<String, Object> headers) {
routerChannel().send(MessageBuilder.createMessage(body,
new MessageHeaders(headers)));
}
@Bean(name = "router-channel")
public MessageChannel routerChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "router-channel")
public ExpressionEvaluatingRouter router() {
ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("headers[dest]"));
//作用于通過spel表達式沒有擷取到對應的通道資訊
router.setDefaultOutputChannelName("dynamic1-channel");
router.setChannelResolver(resolver);
return router;
}
Note
- 通過頭資訊中的dest屬性作為動态綁定的依據;如果未設定dest則采用預設dynamic1-channel作為消息通道
- 通過Spel表達式擷取頭資訊中的dest屬性值(headers[dest])
- 将BinderAwareChannelResolver注入至ExpressionEvaluatingRouter執行個體中
- 其中org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor實作了對Message的處理,故可以通過此處看到我們消息包含的消息體、消息體具體資訊,進而更好的編寫Spel表達式,主要代碼如下:
2、再次通過curl多次請求如下(curl -H "Content-Type: application/json" -H "dest:dynamic1-channel" -X POST -d "{\"username\":\"song\",\"age\":12}" http://localhost:9000/)
receiver工程控制台如下: