介紹
rocketONS-starter 是一個基于阿裡雲ONS消息服務的輕量級Spring Boot Starter。它為您提供了一個簡單高效的封裝,使您能夠快速建立RocketMQ生産者和消費者,實作應用之間的消息傳遞。适用于高并發、高吞吐、低延遲的分布式消息場景。
軟體架構
rocketONS-starter 基于Spring的ApplicationContext容器管理,自動掃描consumer監聽器,并注冊啟動消費者,用于接收和處理來自阿裡雲ONS服務分發的消息。根據配置檔案動态建立消費者和生産者,自定義消費者的啟停開關,并自動序列化和解析消息實體。同時,它還支援消息過濾、順序消息和延時消息等進階功能,滿足不同場景的需求。
主要特點
- 簡單易用:提供簡潔的配置和API,快速上手,輕松實作消息生産和消費。
- 高性能:基于阿裡雲ONS消息服務,享受高并發、高吞吐、低延遲的分布式消息服務。
- 彈性擴充:根據業務需求自由添加消費者和生産者,實作彈性擴充。
- 進階功能支援:支援消息過濾、順序消息和延時消息等進階功能,滿足不同場景的需求。
安裝
将以下依賴添加到您的項目中:
<dependency>
<groupId>io.gitee.zhucan123</groupId>
<artifactId>rocket-ons-spring-boot-starter</artifactId>
<version>1.0.8</version>
</dependency>
複制代碼
使用說明
1. 将配置添加到項目中
yamlCopy code
rocket:
address: http://xxxx
secretKey: xxxx
accessKey: xxxx
topic: xxxx
groupSuffix: GID_
enable: true
delay: 1000
複制代碼
參數名 | 類型 | 是否必填 | 預設值 | 描述 |
accessKey | String | 是 | - | 用于身份認證的AccessKeyId,建立于阿裡雲賬号管理控制台。 |
secretKey | String | 是 | - | 用于身份認證的AccessKeySecret,建立于阿裡雲賬号管理控制台。 |
address | String | 是 | - | 設定TCP協定接入點。 |
groupSuffix | String | 否 | GID_ | 控制台建立的Group ID的字首,通常以"GID_"開頭。 |
topic | String | 是 | - | 當生産者未指定topic時使用的預設綁定topic。 |
delay | Integer | 否 | 1000 | 消息發送延遲毫秒數。 |
enable | Boolean | 否 | true | 是否啟用starter。 |
2. 在主程式中啟用rocketONS-starter
@EnableRocketONS
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}
複制代碼
3. 示例代碼:使用consumer
@ConsumerListener(tags = "msg_tag", consumers = 2)
@OnsConfiguration(topic = "topic-example", group = "GID_${example.group}")
public class ExampleConsumerListener implements RocketListener<MessageData> {
@Override
public Action consume(Message message, MessageData messageBody, ConsumeContext consumeContext) {
// 處理業務邏輯
return Action.CommitMessage;
}
}
複制代碼
- @OnsConfiguration:注冊成一個Spring容器,并設定消費者綁定的topic和group。可設定固定值,也可使用${propertiesKey}的方式讀取配置檔案中的配置。
- @ConsumerListener:辨別這是一個ONS消息消費者監聽器。可以設定tags來進行消息過濾,設定consumers來指定消費者線程數。
4. 示例代碼:使用producer
@Service
public class ExampleProducerService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendMessage(MessageData messageData) {
rocketMQTemplate.syncSend("topic-example:msg_tag", messageData);
}
}
複制代碼
通過注入RocketMQTemplate,使用syncSend方法同步發送消息。方法參數中的字元串格式為topic:tag,表示發送到指定topic并設定消息tag。
進階功能
1. 順序消息
使用RocketMQTemplate的syncSendOrderly方法發送順序消息,確定消費者按照發送順序進行消息處理。
rocketMQTemplate.syncSendOrderly("topic-example:msg_tag", messageData, orderId);
複制代碼
2. 延時消息
在發送消息時,通過設定messageDelayLevel參數指定延時級别。
rocketMQTemplate.syncSend("topic-example:msg_tag", messageData, messageDelayLevel);
複制代碼
3. 消息過濾
使用@ConsumerListener注解的tags屬性來實作消息過濾。設定對應的tag值,消費者将隻會消費帶有該tag的消息。
@ConsumerListener(tags = "msg_tag", consumers = 2)
public class ExampleConsumerListener implements RocketListener<MessageData> { ... }
複制代碼
請注意,您需要在生産消息時設定相應的tag。
rocketMQTemplate.syncSend("topic-example:msg_tag", messageData);
複制代碼
4. 異步發送消息
除了同步發送消息,RocketMQTemplate還提供了異步發送消息的方法。使用asyncSend方法發送消息,提供一個回調函數來處理發送結果。
rocketMQTemplate.asyncSend("topic-example:msg_tag", messageData, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 處理發送成功的邏輯
}
@Override
public void onException(Throwable e) {
// 處理發送失敗的邏輯
}
});
複制代碼
5. 廣播模式
廣播模式允許您将消息發送給所有消費者。要啟用廣播模式,需要在消費者監聽器中設定broadcast屬性為true。
javaCopy code
@ConsumerListener(tags = "msg_tag", consumers = 2, broadcast = true)
public class ExampleConsumerListener implements RocketListener<MessageData> { ... }
複制代碼
6. 消息重試政策
當消費者處理消息失敗時,可以通過傳回Action.ReconsumeLater來觸發消息重試。您可以在配置檔案中設定重試政策。
yamlCopy code
rocket:
retry: 3
delay: 1000
複制代碼
參數名 | 類型 | 是否必填 | 預設值 | 描述 |
retry | Integer | 否 | 3 | 消息重試次數。 |
delay | Integer | 否 | 1000 | 消息發送延遲毫秒數。 |
7. 自定義序列化與反序列化
預設情況下,rocketONS-starter 使用 Java 序列化和反序列化消息。但您可以通過實作MessageSerializer接口來自定義序列化和反序列化政策。 public class CustomMessageSerializer implements MessageSerializer { @Override public byte[] serialize(Object obj) { // 實作自定義的序列化邏輯 } @Override public T deserialize(byte[] data, Class clazz) { // 實作自定義的反序列化邏輯 } }
然後,在配置檔案中指定自定義序列化器。
rocket:
serializer: com.example.CustomMessageSerializer
複制代碼
注意事項
- 確定在生産和消費環境中使用相同的序列化和反序列化政策。
- 當使用Action.ReconsumeLater觸發消息重試時,務必注意避免重試風暴,以免影響整體性能。
8. 延遲消息
RocketMQ 支援延遲消息發送,可以在發送時指定延遲級别。要發送延遲消息,請使用 sendDelay 方法。
int delayLevel = 3; // 延遲級别,具體延遲時間需要參考 RocketMQ 的延遲級别配置
rocketMQTemplate.sendDelay("topic-example:msg_tag", messageData, delayLevel);
複制代碼
9. 事務消息
RocketMQ 支援發送事務消息,可以在發送消息時關聯一個本地事務。使用 sendMessageInTransaction 方法發送事務消息。
rocketMQTemplate.sendMessageInTransaction("topic-example:msg_tag", messageData, new LocalTransactionExecuter() {
@Override
public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
// 執行本地事務
// 傳回事務狀态
}
}, null);
複制代碼
10. 內建測試
為了友善內建測試,您可以使用 RocketMQTestListener 注解啟動一個測試消費者。測試消費者會把收到的消息存儲在記憶體中,以便測試時驗證。
@RocketMQTestListener(topics = "topic-example", tags = "msg_tag")
public class ExampleConsumerListener implements RocketListener<MessageData> { ... }
複制代碼
在測試用例中,您可以使用 RocketMQTestListener.getMessages() 方法擷取收到的消息。
11. 性能調優
為了提高系統性能,您可以通過以下方式調優 RocketMQ:
- 調整線程池大小。
- 優化網絡參數,如連接配接逾時時間。
- 調整消費者拉取批次大小。
- 優化消息堆積參數。
具體參數配置請參考 RocketMQ 官方文檔。
12. 社群與支援
- 有問題請在 碼雲 送出 issue。
- 歡迎參與項目開發,可通過 Fork 倉庫并送出 Pull Request。
- 更多資訊請關注作者 碼雲首頁。
作者:すれ燦蔔
連結:https://juejin.cn/post/7215780684635537463
來源:稀土掘金
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。