2019 年 1 月,孵化 6 個月的 RocketMQ-Spring 作為 Apache RocketMQ 的子項目正式畢業,釋出了第一個 Release 版本 2.0.1。該項目是把 RocketMQ 的用戶端使用 Spring Boot 的方式進行了封裝,可以讓使用者通過簡單的 annotation 和标準的 Spring Messaging API 編寫代碼來進行消息的發送和消費。當時 RocketMQ 社群同學請 Spring 社群的同學對 RocketMQ-Spring 代碼進行 review,引出一段 羅美琪(RocketMQ)和春波特(Spring Boot)的故事
。
時隔兩年,RocketMQ-Spring 正式釋出 2.2.0。在這期間,RocketMQ-Spring 疊代了數個版本,以 RocketMQ-Spring 為基礎實作的 Spring Cloud Stream RocketMQ Binder、Spring Cloud Bus RocketMQ 登上了
Spring 的官網 ,Spring 布道師 baeldung 向國外同學介紹 如何使用 RocketMQ-Spring ,越來越多國内外的同學開始使用 RocketMQ-Spring 收發消息,RocketMQ-Spring 倉庫的 star 數也在短短兩年時間内超越了 Spring-Kafka 和 Spring-AMQP(注:兩者均由 Spring 社群維護),成為 Apache RocketMQ 最受歡迎的生态項目之一。
遵循 Spring Messaging API 規範
Spring Messaging 提供了一套抽象的 API,對消息發送端和消息接收端的模式進行規定,不同的消息中間件提供商可以在這個模式下提供自己的 Spring 實作:在消息發送端需要實作的是一個 XXXTemplate 形式的 Java Bean,結合 Spring Boot 的自動化配置選項提供多個不同的發送消息方法;在消息的消費端是一個 XXXMessageListener 接口(實作方式通常會使用一個注解來聲明一個消息驅動的 POJO),提供回調方法來監聽和消費消息,這個接口同樣可以使用 Spring Boot 的自動化選項和一些定制化的屬性。
1. 發送端
RocketMQ-Spring 在遵循 Spring Messaging API 規範的基礎上結合 RocketMQ 自身的功能特點提供了相應的 API。在消息的發送端,RocketMQ-Spring 通過實作 RocketMQTemplate 完成消息的發送。如下圖所示,RocketMQTemplate 繼承 AbstractMessageSendingTemplate 抽象類,來支援 Spring Messaging API 标準的消息轉換和發送方法,這些方法最終會代理給 doSend 方法,doSend 方法會最終調用 syncSend,由 DefaultMQProducer 實作。
除 Spring Messaging API 規範中的方法,RocketMQTemplate 還實作了 RocketMQ 原生用戶端的一些方法,來支援更加豐富的消息類型。值得注意的是,相比于原生用戶端需要自己去建構 RocketMQ Message(比如将對象序列化成 byte 數組放入 Message 對象),RocketMQTemplate 可以直接将對象、字元串或者 byte 數組作為參數發送出去(對象序列化操作由 RocketMQ-Spring 内置完成),在消費端約定好對應的 Schema 即可正常收發。
RocketMQTemplate Send API:
SendResult syncSend(String destination, Object payload)
SendResult syncSend(String destination, Message<?> message)
void asyncSend(String destination, Message<?> message, SendCallback sendCallback)
void asyncSend(String destination, Message<?> message, SendCallback sendCallback)
……
2. 消費端
在消費端,需要實作一個包含 @RocketMQMessageListener 注解的類(需要實作 RocketMQListener 接口,并實作 onMessage 方法,在注解中進行 topic、consumerGroup 等屬性配置),這個 Listener 會一對一的被放置到 DefaultRocketMQListenerContainer 容器對象中,容器對象會根據消費的方式(并發或順序),将 RocketMQListener 封裝到具體的 RocketMQ 内部的并發或者順序接口實作。在容器中建立 RocketMQ DefaultPushConsumer 對象,啟動并監聽定制的 Topic 消息,完成約定 Schema 對象的轉換,回調到 Listener 的 onMessage 方法。
@Service
@RocketMQMessageListener(topic = "${demo.rocketmq.topic}", consumerGroup = "string_consumer", selectorExpression = "${demo.rocketmq.tag}")public class StringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.printf("------- StringConsumer received: %s \n", message); }
}
除此 Push 接口之外,在最新的 2.2.0 版本中,RocketMQ-Spring 實作了 RocketMQ Lite Pull Consumer。通過在配置檔案中進行 consumer 的配置,利用 RocketMQTemplate 的 Recevie 方法即可主動 Pull 消息。
配置檔案resource/application.properties:
rocketmq.name-server=localhost:9876
rocketmq.consumer.group=my-group1
rocketmq.consumer.topic=test
Pull Consumer代碼:
while(!isStop) {
List<String> messages = rocketMQTemplate.receive(String.class);
System.out.println(messages);
}
豐富的消息類型
RocketMQ Spring 消息類型支援方面與 RocketMQ 原生用戶端完全對齊,包括同步/異步/one-way、順序、延遲、批量、事務以及 Request-Reply 消息。在這裡,主要介紹較為特殊的事務消息和 request-reply 消息。
1. 事務消息
RocketMQ 的事務消息不同于 Spring Messaging 中的事務消息,依然采用 RocketMQ 原生事務消息的方案。如下所示,發送事務消息時需要實作一個包含 @RocketMQTransactionListener 注解的類,并實作 executeLocalTransaction 和 checkLocalTransaction 方法,進而來完成執行本地事務以及檢查本地事務執行結果。
// Build a SpringMessage for sending in transaction
Message msg = MessageBuilder.withPayload(..)...;
// In sendMessageInTransaction(), the first parameter transaction name ("test")
// must be same with the @RocketMQTransactionListener's member field 'transName'
rocketMQTemplate.sendMessageInTransaction("test-topic", msg, null);
// Define transaction listener with the annotation @RocketMQTransactionListener
@RocketMQTransactionListener
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// ... local transaction process, return bollback, commit or unknown
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// ... check transaction status and return bollback, commit or unknown
return RocketMQLocalTransactionState.COMMIT;
}
}
在 2.1.0 版本中,RocketMQ-Spring 重構了事務消息的實作,如下圖所示,舊版本中每一個 group 對應一個 TransactionProducer,而在新版本中改為每一個 RocketMQTemplate 對應一個 TransationProducer,進而解決了并發使用多個事務消息的問題。當使用者需要在單程序使用多個事務消息時,可以使用 ExtRocketMQTemplate 來完成(一般情況下,推薦一個程序使用一個 RocketMQTemplate,ExtRocketMQTemplate 可以使用在同程序中需要使用多個 Producer / LitePullConsumer 的場景,可以為 ExtRocketMQTemplate 指定與标準模版 RocketMQTemplate 不同的 nameserver、group 等配置),并在對應的 RocketMQTransactionListener 注解中指定 rocketMQTemplateBeanName 為 ExtRocketMQTemplate 的 BeanName。
2. Request-Reply 消息
在 2.1.0 版本中,RocketMQ-Spring 開始支援 Request-Reply 消息。Request-Reply 消息指的是上遊服務投遞消息後進入等待被通知的狀态,直到消費端傳回結果并傳回給發送端。在 RocketMQ-Spring 中,發送端通過 RocketMQTemplate 的 sendAndReceivce 方法進行發送,如下所示,主要有同步和異步兩種方式。異步方式中通過實作 RocketMQLocalRequestCallback 進行回調。
// 同步發送request并且等待String類型的傳回值
String replyString = rocketMQTemplate.sendAndReceive("stringRequestTopic", "request string", String.class);
// 異步發送request并且等待User類型的傳回值
rocketMQTemplate.sendAndReceive("objectRequestTopic", new User("requestUserName",(byte) 9), new RocketMQLocalRequestCallback<User>() {
@Override public void onSuccess(User message) {
……
}
@Override public void onException(Throwable e) {
……
}
});
在消費端,仍然需要實作一個包含 @RocketMQMessageListener 注解的類,但需要實作的接口是 RocketMQReplyListener 接口(普通消息為 RocketMQListener 接口),其中 T 表示接收值的類型,R 表示傳回值的類型,接口需要實作帶傳回值的 onMessage 方法,傳回值的内容傳回給對應的 Producer。
@Service
@RocketMQMessageListener(topic = "stringRequestTopic", consumerGroup = "stringRequestConsumer")
public class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> {
@Override
public String onMessage(String message) {
……
return "reply string";
}
}
RocketMQ-Spring 遵循 Spring 約定大于配置(Convention over configuration)的理念,通過啟動器(Spring Boot Starter)的方式,在 pom 檔案引入依賴(groupId:org.apache.rocketmq,artifactId:rocketmq-spring-boot-starter)便可以在 Spring Boot 中內建所有 RocketMQ 用戶端的所有功能,通過簡單的注解使用即可完成消息的收發。在
RocketMQ-Spring Github Wiki中有更加詳細的用法和常見問題解答。
據統計,從 RocketMQ-Spring 釋出第一個正式版本以來,RocketMQ-Spring 完成 16 個 bug 修複,37 個 imporvement,其中包括事務消息重構,消息過濾、消息序列化、多執行個體 RocketMQTemplate 優化等重要優化,歡迎更多的小夥伴能參與到 RocketMQ 社群的建設中來,羅美琪(RocketMQ)和春波特(Spring Boot)的故事還在繼續...