天天看點

rocketmq-spring : 實戰與源碼解析一網打盡

作者:勇哥java實戰分享

RocketMQ 是大家耳熟能詳的消息隊列,開源項目 rocketmq-spring 可以幫助開發者在 Spring Boot 項目中快速整合 RocketMQ。

這篇文章會介紹 Spring Boot 項目使用 rocketmq-spring SDK 實作消息收發的操作流程,同時筆者會從開發者的角度解讀 SDK 的設計邏輯。

rocketmq-spring : 實戰與源碼解析一網打盡

1 SDK 簡介

rocketmq-spring : 實戰與源碼解析一網打盡

項目位址:

https://github.com/apache/rocketmq-spring

rocketmq-spring 的本質是一個 Spring Boot starter 。

Spring Boot 基于“約定大于配置”(Convention over configuration)這一理念來快速地開發、測試、運作和部署 Spring 應用,并能通過簡單地與各種啟動器(如 spring-boot-web-starter)結合,讓應用直接以指令行的方式運作,不需再部署到獨立容器中。

Spring Boot starter 構造的啟動器使用起來非常友善,開發者隻需要在 pom.xml 引入 starter 的依賴定義,在配置檔案中編寫約定的配置即可。

下面我們看下 rocketmq-spring-boot-starter 的配置:

1、引入依賴

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.2.3</version>
</dependency>
           

2、約定配置

rocketmq-spring : 實戰與源碼解析一網打盡

接下來,我們分别按照生産者和消費者的順序,詳細的講解消息收發的操作過程。

2 生産者

首先我們添加依賴後,進行如下三個步驟:

1、配置檔案中配置如下

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
      group: platform-sms-server-group
    # access-key: myaccesskey
    # secret-key: mysecretkey
  topic: sms-common-topic
           

生産者配置非常簡單,主要配置名字服務位址和生産者組。

2、需要發送消息的類中注入 RcoketMQTemplate

@Autowired
private RocketMQTemplate rocketMQTemplate;

@Value("${rocketmq.topic}")
private String smsTopic;
           

3、發送消息,消息體可以是自定義對象,也可以是 Message 對象

rocketMQTemplate 類包含多鐘發送消息的方法:

  1. 同步發送 syncSend
  2. 異步發送 asyncSend
  3. 順序發送 syncSendOrderly
  4. oneway發送 sendOneWay

下面的代碼展示如何同步發送消息。

String destination = StringUtils.isBlank(tags) ? topic : topic + ":" + tags;
SendResult sendResult =
         rocketMQTemplate.syncSend(
            destination, 
            MessageBuilder.withPayload(messageContent).
            setHeader(MessageConst.PROPERTY_KEYS, uniqueId).
            build()
          );
if (sendResult != null) {
    if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
       // send message success ,do something 
    }
}
           

syncSend 方法的第一個參數是發送的目标,格式是:topic + ":" + tags ,

第二個參數是:spring-message 規範的 message 對象 ,而 MessageBuilder 是一個工具類,方法鍊式調用建立消息對象。

3 消費者

1、配置檔案中配置如下

rocketmq:
  name-server: 127.0.0.1:9876
  consumer1:
    group: platform-sms-worker-common-group
    topic: sms-common-topic
           

2、實作消息監聽器

@Component
@RocketMQMessageListener(
    consumerGroup = "${rocketmq.consumer1.group}",  //消費組
    topic = "${rocketmq.consumer1.topic}"       //主題
)
public class SmsMessageCommonConsumer implements RocketMQListener<String> {
    public void onMessage(String message) {
       System.out.println("普通短信:" + message);
    }
}
           

消費者實作類也可以實作 RocketMQListener<MessageExt>, 在 onMessage 方法裡通過 RocketMQ 原生消息對象 MessageExt 擷取更詳細的消息資料 。

public void onMessage(MessageExt message) {
    try {
        String body = new String(message.getBody(), "UTF-8");
        logger.info("普通短信:" + message);
    } catch (Exception e) {
        logger.error("common onMessage error:", e);
    }
}
           

4 源碼概覽

rocketmq-spring : 實戰與源碼解析一網打盡

最新源碼中,我們可以看到源碼中包含四個子產品:

1、rocketmq-spring-boot-parent

該子產品是父子產品,定義項目所有依賴的 jar 包。

2、rocketmq-spring-boot

核心子產品,實作了 starter 的核心邏輯。

3、rocketmq-spring-boot-starter

SDK 子產品,簡單封裝,外部項目引用。

4、rocketmq-spring-boot-samples

示例代碼子產品。這個子產品非常重要,當使用者使用 SDK 時,可以參考示例快速開發。

5 starter 實作

我們重點分析下 rocketmq-spring-boot 子產品的核心源碼:

rocketmq-spring : 實戰與源碼解析一網打盡

spring-boot-starter 實作需要包含如下三個部分:

1、定義 Spring 自身的依賴包和 RocketMQ 的依賴包 ;

2、定義spring.factories 檔案

在 resources 包下建立 META-INF 目錄後,建立 spring.factories 檔案,并在檔案中定義自動加載類,檔案内容是:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
           

spring boot 會根據檔案中配置的自動化配置類來自動初始化相關的 Bean、Component 或 Service。

3、實作自動加載類

在 RocketMQAutoConfiguration 類的具體實作中,我們重點分析下生産者和消費者是如何分别啟動的。

▍生産者發送模闆類:RocketMQTemplate

RocketMQAutoConfiguration 類定義了兩個預設的 Bean :

rocketmq-spring : 實戰與源碼解析一網打盡
rocketmq-spring : 實戰與源碼解析一網打盡

首先SpringBoot項目中配置檔案中的配置值會根據屬性條件綁定到 RocketMQProperties 對象 中,然後使用 RocketMQ 的原生 API 分别建立生産者 Bean 和拉取消費者 Bean , 分别将兩個 bean 設定到 RocketMQTemplate 對象中。

兩個重點需要強調:

  • 發送消息時,将 spring-message 規範下的消息對象封裝成 RocketMQ 消息對象
  • 預設拉取消費者 litePullConsumer 。拉取消費者一般用于大資料批量處理場景 。 原生使用方式

RocketMQTemplate 類封裝了拉取消費者的receive方法,以友善開發者使用。

rocketmq-spring : 實戰與源碼解析一網打盡

▍自定義消費者類

下圖是并發消費者的例子:

rocketmq-spring : 實戰與源碼解析一網打盡

消費者示例代碼

那麼 rocketmq-spring 是如何自動啟動消費者呢 ?

rocketmq-spring : 實戰與源碼解析一網打盡

spring 容器首先注冊了消息監聽器後置處理器,然後調用 ListenerContainerConfiguration 類的 registerContainer 方法 。

對比并發消費者的例子,我們可以看到: DefaultRocketMQListenerContainer 是對 DefaultMQPushConsumer 消費邏輯的封裝。

rocketmq-spring : 實戰與源碼解析一網打盡

封裝消費消息的邏輯,同時滿足 RocketMQListener 泛化接口支援不同參數,比如 String 、MessageExt 、自定義對象 。

首先DefaultRocketMQListenerContainer初始化之後, 擷取 onMessage 方法的參數類型 。

rocketmq-spring : 實戰與源碼解析一網打盡

然後消費者調用 consumeMessage 處理消息時,封裝了一個 handleMessage 方法 ,将原生 RocketMQ 消息對象 MessageExt 轉換成 onMessage 方法定義的參數對象,然後調用 rocketMQListener 的 onMessage 方法。

rocketmq-spring : 實戰與源碼解析一網打盡

mnjh9

上圖右側标紅的代碼也就是該方法的精髓:

rocketMQListener.onMessage(doConvertMessage(messageExt));           

6 寫到最後

開源項目 rocketmq-spring 有很多值得學習的地方 ,我們可以從如下四個層面逐層進階:

1、學會如何使用 :參考 rocketmq-spring-boot-samples 子產品的示例代碼,學會如何發送和接收消息,快速編碼;

2、子產品設計:學習項目的子產品分層 (父子產品、SDK 子產品、核心實作子產品、示例代碼子產品);

3、starter 設計思路 :定義自動配置檔案 spring.factories 、設計配置屬性類 、在 RocketMQ client 的基礎上實作優雅的封裝、深入了解 RocketMQ 源碼等;

4、舉一反三:當我們了解了 rocketmq-spring 的源碼,我們可以嘗試模仿該項目寫一個簡單的 spring boot starter。

如果我的文章對你有所幫助,還請幫忙點贊、在看、轉發一下,你的支援會激勵我輸出更高品質的文章,非常感謝!