作者:蔣文豪
響應式系統不是今天的主題,我們要讨論更具體的話題,即響應式代碼的編寫會有哪些複雜度,應該如何簡化。
一、什麼是響應式程式設計
什麼是響應式程式設計,它是一種程式設計範式?還是一種設計模式?抑或是其他?響應式系統和響應式程式設計有什麼關系?又比如,響應式程式設計它适用于什麼場景?解決什麼問題?
微軟于2011年率先建設了.Net上的Rx庫,以簡化容易出錯的異步和事件驅動程式設計,邁出了響應式程式設計的第一步,随後業界為許多程式設計語言提供了對應的實作。
.Net上的Rx庫位址:https://docs.microsoft.com/en-us/previous-versions/dotnet/reactive-extensions/hh242985(v=vs.103)
什麼是響應式,我們從一個例子開始。
在上面的表格中,建立了單元格之間的關系:A1 = B1 + C1,建立關系之後A1将響應任何對于B1和C1的變化,毫無疑問,這就是一種響應式行為。
我覺得這個例子很棒的地方在于,它顯然很簡單,同時又足夠深刻,首先,它充分的展現了響應式的概念,其次,變化發生時,肯定觸發了某些過程的執行,說明背後存在關系的建立和沿着關系傳播的變化,再次,稍微深入一點看,B1和C1的變化可以是一系列的變化,可以很自然的引申到流的概念,最後,它有一個很進階的抽象,對使用方來說,整個過程是聲明式的。
當然,舉例子來說明一個概念的時候,本質上是用一個外延在解釋一個概念的内涵,往往是會将内涵縮小的,是以我們可以嘗試推廣這個外延。列出這個例子的特征:
描述了一個單元格裡的整數等于另外兩個單元格裡整數相加,當後者每次發生變化時,變化都會傳播到第一個單元格,并進行求值。
關鍵詞為整數、等于、相加、變化、每次、傳播、求值。前三個關鍵詞僅僅和例子相關,可以直接去掉。變化可以推廣為資料,每次可以在邏輯上等價于流的概念,流可以有0個、1個或多個資料,傳播可以推廣為通信(在這個意義上,函數調用、RPC、socket、MQ都是通信),求值推廣為執行一個過程。是以我們可以得出響應式程式設計的定義:
通過聲明式的通信定義,将資料流與過程組合起來,進而實作資料驅動過程的一種複合程式設計範式。
時至今日,業界對于響應式的定義仍然是不統一的,是以這是我自己的了解。響應式的基礎概念是資料流,理念是過程的執行是通過響應資料來驅動的,核心是構造資料和過程的響應關系,并且能夠讓資料沿着關系傳播驅動過程,是以響應式程式設計本質上是一種對通信的抽象,說它是一種程式設計範式,是因為它提供一種對于資料與過程組合方式的看法,說它是複合範式而不是基本範式,是因為它不像OOP或者FP一樣提供的是對于資料和過程的看法,而是以兩者為基礎,是以可以有對象響應式和函數響應式。
當我們基于響應式建構系統時,就是響應式系統,響應式系統的建構原則可以參考此處(位址:https://www.reactiveprinciples.org/patterns/communicate-facts.html),總的來說,系統會分割成一個一個的分區,分區内部對狀态進行本地化,分區之間通過通信進行異步解耦,可以通過控制這個通信的過程,實作系統的彈性擴縮容和部分元件失敗的回彈性。
響應式系統不是今天的主題,我們要讨論更具體的話題,即響應式代碼的編寫會有哪些複雜度,應該如何簡化。
二、響應式程式設計的複雜度
響應式程式設計的複雜度來自于4個方面:
- 可以有0次、1次或多次資料産生,也就是資料流;
- 除了資料之外,還有能夠辨別錯誤和完成(正常結束);
- 資料流和資料流、資料流和過程的組合複雜度很高;
- 在上面的基礎上,需要處理整個過程中線程切換、并發同步、資料緩沖等問題。
為了支援資料流的概念,可以産生0次、1次或多次資料産生,API設計需要把資料回調和結果回調分開,通常也會把錯誤回調和完成回調分開,這種接口被稱為流式接口,一個标準的流式接口設計如下所示:
typealias Func = () -> ()
typealias OnData<Data> = (Data) -> ()
typealias OnError = (Error) -> ()
typealias OnComplete = Func
typealias StreamFunc<Data> = (@escaping OnData<Data>, @escaping OnError, @escaping OnComplete) -> ()
顯然,流式接口是普通異步接口将一次結果向多次結果的推廣,這種推廣同時也增加了邏輯的複雜度。
我們可以通過一個邏輯上簡單的例子來看一下流式接口的使用過程,為了關注于核心的複雜度,隻會展現前3個方面,一方面是由于加入第4點的話會導緻代碼過于冗長混淆關注點,另一方面相信各位對第4點本身的複雜度和它引起的衆多問題已經非常熟悉了。
這個例子很簡單,隻有三步:
1)假設需要為一個店鋪提供一個訂單展示頁面,這些訂單來自兩個不同的平台“鵝鵝鵝”和“鴨鴨鴨”,他們各自提供了查詢的接口(listOrders,為了簡單假設他們提供的模型和接口完全一緻);
2)訂單清單需要展示使用者的昵稱等資訊,需要通過對應平台的另外一個接口(queryUserInfo)查詢;
3)由于SDK緩存、持久化、網絡請求政策,資料無法一次性擷取,這兩個接口可能存在多次資料回調。
進一步簡化問題,我們忽略變更處理、UI渲染和使用者互動處理,僅僅考慮資料加載,這需要組合2個階段的4次接口調用,先分别請求兩個平台的訂單,使用訂單請求對應平台的userInfo,最後合并成完整資料:
// 資料怎麼回調,什麼情況結束,onError和onComplete分别在什麼情況回調,保證有且僅有一次回調
func load(onData : OnData<[OrderObject]>?, onError : OnError?, onComplete : OnComplete?) {
let orderServices = [OrderService("鵝鵝鵝"), OrderService("鴨鴨鴨")]
// 記錄整體請求的完成狀态
var listOrderFinish = false
var queryUserFinish = false
// 記錄各個請求的結果
var listOrderResults = orderServices.map{_ in false}
var queryUserResults = [Bool]()
for (index, orderService) in orderServices.enumerated() {
orderService.listOrders { orders in
// 已結束不處理
if (listOrderFinish) {
return;
}
let index = queryUserResults.count
queryUserResults[index] = false
if let userService = getUserService(site: orderService.site){
let userIds = orders.map { order in
order.userId
}
userService.queryUserInfo(userIds: userIds) { userInfoDict in
if (listOrderFinish && queryUserFinish) {
return;
}
let orderObjects = orders.map { order in
OrderObject(order: order, userInfo: userInfoDict[order.userId])
}
onData?(orderObjects)
} onError: { error in
// 如果是第一個錯誤,直接回調,同時标記為結束
if (!listOrderFinish || !queryUserFinish) {
listOrderFinish = true
queryUserFinish = true
onError?(error)
}
} onComplete: {
// 外層結束,内層也結束,才是最終結束
if (!listOrderFinish || !queryUserFinish) {
queryUserResults[index] = true
// 所有都結束,回調
if (listOrderFinish && !queryUserResults.contains(false)) {
listOrderFinish = true
onComplete?()
}
}
}
} else {
let orderObjects = orders.map { order in
OrderObject(order: order)
}
onData?(orderObjects)
queryUserResults[index] = true
// 所有都結束,回調
if (listOrderFinish && !queryUserResults.contains(false)) {
listOrderFinish = true
onComplete?()
}
}
} onError: { error in
// 如果是第一個錯誤,直接回調,同時标記為結束
if (!listOrderFinish) {
listOrderFinish = true
onError?(error)
}
} onComplete: {
// 注意,即使所有的請求都結束了,也不能回調結束,因為這裡的結束隻是代表Order請求結束,userInfo請求不一定結束
if (!listOrderFinish) {
listOrderResults[index] = true
// 所有都結束,回調
if (!listOrderResults.contains(false)) {
listOrderFinish = true
}
}
}
}
}
在這個接口的實作中,資料回調最簡單,在沒有結束的情況下,多次回調的資料可以直接回調,問題是如何保證錯誤和完成有且僅有一次回調,且結果回調後不再回調資料,即:
- 什麼時候回調錯誤?
- 什麼時候回調完成?
如果我們認為一個接口出錯,就回調錯誤,這是最簡單的錯誤處理,隻需要檢查和設定結束狀态,在沒有結束時的第一個錯誤進行回調即可,注意,我們需要在userInfo的請求中也做類似的處理,并保證錯誤回調後不再執行任何回調。
完成的回調要比錯誤複雜的多,我們可以來思考一下:
- 首先,我們不能在listOrders的onComplete裡面取回調完成,因為這裡不能代表queryUserInfo這個接口也完成了;
- 其次,我們也不能簡單的通過所有queryUserInfo都完成了就回調完成,因為listOrders在完成前仍然有可能傳回新的訂單資料。
也就是說,這裡的完成需要在queryUserInfo進行判斷,并且也需要考慮外層請求的完成情況,比普通異步接口的級聯要多了兩個次元。這僅僅是2種接口4次請求,在真實的程式設計中,接口數量會多得多,并且需要把第4點加進來,線程/隊列、并發、同步、緩沖區,還要處理新資料推送響應,再考慮調試、監控、排查,複雜度顯然會繼續大幅增長,保證這個過程的正确性是一件痛苦的事情。
三、響應式程式設計的複雜度使用Rx/Combine簡化響應式程式設計
為了解決這些問題,業界搞出了Reactive Streams規範(位址:https://www.reactive-streams.org/),也出現了若幹的實作,都以工具庫的形式提供,包括Rx系列、Reactor,以及蘋果功能類似的Combine。作為一個iOS開發,我對RxSwift和Combine比較了解,兩者主要的差別在于Combine多了一個Subscription的抽象來協調Publisher和Subscriber之間的行為,尤其是Back Pressure相關的控制,但總的來說,都提供了對于異步資料流的抽象群組合能力,用法上也很類似,這裡以RxSwift為例來重寫上面的過程。
第一步,實作一個将流式函數轉換成Observable的工具類,這個是通用的,非常直覺:
func makeObservable<Data>(f : @escaping StreamFunc<Data>) -> Observable<Data> {
Observable<Data>.create { observer in
f { data in
observer.onNext(data)
} _: { error in
observer.onError(error)
} _: {
observer.onCompleted()
}
return Disposables.create()
}
}
第二步,針對這個例子,将listOrder和queryUserInfo轉換成StreamFunc形式,listOrder本來就是StreamFunc,對queryUserInfo進行偏應用也可以轉換為StreamFunc形式,這是具體接口相關的:
func makeStreamFunc(orders : [Order], userInfoService : UserService?) -> StreamFunc<[OrderObject]> {
if let userInfoService = userInfoService {
// 核心是對queryUserInfo的userIds參數進行偏應用
let userInfoF : StreamFunc<[OrderObject]> = { onData, onError, onComplete in
let userIds = orders.map{$0.userId}
userInfoService.queryUserInfo(userIds: userIds, onData: { userInfoDict in
let orderObjects = orders.map { order in
OrderObject(order: order, userInfo: userInfoDict[order.userId])
}
onData(orderObjects)
}, onError: onError, onComplete: onComplete)
}
return userInfoF
} else {
return { onData, onError, onComplete in
onData(orders.map{OrderObject(order: $0)})
onComplete()
}
}
}
第三步,這樣就可以将load方法簡化為:
func rxLoad() -> Observable<[OrderObject]> {
let orderService = [OrderService("鵝鵝鵝"), OrderService("鴨鴨鴨")]
// 通過map構造Observable,通過flatMap對listOrder和queryUserInfo進行複合
let observables = orderService.map { orderService in
makeObservable(f: orderService.listOrders).flatMap { (orders) -> Observable<[OrderObject]> in
let userLoadF = makeStreamFunc(orders: orders, userInfoService: getUserService(site: orderService.site))
return makeObservable(f: userLoadF)
}
}
// merge兩個平台的Observable
return Observable.merge(observables)
}
可以看到,第一步是通用的,實際代碼中隻需要做第二步和第三步,這就對上面的接口進行了大量的簡化,并且庫以統一的方式處理掉了合并、級聯、多資料傳回的複雜邏輯,我們有相當的把握來保證正确性。當然,除了學習成本較高以外,也還是有缺點的,主要是使用方式仍然是異步形式,在部分環節仍然需要處理異步帶來的複雜度:
// 使用方調用
rxLoad().subscribe { orderObjects in
// onNext閉包中處理資料
} onError: { error in
// onError閉包中處理錯誤
} onCompleted: {
// onCompleted閉包中處理完成
} onDisposed: {
}
Rx确實大大簡化了異步程式設計,但是還不夠,因為它的使用仍然是異步形式。
四、使用AsyncSequence簡化響應式程式設計
4.1 疊代器與序列
疊代器是很多語言都有的一個概念,一個疊代器的核心是next()函數,每次調用都會傳回下一個資料,這些資料構成了一個序列(Sequence),疊代器也意味着序列可以被周遊。
4.2 異步序列
如果讓疊代器的next()方法支援異步,就産生了異步序列。Swift對此提供了一個AsyncSequence的協定,并對它提供了語言級别的支援,使得開發者可以以同步的形式周遊一個異步序列:
for try await data in asyncDataList {
print("async get data : + \(data)")
}
實際上,Swift在Combine中支援了Publisher的同步周遊:
// Combine的同步調用
for try await data in publisher.values {
print("async get publiser value \(data)")
}
4.3 CPS變換
如果能将流式接口轉換為異步序列,那麼就可以實作響應式代碼的同步編寫,這個轉換過程可以通過CPS變換實作。
CPS變換全稱Continuation-Pass-Style,這個概念來自Lisp語系,是一種顯式傳遞控制流的程式設計風格,其傳遞控制流的載體就是continuation。continuation可以了解為目前代碼執行的後續,如果一個函數f有一個continuation參數,我們就可以把目前的continuation傳遞進去,當函數産生結果時,通過continuation回到函數f外,繼續執行,這種函數調用方式成為call/cc(call with current continuation)。
這種變換,稱為CPS變換。
作為一個類比,我覺得可以将continuation了解為return的在兩個方面的推廣形式,首先,continuation是first-class的,可以作為變量存儲,可以作為函數的參數和傳回值,其次,continuation可以多次使用,而return隻能有一次。
4.4 響應式程式設計的同步形式
回頭看最原始的代碼,當我們調用orderService.listOrders時,傳進去的callback,其實就相當于一個弱化版的continuation。這意味着,如果我們可以将使用continuation将資料表示為AsyncSequence,那麼就可以将響應式代碼寫成同步形式,進而大幅簡化響應式程式設計。
Swift提供了continuation的概念,提供了AsyncStream和AsyncThrowingStream來實作這個過程,對上節Rx的實作稍作改動即可。
第一步,實作一個将流式函數轉換成AsyncThrowingStream的工具類,這個是通用的:
func makeSequence<Data>(f : StreamFunc<Data>) -> AsyncThrowingStream<Data, Error> {
AsyncThrowingStream<Data, Error>{ continuation in
f { data in
continuation.yield(data)
} _: { error in
continuation.finish(throwing: e)
} _: {
continuation.finish()
}
}
}
第二步,由于AsyncSequence還不支援merge,需要自己實作一個merge工具方法來實作多個流的組合,這個也是通用的:
//多個AsyncSequence merge成一個AsyncSequence
func mergeSequence<Seq : AsyncSequence>(seqs : [Seq]) -> AsyncThrowingStream<Seq.Element, Error> {
makeSequence(f: mergeF(fs: seqs.map(makeLoadFunc)))
}
func makeLoadFunc<Seq : AsyncSequence>(ats : Seq) -> StreamFunc<Seq.Element>{
{ onData, onError, onComplete in
Task {
do {
for try await data in ats {
onData(data)
}
onComplete()
} catch {
onError(error)
}
}
}
}
func mergeF<Data>(fs : [StreamFunc<Data>]) -> StreamFunc<Data> {
{ onData, onError, onComplete in
var finish = false
var results = fs.map{_ in false}
for (index, f) in fs.enumerated() {
f { data in
if (!finish) {
onData(data)
}
} _: { e in
// 如果是第一個錯誤,直接回調,同時标記為結束
if (!finish) {
finish = true
onError(e)
}
} _: {
// 注意,即使所有的請求都結束了,回調成功
if (!finish) {
results[index] = true
// 所有都結束,回調
if (!results.contains(false)) {
finish = true
onComplete()
}
}
}
}
}
}
第三步,将listOrder和queryUserInfo轉換成StreamFunc形式,與Rx中的第二步實作完全相同;
第四步,這樣就可以将load方法簡化為:
func asLoad() -> AsyncThrowingStream<[OrderObject], Error> {
let orderService = [OrderService("鵝鵝鵝"), OrderService("鴨鴨鴨")]
// 通過map構造AsyncSequence,通過flatMap對listOrder和queryUserInfo進行複合
let streams = orderService.map { orderService in
makeSequence(f: orderService.listOrders).flatMap { (orders) -> AsyncThrowingStream<[OrderObject], Error> in
makeSequence(f: makeLoadFunc(orders: orders, userInfoService: getUserService(site: orderService.site)))
}
}
// merge兩個平台的AsyncSequence
return mergeSequence(seqs: streams)
}
可以發現,代碼與RxSwift幾乎是完全相同的,是以我們仍然有對于代碼正确性的信心,不同的是,現在使用方也得以獲得同樣的信心:
for try await orderObject in asLoad() {
print("async get orderObject \(orderObject.first?.order.orderId)")
}
五、總結
同步是程式設計中的田園世界,而流式接口作為異步接口最複雜的形态,我們通過CPS變換的控制流技術,将流式接口表示為AsyncSequence,實作了對異步序列周遊的同步形式,進而将響應式程式設計在形式上統一回了田園世界。
上面的第一步和第二步實作了AsyncSequence和StreamFunc的互相轉換,是以實際上我們證明了它們是同構的,更進一步的,我們可以證明它們與Rx、Combine也是同構的。換言之,它們是同一個概念的不同形式,理論上它們的表達能力是等價的,這個概念就是資料流,這個概念在Rx中叫做Observable,在Combine中叫做Publisher。
在實際實作上,Rx和Combine提供了大量的操作符,是以目前它們的能力遠遠強于AsyncSequence和StreamFunc,比如AsyncSequence居然不支援merge。
AsyncSequence的優勢是可以支援同步寫法,在我看來這個優勢是很大的。看到社群有過AsyncSequence替換Combine的相關的讨論,我認為邏輯上是講得通的。
AsyncSequence替換Combine的相關讨論位址:https://forums.swift.org/t/should-asyncsequence-replace-combine-in-the-future-or-should-they-coexist/53370