天天看點

響應式程式設計的複雜度和簡化

作者:阿裡技術

作者:蔣文豪

響應式系統不是今天的主題,我們要讨論更具體的話題,即響應式代碼的編寫會有哪些複雜度,應該如何簡化。

一、什麼是響應式程式設計

什麼是響應式程式設計,它是一種程式設計範式?還是一種設計模式?抑或是其他?響應式系統和響應式程式設計有什麼關系?又比如,響應式程式設計它适用于什麼場景?解決什麼問題?

微軟于2011年率先建設了.Net上的Rx庫,以簡化容易出錯的異步和事件驅動程式設計,邁出了響應式程式設計的第一步,随後業界為許多程式設計語言提供了對應的實作。

.Net上的Rx庫位址:https://docs.microsoft.com/en-us/previous-versions/dotnet/reactive-extensions/hh242985(v=vs.103)

什麼是響應式,我們從一個例子開始。

響應式程式設計的複雜度和簡化

在上面的表格中,建立了單元格之間的關系:A1 = B1 + C1,建立關系之後A1将響應任何對于B1和C1的變化,毫無疑問,這就是一種響應式行為。

我覺得這個例子很棒的地方在于,它顯然很簡單,同時又足夠深刻,首先,它充分的展現了響應式的概念,其次,變化發生時,肯定觸發了某些過程的執行,說明背後存在關系的建立和沿着關系傳播的變化,再次,稍微深入一點看,B1和C1的變化可以是一系列的變化,可以很自然的引申到流的概念,最後,它有一個很進階的抽象,對使用方來說,整個過程是聲明式的。

當然,舉例子來說明一個概念的時候,本質上是用一個外延在解釋一個概念的内涵,往往是會将内涵縮小的,是以我們可以嘗試推廣這個外延。列出這個例子的特征:

描述了一個單元格裡的整數等于另外兩個單元格裡整數相加,當後者每次發生變化時,變化都會傳播到第一個單元格,并進行求值。

關鍵詞為整數、等于、相加、變化、每次、傳播、求值。前三個關鍵詞僅僅和例子相關,可以直接去掉。變化可以推廣為資料,每次可以在邏輯上等價于流的概念,流可以有0個、1個或多個資料,傳播可以推廣為通信(在這個意義上,函數調用、RPC、socket、MQ都是通信),求值推廣為執行一個過程。是以我們可以得出響應式程式設計的定義:

通過聲明式的通信定義,将資料流與過程組合起來,進而實作資料驅動過程的一種複合程式設計範式。

時至今日,業界對于響應式的定義仍然是不統一的,是以這是我自己的了解。響應式的基礎概念是資料流,理念是過程的執行是通過響應資料來驅動的,核心是構造資料和過程的響應關系,并且能夠讓資料沿着關系傳播驅動過程,是以響應式程式設計本質上是一種對通信的抽象,說它是一種程式設計範式,是因為它提供一種對于資料與過程組合方式的看法,說它是複合範式而不是基本範式,是因為它不像OOP或者FP一樣提供的是對于資料和過程的看法,而是以兩者為基礎,是以可以有對象響應式和函數響應式。

當我們基于響應式建構系統時,就是響應式系統,響應式系統的建構原則可以參考此處(位址:https://www.reactiveprinciples.org/patterns/communicate-facts.html),總的來說,系統會分割成一個一個的分區,分區内部對狀态進行本地化,分區之間通過通信進行異步解耦,可以通過控制這個通信的過程,實作系統的彈性擴縮容和部分元件失敗的回彈性。

響應式系統不是今天的主題,我們要讨論更具體的話題,即響應式代碼的編寫會有哪些複雜度,應該如何簡化。

二、響應式程式設計的複雜度

響應式程式設計的複雜度來自于4個方面:

  • 可以有0次、1次或多次資料産生,也就是資料流;
  • 除了資料之外,還有能夠辨別錯誤和完成(正常結束);
  • 資料流和資料流、資料流和過程的組合複雜度很高;
  • 在上面的基礎上,需要處理整個過程中線程切換、并發同步、資料緩沖等問題。

為了支援資料流的概念,可以産生0次、1次或多次資料産生,API設計需要把資料回調和結果回調分開,通常也會把錯誤回調和完成回調分開,這種接口被稱為流式接口,一個标準的流式接口設計如下所示:

typealias Func = () -> ()
typealias OnData<Data> = (Data) -> ()
typealias OnError = (Error) -> ()
typealias OnComplete = Func
typealias StreamFunc<Data> = (@escaping OnData<Data>, @escaping OnError, @escaping OnComplete) -> ()           

顯然,流式接口是普通異步接口将一次結果向多次結果的推廣,這種推廣同時也增加了邏輯的複雜度。

我們可以通過一個邏輯上簡單的例子來看一下流式接口的使用過程,為了關注于核心的複雜度,隻會展現前3個方面,一方面是由于加入第4點的話會導緻代碼過于冗長混淆關注點,另一方面相信各位對第4點本身的複雜度和它引起的衆多問題已經非常熟悉了。

這個例子很簡單,隻有三步:

1)假設需要為一個店鋪提供一個訂單展示頁面,這些訂單來自兩個不同的平台“鵝鵝鵝”和“鴨鴨鴨”,他們各自提供了查詢的接口(listOrders,為了簡單假設他們提供的模型和接口完全一緻);

2)訂單清單需要展示使用者的昵稱等資訊,需要通過對應平台的另外一個接口(queryUserInfo)查詢;

3)由于SDK緩存、持久化、網絡請求政策,資料無法一次性擷取,這兩個接口可能存在多次資料回調。

進一步簡化問題,我們忽略變更處理、UI渲染和使用者互動處理,僅僅考慮資料加載,這需要組合2個階段的4次接口調用,先分别請求兩個平台的訂單,使用訂單請求對應平台的userInfo,最後合并成完整資料:

// 資料怎麼回調,什麼情況結束,onError和onComplete分别在什麼情況回調,保證有且僅有一次回調
func load(onData : OnData<[OrderObject]>?, onError : OnError?, onComplete : OnComplete?) {
    let orderServices = [OrderService("鵝鵝鵝"), OrderService("鴨鴨鴨")]
    // 記錄整體請求的完成狀态
    var listOrderFinish = false
    var queryUserFinish = false
    // 記錄各個請求的結果
    var listOrderResults = orderServices.map{_ in false}
    var queryUserResults = [Bool]()
    for (index, orderService) in orderServices.enumerated() {
        orderService.listOrders { orders in
            // 已結束不處理
            if (listOrderFinish) {
                return;
            }

            let index = queryUserResults.count
            queryUserResults[index] = false
            if let userService = getUserService(site: orderService.site){
                let userIds = orders.map { order in
                    order.userId
                }
​
                userService.queryUserInfo(userIds: userIds) { userInfoDict in
                    if (listOrderFinish && queryUserFinish) {
                        return;
                    }
​
                    let orderObjects = orders.map { order in
                        OrderObject(order: order, userInfo: userInfoDict[order.userId])
                    }
                    onData?(orderObjects)
                } onError: { error in
                    // 如果是第一個錯誤,直接回調,同時标記為結束
                    if (!listOrderFinish || !queryUserFinish) {
                        listOrderFinish = true
                        queryUserFinish = true
                        onError?(error)
                    }
                } onComplete: {
                    // 外層結束,内層也結束,才是最終結束
                    if (!listOrderFinish || !queryUserFinish) {
                        queryUserResults[index] = true
                        // 所有都結束,回調
                        if (listOrderFinish && !queryUserResults.contains(false)) {
                            listOrderFinish = true
                            onComplete?()
                        }
                    }
                }
            } else {
                let orderObjects = orders.map { order in
                    OrderObject(order: order)
                }
                onData?(orderObjects)
​
                queryUserResults[index] = true
                // 所有都結束,回調
                if (listOrderFinish && !queryUserResults.contains(false)) {
                    listOrderFinish = true
                    onComplete?()
                }
            }
        } onError: { error in
            // 如果是第一個錯誤,直接回調,同時标記為結束
            if (!listOrderFinish) {
                listOrderFinish = true
                onError?(error)
            }
        } onComplete: {
            // 注意,即使所有的請求都結束了,也不能回調結束,因為這裡的結束隻是代表Order請求結束,userInfo請求不一定結束
            if (!listOrderFinish) {
                listOrderResults[index] = true
                // 所有都結束,回調
                if (!listOrderResults.contains(false)) {
                    listOrderFinish = true
                }
            }
        }
​
    }
}           

在這個接口的實作中,資料回調最簡單,在沒有結束的情況下,多次回調的資料可以直接回調,問題是如何保證錯誤和完成有且僅有一次回調,且結果回調後不再回調資料,即:

  • 什麼時候回調錯誤?
  • 什麼時候回調完成?

如果我們認為一個接口出錯,就回調錯誤,這是最簡單的錯誤處理,隻需要檢查和設定結束狀态,在沒有結束時的第一個錯誤進行回調即可,注意,我們需要在userInfo的請求中也做類似的處理,并保證錯誤回調後不再執行任何回調。

完成的回調要比錯誤複雜的多,我們可以來思考一下:

  • 首先,我們不能在listOrders的onComplete裡面取回調完成,因為這裡不能代表queryUserInfo這個接口也完成了;
  • 其次,我們也不能簡單的通過所有queryUserInfo都完成了就回調完成,因為listOrders在完成前仍然有可能傳回新的訂單資料。

也就是說,這裡的完成需要在queryUserInfo進行判斷,并且也需要考慮外層請求的完成情況,比普通異步接口的級聯要多了兩個次元。這僅僅是2種接口4次請求,在真實的程式設計中,接口數量會多得多,并且需要把第4點加進來,線程/隊列、并發、同步、緩沖區,還要處理新資料推送響應,再考慮調試、監控、排查,複雜度顯然會繼續大幅增長,保證這個過程的正确性是一件痛苦的事情。

三、響應式程式設計的複雜度使用Rx/Combine簡化響應式程式設計

為了解決這些問題,業界搞出了Reactive Streams規範(位址:https://www.reactive-streams.org/),也出現了若幹的實作,都以工具庫的形式提供,包括Rx系列、Reactor,以及蘋果功能類似的Combine。作為一個iOS開發,我對RxSwift和Combine比較了解,兩者主要的差別在于Combine多了一個Subscription的抽象來協調Publisher和Subscriber之間的行為,尤其是Back Pressure相關的控制,但總的來說,都提供了對于異步資料流的抽象群組合能力,用法上也很類似,這裡以RxSwift為例來重寫上面的過程。

第一步,實作一個将流式函數轉換成Observable的工具類,這個是通用的,非常直覺:

func makeObservable<Data>(f : @escaping StreamFunc<Data>) -> Observable<Data> {
    Observable<Data>.create { observer in
        f { data in
            observer.onNext(data)
        } _: { error in
            observer.onError(error)
        } _: {
            observer.onCompleted()
        }
        return Disposables.create()
    }
}           

第二步,針對這個例子,将listOrder和queryUserInfo轉換成StreamFunc形式,listOrder本來就是StreamFunc,對queryUserInfo進行偏應用也可以轉換為StreamFunc形式,這是具體接口相關的:

func makeStreamFunc(orders : [Order], userInfoService : UserService?) -> StreamFunc<[OrderObject]> {
    if let userInfoService = userInfoService {
        // 核心是對queryUserInfo的userIds參數進行偏應用
        let userInfoF : StreamFunc<[OrderObject]> = { onData, onError, onComplete in
            let userIds = orders.map{$0.userId}
            userInfoService.queryUserInfo(userIds: userIds, onData: { userInfoDict in
                let orderObjects = orders.map { order in
                    OrderObject(order: order, userInfo: userInfoDict[order.userId])
                }
                onData(orderObjects)
            }, onError: onError, onComplete: onComplete)
        }
        return userInfoF
    } else {
        return { onData, onError, onComplete in
            onData(orders.map{OrderObject(order: $0)})
            onComplete()
        }
    }
}           

第三步,這樣就可以将load方法簡化為:

func rxLoad() -> Observable<[OrderObject]> {
    let orderService = [OrderService("鵝鵝鵝"), OrderService("鴨鴨鴨")]
​
    // 通過map構造Observable,通過flatMap對listOrder和queryUserInfo進行複合
    let observables = orderService.map { orderService in
        makeObservable(f: orderService.listOrders).flatMap { (orders) -> Observable<[OrderObject]> in
            let userLoadF = makeStreamFunc(orders: orders, userInfoService: getUserService(site: orderService.site))
            return makeObservable(f: userLoadF)
        }
    }
​
    // merge兩個平台的Observable
    return Observable.merge(observables)
}           

可以看到,第一步是通用的,實際代碼中隻需要做第二步和第三步,這就對上面的接口進行了大量的簡化,并且庫以統一的方式處理掉了合并、級聯、多資料傳回的複雜邏輯,我們有相當的把握來保證正确性。當然,除了學習成本較高以外,也還是有缺點的,主要是使用方式仍然是異步形式,在部分環節仍然需要處理異步帶來的複雜度:

// 使用方調用
rxLoad().subscribe { orderObjects in
    // onNext閉包中處理資料
} onError: { error in
    // onError閉包中處理錯誤
} onCompleted: {
    // onCompleted閉包中處理完成
} onDisposed: {
​
}           

Rx确實大大簡化了異步程式設計,但是還不夠,因為它的使用仍然是異步形式。

四、使用AsyncSequence簡化響應式程式設計

4.1 疊代器與序列

疊代器是很多語言都有的一個概念,一個疊代器的核心是next()函數,每次調用都會傳回下一個資料,這些資料構成了一個序列(Sequence),疊代器也意味着序列可以被周遊。

4.2 異步序列

如果讓疊代器的next()方法支援異步,就産生了異步序列。Swift對此提供了一個AsyncSequence的協定,并對它提供了語言級别的支援,使得開發者可以以同步的形式周遊一個異步序列:

for try await data in asyncDataList {
    print("async get data : + \(data)")
}           

實際上,Swift在Combine中支援了Publisher的同步周遊:

// Combine的同步調用
for try await data in publisher.values {
    print("async get publiser value \(data)")
}           

4.3 CPS變換

如果能将流式接口轉換為異步序列,那麼就可以實作響應式代碼的同步編寫,這個轉換過程可以通過CPS變換實作。

CPS變換全稱Continuation-Pass-Style,這個概念來自Lisp語系,是一種顯式傳遞控制流的程式設計風格,其傳遞控制流的載體就是continuation。continuation可以了解為目前代碼執行的後續,如果一個函數f有一個continuation參數,我們就可以把目前的continuation傳遞進去,當函數産生結果時,通過continuation回到函數f外,繼續執行,這種函數調用方式成為call/cc(call with current continuation)。

這種變換,稱為CPS變換。

作為一個類比,我覺得可以将continuation了解為return的在兩個方面的推廣形式,首先,continuation是first-class的,可以作為變量存儲,可以作為函數的參數和傳回值,其次,continuation可以多次使用,而return隻能有一次。

4.4 響應式程式設計的同步形式

回頭看最原始的代碼,當我們調用orderService.listOrders時,傳進去的callback,其實就相當于一個弱化版的continuation。這意味着,如果我們可以将使用continuation将資料表示為AsyncSequence,那麼就可以将響應式代碼寫成同步形式,進而大幅簡化響應式程式設計。

Swift提供了continuation的概念,提供了AsyncStream和AsyncThrowingStream來實作這個過程,對上節Rx的實作稍作改動即可。

第一步,實作一個将流式函數轉換成AsyncThrowingStream的工具類,這個是通用的:

func makeSequence<Data>(f : StreamFunc<Data>) -> AsyncThrowingStream<Data, Error> {
    AsyncThrowingStream<Data, Error>{ continuation in
        f { data in
            continuation.yield(data)
        } _: { error in
            continuation.finish(throwing: e)
        } _: {
            continuation.finish()
        }
    }
}           

第二步,由于AsyncSequence還不支援merge,需要自己實作一個merge工具方法來實作多個流的組合,這個也是通用的:

//多個AsyncSequence merge成一個AsyncSequence
func mergeSequence<Seq : AsyncSequence>(seqs : [Seq]) -> AsyncThrowingStream<Seq.Element, Error> {
    makeSequence(f: mergeF(fs: seqs.map(makeLoadFunc)))
}
​
func makeLoadFunc<Seq : AsyncSequence>(ats : Seq) -> StreamFunc<Seq.Element>{
    { onData, onError, onComplete in
        Task {
            do {
                for try await data in ats {
                    onData(data)
                }
                onComplete()
            } catch {
                onError(error)
            }
        }
    }
}
​
func mergeF<Data>(fs : [StreamFunc<Data>]) -> StreamFunc<Data> {
    { onData, onError, onComplete in
        var finish = false
        var results = fs.map{_ in false}
        for (index, f) in fs.enumerated() {
            f { data in
                if (!finish) {
                    onData(data)
                }
            } _: { e in
                // 如果是第一個錯誤,直接回調,同時标記為結束
                if (!finish) {
                    finish = true
                    onError(e)
                }
            } _: {
                // 注意,即使所有的請求都結束了,回調成功
                if (!finish) {
                    results[index] = true
                    // 所有都結束,回調
                    if (!results.contains(false)) {
                        finish = true
                        onComplete()
                    }
                }
            }
        }
    }
}           

第三步,将listOrder和queryUserInfo轉換成StreamFunc形式,與Rx中的第二步實作完全相同;

第四步,這樣就可以将load方法簡化為:

func asLoad() -> AsyncThrowingStream<[OrderObject], Error> {
    let orderService = [OrderService("鵝鵝鵝"), OrderService("鴨鴨鴨")]
​
    // 通過map構造AsyncSequence,通過flatMap對listOrder和queryUserInfo進行複合
    let streams = orderService.map { orderService in
        makeSequence(f: orderService.listOrders).flatMap { (orders) -> AsyncThrowingStream<[OrderObject], Error> in
            makeSequence(f: makeLoadFunc(orders: orders, userInfoService: getUserService(site: orderService.site)))
        }
    }
​
    // merge兩個平台的AsyncSequence
    return mergeSequence(seqs: streams)
}           

可以發現,代碼與RxSwift幾乎是完全相同的,是以我們仍然有對于代碼正确性的信心,不同的是,現在使用方也得以獲得同樣的信心:

for try await orderObject in asLoad() {
    print("async get orderObject \(orderObject.first?.order.orderId)")
}           

五、總結

同步是程式設計中的田園世界,而流式接口作為異步接口最複雜的形态,我們通過CPS變換的控制流技術,将流式接口表示為AsyncSequence,實作了對異步序列周遊的同步形式,進而将響應式程式設計在形式上統一回了田園世界。

上面的第一步和第二步實作了AsyncSequence和StreamFunc的互相轉換,是以實際上我們證明了它們是同構的,更進一步的,我們可以證明它們與Rx、Combine也是同構的。換言之,它們是同一個概念的不同形式,理論上它們的表達能力是等價的,這個概念就是資料流,這個概念在Rx中叫做Observable,在Combine中叫做Publisher。

在實際實作上,Rx和Combine提供了大量的操作符,是以目前它們的能力遠遠強于AsyncSequence和StreamFunc,比如AsyncSequence居然不支援merge。

AsyncSequence的優勢是可以支援同步寫法,在我看來這個優勢是很大的。看到社群有過AsyncSequence替換Combine的相關的讨論,我認為邏輯上是講得通的。

AsyncSequence替換Combine的相關讨論位址:https://forums.swift.org/t/should-asyncsequence-replace-combine-in-the-future-or-should-they-coexist/53370

繼續閱讀