天天看點

rocketONS-starter:阿裡ONS消息服務的輕量級Spring Boot Starter

作者:青澀星球yR

介紹

rocketONS-starter 是一個基于阿裡雲ONS消息服務的輕量級Spring Boot Starter。它為您提供了一個簡單高效的封裝,使您能夠快速建立RocketMQ生産者和消費者,實作應用之間的消息傳遞。适用于高并發、高吞吐、低延遲的分布式消息場景。

軟體架構

rocketONS-starter 基于Spring的ApplicationContext容器管理,自動掃描consumer監聽器,并注冊啟動消費者,用于接收和處理來自阿裡雲ONS服務分發的消息。根據配置檔案動态建立消費者和生産者,自定義消費者的啟停開關,并自動序列化和解析消息實體。同時,它還支援消息過濾、順序消息和延時消息等進階功能,滿足不同場景的需求。

主要特點

  • 簡單易用:提供簡潔的配置和API,快速上手,輕松實作消息生産和消費。
  • 高性能:基于阿裡雲ONS消息服務,享受高并發、高吞吐、低延遲的分布式消息服務。
  • 彈性擴充:根據業務需求自由添加消費者和生産者,實作彈性擴充。
  • 進階功能支援:支援消息過濾、順序消息和延時消息等進階功能,滿足不同場景的需求。

安裝

将以下依賴添加到您的項目中:

<dependency>
    <groupId>io.gitee.zhucan123</groupId>
    <artifactId>rocket-ons-spring-boot-starter</artifactId>
    <version>1.0.8</version>
</dependency>
複制代碼           

使用說明

1. 将配置添加到項目中

yamlCopy code
rocket:
  address: http://xxxx
  secretKey: xxxx
  accessKey: xxxx
  topic: xxxx
  groupSuffix: GID_
  enable: true
  delay: 1000
複制代碼           
參數名 類型 是否必填 預設值 描述
accessKey String - 用于身份認證的AccessKeyId,建立于阿裡雲賬号管理控制台。
secretKey String - 用于身份認證的AccessKeySecret,建立于阿裡雲賬号管理控制台。
address String - 設定TCP協定接入點。
groupSuffix String GID_ 控制台建立的Group ID的字首,通常以"GID_"開頭。
topic String - 當生産者未指定topic時使用的預設綁定topic。
delay Integer 1000 消息發送延遲毫秒數。
enable Boolean true 是否啟用starter。

2. 在主程式中啟用rocketONS-starter

@EnableRocketONS
public class App {
    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }
}
複制代碼           

3. 示例代碼:使用consumer

@ConsumerListener(tags = "msg_tag", consumers = 2)
@OnsConfiguration(topic = "topic-example", group = "GID_${example.group}")
public class ExampleConsumerListener implements RocketListener<MessageData> {

  @Override
  public Action consume(Message message, MessageData messageBody, ConsumeContext consumeContext) {

    // 處理業務邏輯

    return Action.CommitMessage;
  }
}
複制代碼           
  • @OnsConfiguration:注冊成一個Spring容器,并設定消費者綁定的topic和group。可設定固定值,也可使用${propertiesKey}的方式讀取配置檔案中的配置。
  • @ConsumerListener:辨別這是一個ONS消息消費者監聽器。可以設定tags來進行消息過濾,設定consumers來指定消費者線程數。

4. 示例代碼:使用producer

@Service
public class ExampleProducerService {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendMessage(MessageData messageData) {
        rocketMQTemplate.syncSend("topic-example:msg_tag", messageData);
    }
}
複制代碼           

通過注入RocketMQTemplate,使用syncSend方法同步發送消息。方法參數中的字元串格式為topic:tag,表示發送到指定topic并設定消息tag。

進階功能

1. 順序消息

使用RocketMQTemplate的syncSendOrderly方法發送順序消息,確定消費者按照發送順序進行消息處理。

rocketMQTemplate.syncSendOrderly("topic-example:msg_tag", messageData, orderId);
複制代碼           

2. 延時消息

在發送消息時,通過設定messageDelayLevel參數指定延時級别。

rocketMQTemplate.syncSend("topic-example:msg_tag", messageData, messageDelayLevel);
複制代碼           

3. 消息過濾

使用@ConsumerListener注解的tags屬性來實作消息過濾。設定對應的tag值,消費者将隻會消費帶有該tag的消息。

@ConsumerListener(tags = "msg_tag", consumers = 2)
public class ExampleConsumerListener implements RocketListener<MessageData> { ... }
複制代碼           

請注意,您需要在生産消息時設定相應的tag。

rocketMQTemplate.syncSend("topic-example:msg_tag", messageData); 
複制代碼           

4. 異步發送消息

除了同步發送消息,RocketMQTemplate還提供了異步發送消息的方法。使用asyncSend方法發送消息,提供一個回調函數來處理發送結果。

rocketMQTemplate.asyncSend("topic-example:msg_tag", messageData, new SendCallback() {
  @Override 
  public void onSuccess(SendResult sendResult) { 
    // 處理發送成功的邏輯
  }
  @Override 
  public void onException(Throwable e) {
    // 處理發送失敗的邏輯 
  } 
});
複制代碼           

5. 廣播模式

廣播模式允許您将消息發送給所有消費者。要啟用廣播模式,需要在消費者監聽器中設定broadcast屬性為true。

javaCopy code
@ConsumerListener(tags = "msg_tag", consumers = 2, broadcast = true)
public class ExampleConsumerListener implements RocketListener<MessageData> { ... }
複制代碼           

6. 消息重試政策

當消費者處理消息失敗時,可以通過傳回Action.ReconsumeLater來觸發消息重試。您可以在配置檔案中設定重試政策。

yamlCopy code
rocket:
  retry: 3
  delay: 1000
複制代碼           
參數名 類型 是否必填 預設值 描述
retry Integer 3 消息重試次數。
delay Integer 1000 消息發送延遲毫秒數。

7. 自定義序列化與反序列化

預設情況下,rocketONS-starter 使用 Java 序列化和反序列化消息。但您可以通過實作MessageSerializer接口來自定義序列化和反序列化政策。 public class CustomMessageSerializer implements MessageSerializer { @Override public byte[] serialize(Object obj) { // 實作自定義的序列化邏輯 } @Override public T deserialize(byte[] data, Class clazz) { // 實作自定義的反序列化邏輯 } }

然後,在配置檔案中指定自定義序列化器。

rocket:
  serializer: com.example.CustomMessageSerializer
複制代碼           

注意事項

  • 確定在生産和消費環境中使用相同的序列化和反序列化政策。
  • 當使用Action.ReconsumeLater觸發消息重試時,務必注意避免重試風暴,以免影響整體性能。

8. 延遲消息

RocketMQ 支援延遲消息發送,可以在發送時指定延遲級别。要發送延遲消息,請使用 sendDelay 方法。

int delayLevel = 3; // 延遲級别,具體延遲時間需要參考 RocketMQ 的延遲級别配置
rocketMQTemplate.sendDelay("topic-example:msg_tag", messageData, delayLevel);
複制代碼           

9. 事務消息

RocketMQ 支援發送事務消息,可以在發送消息時關聯一個本地事務。使用 sendMessageInTransaction 方法發送事務消息。

rocketMQTemplate.sendMessageInTransaction("topic-example:msg_tag", messageData, new LocalTransactionExecuter() {
    @Override
    public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
        // 執行本地事務
        // 傳回事務狀态
    }
}, null);
複制代碼           

10. 內建測試

為了友善內建測試,您可以使用 RocketMQTestListener 注解啟動一個測試消費者。測試消費者會把收到的消息存儲在記憶體中,以便測試時驗證。

@RocketMQTestListener(topics = "topic-example", tags = "msg_tag")
public class ExampleConsumerListener implements RocketListener<MessageData> { ... }
複制代碼           

在測試用例中,您可以使用 RocketMQTestListener.getMessages() 方法擷取收到的消息。

11. 性能調優

為了提高系統性能,您可以通過以下方式調優 RocketMQ:

  • 調整線程池大小。
  • 優化網絡參數,如連接配接逾時時間。
  • 調整消費者拉取批次大小。
  • 優化消息堆積參數。

具體參數配置請參考 RocketMQ 官方文檔。

12. 社群與支援

  • 有問題請在 碼雲 送出 issue。
  • 歡迎參與項目開發,可通過 Fork 倉庫并送出 Pull Request。
  • 更多資訊請關注作者 碼雲首頁。

作者:すれ燦蔔

連結:https://juejin.cn/post/7215780684635537463

來源:稀土掘金

著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。

繼續閱讀