天天看點

Rocketmq spring 上手:如何在優雅地Spring 中實作消息的發送和消費

簡介: 本文将對rocktmq-spring-boot的設計實作做一個簡單的介紹,讀者可以通過本文了解将RocketMQ Client端內建為spring-boot-starter架構的開發細節,然後通過一個簡單的示例來一步一步的講解如何使用這個spring-boot-starter工具包來配置,發送和消費RocketMQ消息。

本文将對rocktmq-spring-boot的設計實作做一個簡單的介紹,讀者可以通過本文了解将RocketMQ Client端內建為spring-boot-starter架構的開發細節,然後通過一個簡單的示例來一步一步的講解如何使用這個spring-boot-starter工具包來配置,發送和消費RocketMQ消息。

作者簡介:遼天,阿裡巴巴技術專家,Apache RocketMQ 核心控,擁有多年分布式系統研發經驗,對Microservice、Messaging和Storage等領域有深刻了解, 目前專注 RocketMQ 核心優化以及 Messaging 生态建設。

通過本文,您将了解到:

  • Spring的消息架構介紹
  • rocketmq-spring-boot具體實作
  • 使用示例

前言

上世紀90年代末,随着Java EE(Enterprise Edition)的出現,特别是Enterprise Java Beans的使用需要複雜的描述符配置和死闆複雜的代碼實作,增加了廣大開發者的學習曲線和開發成本,由此基于簡單的XML配置和普通Java對象(Plain Old Java Objects)的Spring技術應運而生,依賴注入(Dependency Injection), 控制反轉(Inversion of Control)和面向切面程式設計(AOP)的技術更加靈活地解決了傳統Java企業及版本的不足。

随着Spring的持續演進,基于注解(Annotation)的配置逐漸取代了XML檔案配置, 2014年4月1日,Spring Boot 1.0.0正式釋出,它基于“約定大于配置”(Convention over configuration)這一理念來快速地開發、測試、運作和部署Spring應用,并能通過簡單地與各種啟動器(如 spring-boot-web-starter)結合,讓應用直接以指令行的方式運作,不需再部署到獨立容器中。這種簡便直接快速建構和開發應用的過程,可以使用約定的配置并且簡化部署,受到越來越多的開發者的歡迎。

Apache RocketMQ是業界知名的分布式消息和流進行中間件,簡單地了解,它由Broker伺服器和用戶端兩部分組成:

其中用戶端一個是消息釋出者用戶端(Producer),它負責向Broker伺服器發送消息;

另外一個是消息的消費者用戶端(Consumer),多個消費者可以組成一個消費組,來訂閱和拉取消費Broker伺服器上存儲的消息。

為了利用Spring Boot的快速開發和讓使用者能夠更靈活地使用RocketMQ消息用戶端,Apache RocketMQ社群推出了spring-boot-starter實作。随着分布式事務消息功能在RocketMQ 4.3.0版本的釋出,近期更新了相關的spring-boot代碼,通過注解方式支援分布式事務的回查和事務消息的發送。

本文将對目前的設計實作做一個簡單的介紹,讀者可以通過本文了解将RocketMQ Client端內建為spring-boot-starter架構的開發細節,然後通過一個簡單的示例來一步一步的講解如何使用這個spring-boot-starter工具包來配置,發送和消費RocketMQ消息。

Spring 中的消息架構

順便在這裡讨論一下在Spring中關于消息的兩個主要的架構,即Spring Messaging和Spring Cloud Stream。它們都能夠與Spring Boot整合并提供了一些參考的實作。和所有的實作架構一樣,消息架構的目的是實作輕量級的消息驅動的微服務,可以有效地簡化開發人員對消息中間件的使用複雜度,讓系統開發人員可以有更多的精力關注于核心業務邏輯的處理。

2.1 Spring Messaging

Spring Messaging是Spring Framework 4中添加的子產品,是Spring與消息系統內建的一個擴充性的支援。它實作了從基于JmsTemplate的簡單的使用JMS接口到異步接收消息的一整套完整的基礎架構,Spring AMQP提供了該協定所要求的類似的功能集。 在與Spring Boot的內建後,它擁有了自動配置能力,能夠在測試和運作時與相應的消息傳遞系統進行內建。

單純對于用戶端而言,Spring Messaging提供了一套抽象的API或者說是約定的标準,對消息發送端和消息接收端的模式進行規定,不同的消息中間件提供商可以在這個模式下提供自己的Spring實作:在消息發送端需要實作的是一個XXXTemplate形式的Java Bean,結合Spring Boot的自動化配置選項提供多個不同的發送消息方法;在消息的消費端是一個XXXMessageListener接口(實作方式通常會使用一個注解來聲明一個消息驅動的POJO),提供回調方法來監聽和消費消息,這個接口同樣可以使用Spring Boot的自動化選項和一些定制化的屬性。

如果有興趣深入的了解Spring Messaging及針對不同的消息産品的使用,推薦閱讀這個檔案。參考Spring Messaging的既有實作,RocketMQ的spring-boot-starter中遵循了相關的設計模式并結合RocketMQ自身的功能特點提供了相應的API(如,順序,異步和事務半消息等)。

2.2 Spring Cloud Stream

Spring Cloud Stream結合了Spring Integration的注解和功能,它的應用模型如下:

Rocketmq spring 上手:如何在優雅地Spring 中實作消息的發送和消費
該圖檔引自spring cloud stream

Spring Cloud Stream架構中提供一個獨立的應用核心,它通過輸入(@Input)和輸出(@Output)通道與外部世界進行通信,消息源端(Source)通過輸出通道發送消息,消費目标端(Sink)通過監聽輸入通道來擷取消費的消息。這些通道通過專用的Binder實作與外部代理連接配接。開發人員的代碼隻需要針對應用核心提供的固定的接口和注解方式進行程式設計,而不需要關心運作時具體的Binder綁定的消息中間件。在運作時,Spring Cloud Stream能夠自動探測并使用在classpath下找到的Binder。

這樣開發人員可以輕松地在相同的代碼中使用不同類型的中間件:僅僅需要在建構時包含進不同的Binder。在更加複雜的使用場景中,也可以在應用中打包多個Binder并讓它自己選擇Binder,甚至在運作時為不同的通道使用不同的Binder。

Binder抽象使得Spring Cloud Stream應用可以靈活的連接配接到中間件,加之Spring Cloud Stream使用利用了Spring Boot的靈活配置配置能力,這樣的配置可以通過外部配置的屬性和Spring Boo支援的任何形式來提供(包括應用啟動參數、環境變量和application.yml或者application.properties檔案),部署人員可以在運作時動态選擇通道連接配接destination(例如,Kafka的topic或者RabbitMQ的exchange)。

Binder SPI的方式來讓消息中間件産品使用可擴充的API來編寫相應的Binder,并內建到Spring Cloud Steam環境,目前RocketMQ還沒有提供相關的Binder,我們計劃在下一步将完善這一功能,也希望社群裡有這方面經驗的同學積極嘗試,貢獻PR或建議。

spring-boot-starter的實作

在開始的時候我們已經知道,spring boot starter構造的啟動器對于使用者是非常友善的,使用者隻要在pom.xml引入starter的依賴定義,相應的編譯,運作和部署功能就全部自動引入。是以常用的開源元件都會為Spring的使用者提供一個spring-boot-starter封裝給開發者,讓開發者非常友善內建和使用,這裡我們詳細的介紹一下RocketMQ(用戶端)的starter實作過程。

3.1. spring-boot-starter的實作步驟

對于一個spring-boot-starter實作需要包含如下幾個部分:

  1. 在pom.xml的定義
  • 定義最終要生成的starter元件資訊
<groupId>org.apache.rocketmq</groupId>
<artifactId>spring-boot-starter-rocketmq</artifactId>
<version>1.0.0-SNAPSHOT</version>      
  • 定義依賴包,

它分為兩個部分: A、Spring自身的依賴包; B、RocketMQ的依賴包

<dependencies>
    <!-- spring-boot-start internal depdencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>         
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    
    
    <!-- rocketmq dependencies -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>${rocketmq-version}</version>
    </dependency>
</dependencies>    
    <dependencyManagement>
    <dependencies>
        <!-- spring-boot-start parent depdency definition --> 
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>${spring.boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>      
  1. 配置檔案類

定義應用屬性配置檔案類RocketMQProperties,這個Bean定義一組預設的屬性值。使用者在使用最終的starter時,可以根據這個類定義的屬性來修改取值,當然不是直接修改這個類的配置,而是spring-boot應用中對應的配置檔案:src/main/resources/application.properties.

  1. 定義自動加載類

定義 src/resources/META-INF/spring.factories檔案中的自動加載類, 其目的是讓spring boot更具文中中所指定的自動化配置類來自動初始化相關的Bean,Component或Service,它的内容如下:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.rocketmq.spring.starter.RocketMQAutoConfiguration      

在RocketMQAutoConfiguration類的具體實作中,定義開放給使用者直接使用的Bean對象. 包括:

  • RocketMQProperties 加載應用屬性配置檔案的處理類;
  • RocketMQTemplate 發送端使用者發送消息的發送模闆類;
  • ListenerContainerConfiguration 容器Bean負責發現和注冊消費端消費實作接口類,這個類要求:由@RocketMQMessageListener注解标注;實作RocketMQListener泛化接口。
  1. 最後具體的RocketMQ相關的封裝

    在發送端(producer)和消費端(consumer)用戶端分别進行封裝,在目前的實作版本提供了對Spring Messaging接口的相容方式。

3.2. 消息發送端實作

  1. 普通發送端

發送端的代碼封裝在RocketMQTemplate POJO中,下圖是發送端的相關代碼的調用關系圖:

Rocketmq spring 上手:如何在優雅地Spring 中實作消息的發送和消費

為了與Spring Messaging的發送模闆相容,在RocketMQTemplate內建了AbstractMessageSendingTemplate抽象類,來支援相關的消息轉換和發送方法,這些方法最終會代理給doSend()方法;doSend()以及RocoketMQ所特有的一些方法如異步,單向和順序等方法直接添加到RoketMQTempalte中,這些方法直接代理調用到RocketMQ的Producer API來進行消息的發送。

  1. 事務消息發送端

對于事務消息的處理,在消息發送端進行了部分的擴充,參考下圖的調用關系類圖:

RocketMQTemplate裡加入了一個發送事務消息的方法sendMessageInTransaction(), 并且最終這個方法會代理到RocketMQ的TransactionProducer進行調用,在這個Producer上會注冊其關聯的TransactionListener實作類,以便在發送消息後能夠對TransactionListener裡的方法實作進行調用。

3.3. 消息消費端實作

Rocketmq spring 上手:如何在優雅地Spring 中實作消息的發送和消費

在消費端Spring-Boot應用啟動後,會掃描所有包含@RocketMQMessageListener注解的類(這些類需要內建RocketMQListener接口,并實作onMessage()方法),這個Listener會一對一的被放置到DefaultRocketMQListenerContainer容器對象中,容器對象會根據消費的方式(并發或順序),将RocketMQListener封裝到具體的RocketMQ内部的并發或者順序接口實作。在容器中建立RocketMQ Consumer對象,啟動并監聽定制的Topic消息,如果有消費消息,則回調到Listener的onMessage()方法。

上面的一章介紹了RocketMQ在spring-boot-starter方式的實作,這裡通過一個最簡單的消息發送和消費的例子來介紹如何使這個rocketmq-spring-boot-starter。

4.1 RocketMQ服務端的準備

 1.啟動NameServer和Broker

要驗證RocketMQ的Spring-Boot用戶端,首先要確定RocketMQ服務正确的下載下傳并啟動。可以參考RocketMQ主站的快速開始來進行操作。確定啟動NameServer和Broker已經正确啟動。

2.建立執行個體中所需要的Topics

在執行啟動指令的目錄下執行下面的指令行操作

bash bin/mqadmin updateTopic -c DefaultCluster -t string-topic      

4.2. 編譯rocketmq-spring-boot-starter

目前的spring-boot-starter依賴還沒有送出的Maven的中心庫,使用者使用前需要自行下載下傳git源碼,然後執行mvn clean install 安裝到本地倉庫。

git clone https://github.com/apache/rocketmq-externals.git
cd rocketmq-spring-boot-starter
mvn clean install      

4.3. 編寫用戶端代碼

使用者如果使用它,需要在消息的釋出和消費用戶端的maven配置檔案pom.xml中添加如下的依賴:

<properties>   <spring-boot-starter-rocketmq-version>1.0.0-SNAPSHOT</spring-boot-starter-rocketmq-version>
</properties>
<dependency>
   <groupId>org.apache.rocketmq</groupId>
   <artifactId>spring-boot-starter-rocketmq</artifactId>
   <version>${spring-boot-starter-rocketmq-version}</version>
</dependency>      

屬性spring-boot-starter-rocketmq-version的取值為:1.0.0-SNAPSHOT, 這與上一步驟中執行安裝到本地倉庫的版本一緻。

  1. 消息發送端的代碼

發送端的配置檔案application.properties

# 定義name-server位址
spring.rocketmq.name-server=localhost:9876
# 定義釋出者組名
spring.rocketmq.producer.group=my-group1
# 定義要發送的topic
spring.rocketmq.topic=string-topic      

發送端的Java代碼

import org.apache.rocketmq.spring.starter.core.RocketMQTemplate;
...
@SpringBootApplication
public class ProducerApplication implements CommandLineRunner {
    // 聲明并引用RocketMQTemplate
    @Resource
    private RocketMQTemplate rocketMQTemplate;
    // 使用application.properties裡定義的topic屬性
    @Value("${spring.rocketmq.springTopic}")
    private String springTopic;
    
    public static void main(String[] args){
        SpringApplication.run(ProducerApplication.class, args);
    }
    
    public void run(String... args) throws Exception {
        // 以同步的方式發送字元串消息給指定的topic
        SendResult sendResult = rocketMQTemplate.syncSend(springTopic, "Hello, World!");
        // 列印發送結果資訊
        System.out.printf("string-topic syncSend1 sendResult=%s %n", sendResult);
    }
}      

消息消費端代碼

消費端的配置檔案application.properties

# 定義name-server位址
spring.rocketmq.name-server=localhost:9876
# 定義釋出者組名
spring.rocketmq.consumer.group=my-customer-group1
# 定義要發送的topic
spring.rocketmq.topic=string-topic      

消費端的Java代碼

@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}
// 聲明消費消息的類,并在注解中指定,相關的消費資訊
@Service
@RocketMQMessageListener(topic = "${spring.rocketmq.topic}", consumerGroup = "${spring.rocketmq.consumer.group}")
class StringConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.printf("------- StringConsumer received: %s %f", message);
    }
}      

這裡隻是簡單的介紹了使用spring-boot來編寫最基本的消息發送和接收的代碼,如果需要了解更多的調用方式,如: 異步發送,對象消息體,指定tag标簽以及指定事務消息,請參看github的說明文檔和詳細的代碼。我們後續還會對這些進階功能進行陸續的介紹。

參考文檔

1.Spring Boot features-Messaging

2.Enterprise Integration Pattern-組成簡介

3.Spring Cloud Stream Reference Guide

4.

https://dzone.com/articles/creating-custom-springboot-starter-for-twitter4j