消息推送服務主要是處理同步給使用者推送短信通知或是異步推送短信通知、微信模闆消息通知等。例如,在使用者注冊時需同步發送短信驗證碼、在訂單發貨時需異步推送微信模闆消息通知或短信通知。
消費推送接口并發量可能不高,要求同步推送消息的場景不多,但仍需要考慮可能會存在流量突增情況。在促銷活動的前後、拉新活動期間等等,都可能需要同步推送大量短信通知,而我們的目标是隻要一個
POD
(一個程序)就能處理整個電商平台的消息推送。
至于可異步的消息推送則通過
MQ
對接,實作削峰填谷,并通過監聽系統的負載情況動态的控制消息消費速度,讓系統處在一個穩定的運作狀态,這是一個理想的假設。
在團隊協同定義完消息推送接口後,消息推送服務相當于隻做一層代理,與網關非常相似,這也是我們考慮使用
WebFlux
的原因之一。以此降低消息推送服務的部署成本。
Spring WebFlux
與
Spring WebMvc
同為
Web
架構,不同的是,
WebFlux
是完全非阻塞的,能夠實作以少量的線程處理并發請求、以更少的硬體資源擷取系統更高的吞吐量。
但使用反應式程式設計可能不适合複雜業務的開發,也不适合采用了
DDD
領域驅動設計架構的項目,如果要使用,就必須要讓響應式
API
侵入
DDD
的領域服務類、倉儲類。
要使用
Spring WebFlux
提供完全非阻塞的接口,就必須要確定處理一個請求的整個流程都是非阻塞的,隻要有一個步驟導緻線程發生阻塞,
WebFlux
的性能就直線下降,為此你還要給
WebFlux
配置更多的線程,這與使用
WebMvc
并無差異,得不到高性能反而還增加項目的複雜性。
例如,處理接口請求阻塞在操作資料庫上,那麼預設
WebFlux
配置的幾個線程都會被阻塞住,此時,如果想通過增加
WebFlux
的工作線程數來解決問題,那麼不如直接切換回
WebMvc
。
使用
WebFlux
獲得高性能的同時必然要失去些什麼,畢竟是等價替換。是以代碼難以調試、項目代碼複雜度提升難以閱讀、并且會導緻一些強依賴
ThreadLocal
實作特性的架構無法正常工作,我們不得不抛棄這些架構而尋找支援反應式的架構替代。
消息推送服務在處理一次消息推送請求的過程中,可能需要通路
Redis
、資料庫
RDS
、以及第三方接口。
Redis
用于緩存消息模闆,但這塊可以使用記憶體緩存替代以擷取更快的響應速度,後期如果需要通路
Redis
,可以使用
Lettuce
替代
Jedis
。
請求第三方接口則可以使用
WebFlux
提供的
WebClient
實作,用于替代諸如
httpclient
、
okhttp
這類
http
用戶端架構,實作可以使用單一長連接配接的非阻塞發送
http
請求。
最後可能需要持久化推送記錄以便于後續報表的統計或其它,是以需要使用
R2DBC
替換
JDBC
實作非阻塞操作資料庫。
R2DBC
與
jdbc
的關系類似于
WebFlux
與
WebMvc
的關系,
R2DBC
是實作非阻塞操作資料庫的規範,提供反應式程式設計
API
,目前已有多種實作該規範的資料庫驅動程式包,如
r2dbc-mysql
,
spring data r2dbc
則是我們用來替代
mybatis
的
orm
架構。
-
的異常處理與全局異常處理webflux
webflux
相容
webmvc
的全局異常處理機制,如果不嫌麻煩,也可以每個接口自行處理異常,例如:
@PostMapping("push/sms")
public Mono<GenericResponse<MessagePushResultDto>> genericSendSmsMsg(
// webflux也支援參數檢驗
@Validated @RequestBody Mono<XxxCommand> command) {
return xxxxService.pushMessage(command)
.flatMap(messagePushResultDto -> Mono.just(GenericResponse.success(messagePushResultDto)))
// 處理異常,不處理則走全局異常處理
.onErrorResume(throwable -> Mono.just(GenericResponse.fail(throwable.getMessage())));
}
複制
- 讓自定義的
接替JsonUtils
解析webflux
的工作json
我們将
Json
解析封裝成獨立的元件,目的是适配多個
json
解析架構,讓切換
json
解析架構隻需要切換依賴
jar
包即可。為此,我們依然需要讓
JsonUtils
替代
WebFlux
的
json
解析工作。代碼實作如下。

- 使用
發送WebClient
請求示例Post
private Mono<WxmbMessageResponse> sendTemplateMessage(WxmbMessageCommand command, String token) {
return webClient.post().uri("/cgi-bin/message/template/send?access_token=" + token)
.accept(MediaType.APPLICATION_JSON).acceptCharset(Charset.defaultCharset())
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(command)
.retrieve()
.bodyToMono(WxmbMessageResponse.class);
}
複制
- 使用
實作增删改查spring-data-r2dbc
項目需要添加
mysql
的
r2dbc
驅動包,以及
spring-data-r2dbc
,同時
spring-data-r2dbc
依賴的
r2dbc-spi、r2dbc-pool
包也會被導入。
<!-- r2dbc mysql驅動-->
<dependency>
<groupId>dev.miku</groupId>
<artifactId>r2dbc-mysql</artifactId>
</dependency>
<!-- spring-data-r2dbc的starter包 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
複制
spring-data-r2dbc
與
spring-data-jpa
的使用非常相似,兩者都是實作
spirng-data-commons
下的
repository
的
API
,
spring-data-r2dbc
實作的是反應式
API
。
簡單的
CRUD
可通過繼承
R2dbcRepository<T, ID>
接口實作,例如:
public interface MessageDao extends R2dbcRepository<MessagePushPO, Long> {
}
複制
使用
@Query
自定義查詢實作如下:
public interface MessageDao extends R2dbcRepository<MessagePushPO, Long> {
@Query("select * from message where id = ?")
Mono<MessagePO> selectByMsgId(Long msgId);
}
複制
@Query
注解不等于
Mybatis
的
@Select
注解,
@Query
可以編寫增删改查
SQL
,如果需要執行寫操作,需要配合
@Modifying
注解使用。例如
public interface MessageDao extends R2dbcRepository<MessagePushPO, Long> {
@Modifying
@Query("delete from message where id = :msgId")
Mono<Integer> deleteByMsgId(Long msgId);
}
複制
- 使用
實作複雜查詢spring-data-r2dbc
對于複雜的查詢,我們也可以直接使用
spring-data-r2dbc
的
API
實作,例如:
DatabaseClient
類似
Mybatis
中的
SqlSession
概念。
關于
spring-data-r2dbc
的使用,推薦閱讀
spring
官方文檔,雖然是英文,但閱讀起來并不難了解,想要學習冷門技術,就必須要啃英文文檔,因為你會發現,這方面的部落格文章少之又少,還避免不了一些部落格文章使用的
spring-data-r2dbc
版本與自己使用的版本不同存在
API
差異導緻“
copy
”的代碼畫紅線問題。
spring-data-r2dbc 1.1.0
版本官方文檔連結:
https://docs.spring.io/spring-data/r2dbc/docs/1.1.0.RELEASE/reference/html/#reference
,也可到
spring.io
官網搜尋。