天天看點

使用Spring WebFlux + R2DBC搭建消息推送服務

消息推送服務主要是處理同步給使用者推送短信通知或是異步推送短信通知、微信模闆消息通知等。例如,在使用者注冊時需同步發送短信驗證碼、在訂單發貨時需異步推送微信模闆消息通知或短信通知。

消費推送接口并發量可能不高,要求同步推送消息的場景不多,但仍需要考慮可能會存在流量突增情況。在促銷活動的前後、拉新活動期間等等,都可能需要同步推送大量短信通知,而我們的目标是隻要一個

POD

(一個程序)就能處理整個電商平台的消息推送。

至于可異步的消息推送則通過

MQ

對接,實作削峰填谷,并通過監聽系統的負載情況動态的控制消息消費速度,讓系統處在一個穩定的運作狀态,這是一個理想的假設。

在團隊協同定義完消息推送接口後,消息推送服務相當于隻做一層代理,與網關非常相似,這也是我們考慮使用

WebFlux

的原因之一。以此降低消息推送服務的部署成本。

Spring WebFlux

Spring WebMvc

同為

Web

架構,不同的是,

WebFlux

是完全非阻塞的,能夠實作以少量的線程處理并發請求、以更少的硬體資源擷取系統更高的吞吐量。

但使用反應式程式設計可能不适合複雜業務的開發,也不适合采用了

DDD

領域驅動設計架構的項目,如果要使用,就必須要讓響應式

API

侵入

DDD

的領域服務類、倉儲類。

要使用

Spring WebFlux

提供完全非阻塞的接口,就必須要確定處理一個請求的整個流程都是非阻塞的,隻要有一個步驟導緻線程發生阻塞,

WebFlux

的性能就直線下降,為此你還要給

WebFlux

配置更多的線程,這與使用

WebMvc

并無差異,得不到高性能反而還增加項目的複雜性。

例如,處理接口請求阻塞在操作資料庫上,那麼預設

WebFlux

配置的幾個線程都會被阻塞住,此時,如果想通過增加

WebFlux

的工作線程數來解決問題,那麼不如直接切換回

WebMvc

使用

WebFlux

獲得高性能的同時必然要失去些什麼,畢竟是等價替換。是以代碼難以調試、項目代碼複雜度提升難以閱讀、并且會導緻一些強依賴

ThreadLocal

實作特性的架構無法正常工作,我們不得不抛棄這些架構而尋找支援反應式的架構替代。

消息推送服務在處理一次消息推送請求的過程中,可能需要通路

Redis

、資料庫

RDS

、以及第三方接口。

Redis

用于緩存消息模闆,但這塊可以使用記憶體緩存替代以擷取更快的響應速度,後期如果需要通路

Redis

,可以使用

Lettuce

替代

Jedis

請求第三方接口則可以使用

WebFlux

提供的

WebClient

實作,用于替代諸如

httpclient

okhttp

這類

http

用戶端架構,實作可以使用單一長連接配接的非阻塞發送

http

請求。

最後可能需要持久化推送記錄以便于後續報表的統計或其它,是以需要使用

R2DBC

替換

JDBC

實作非阻塞操作資料庫。

R2DBC

jdbc

的關系類似于

WebFlux

WebMvc

的關系,

R2DBC

是實作非阻塞操作資料庫的規範,提供反應式程式設計

API

,目前已有多種實作該規範的資料庫驅動程式包,如

r2dbc-mysql

spring data r2dbc

則是我們用來替代

mybatis

orm

架構。

  • webflux

    的異常處理與全局異常處理

webflux

相容

webmvc

的全局異常處理機制,如果不嫌麻煩,也可以每個接口自行處理異常,例如:

@PostMapping("push/sms")
public Mono<GenericResponse<MessagePushResultDto>> genericSendSmsMsg(
            // webflux也支援參數檢驗
            @Validated @RequestBody Mono<XxxCommand> command) {
  return xxxxService.pushMessage(command)
             .flatMap(messagePushResultDto -> Mono.just(GenericResponse.success(messagePushResultDto)))
              // 處理異常,不處理則走全局異常處理
             .onErrorResume(throwable -> Mono.just(GenericResponse.fail(throwable.getMessage())));
    }
           

複制

  • 讓自定義的

    JsonUtils

    接替

    webflux

    解析

    json

    的工作

我們将

Json

解析封裝成獨立的元件,目的是适配多個

json

解析架構,讓切換

json

解析架構隻需要切換依賴

jar

包即可。為此,我們依然需要讓

JsonUtils

替代

WebFlux

json

解析工作。代碼實作如下。

使用Spring WebFlux + R2DBC搭建消息推送服務
  • 使用

    WebClient

    發送

    Post

    請求示例
private Mono<WxmbMessageResponse> sendTemplateMessage(WxmbMessageCommand command, String token) {
        return webClient.post().uri("/cgi-bin/message/template/send?access_token=" + token)
                .accept(MediaType.APPLICATION_JSON).acceptCharset(Charset.defaultCharset())
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(command)
                .retrieve()
                .bodyToMono(WxmbMessageResponse.class);
}
           

複制

  • 使用

    spring-data-r2dbc

    實作增删改查

項目需要添加

mysql

r2dbc

驅動包,以及

spring-data-r2dbc

,同時

spring-data-r2dbc

依賴的

r2dbc-spi、r2dbc-pool

包也會被導入。

<!-- r2dbc mysql驅動-->
<dependency>
   <groupId>dev.miku</groupId>
   <artifactId>r2dbc-mysql</artifactId>
</dependency>
<!-- spring-data-r2dbc的starter包 -->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
           

複制

spring-data-r2dbc

spring-data-jpa

的使用非常相似,兩者都是實作

spirng-data-commons

下的

repository

API

spring-data-r2dbc

實作的是反應式

API

簡單的

CRUD

可通過繼承

R2dbcRepository<T, ID>

接口實作,例如:

public interface MessageDao extends R2dbcRepository<MessagePushPO, Long> {
}
           

複制

使用

@Query

自定義查詢實作如下:

public interface MessageDao extends R2dbcRepository<MessagePushPO, Long> {
    @Query("select * from message where id = ?")
    Mono<MessagePO> selectByMsgId(Long msgId);
}
           

複制

@Query

注解不等于

Mybatis

@Select

注解,

@Query

可以編寫增删改查

SQL

,如果需要執行寫操作,需要配合

@Modifying

注解使用。例如

public interface MessageDao extends R2dbcRepository<MessagePushPO, Long> {
    @Modifying
    @Query("delete from message where id = :msgId")
    Mono<Integer> deleteByMsgId(Long msgId);
}
           

複制

  • 使用

    spring-data-r2dbc

    實作複雜查詢

對于複雜的查詢,我們也可以直接使用

spring-data-r2dbc

API

實作,例如:

使用Spring WebFlux + R2DBC搭建消息推送服務

DatabaseClient

類似

Mybatis

中的

SqlSession

概念。

關于

spring-data-r2dbc

的使用,推薦閱讀

spring

官方文檔,雖然是英文,但閱讀起來并不難了解,想要學習冷門技術,就必須要啃英文文檔,因為你會發現,這方面的部落格文章少之又少,還避免不了一些部落格文章使用的

spring-data-r2dbc

版本與自己使用的版本不同存在

API

差異導緻“

copy

”的代碼畫紅線問題。

spring-data-r2dbc 1.1.0

版本官方文檔連結:

https://docs.spring.io/spring-data/r2dbc/docs/1.1.0.RELEASE/reference/html/#reference

,也可到

spring.io

官網搜尋。