用Go語言實作ReactiveX有很大的挑戰,Go語言本身沒有類的繼承,是以無法采用基類來做一些封裝操作。不過好在Go語言是有閉包和匿名函數。是以可以實作ReactiveX https://github.com/langhuihui/GoRx
影響設計ReactiveX的要素
- 沒有類的繼承
- 有匿名函數
- 有閉包
- 強類型,沒有泛型
- goroutine代替異步
實作生産者Observable
- 發送資料
- 完成事件
- error事件
- 被訂閱
- 被取消訂閱
發送資料功能
有兩種方式可以實作,一種是直接調用回調函數,和javascript一樣。這種方式的局限性在于代碼相對啰嗦,因為golang的函數定義必須是有類型的,會涉及到更多的類型斷言的操作,匿名函數使用起來也比javascript的要更麻煩一些。第二種方式是采用channel來傳遞資料,這種方式更加go方式一點。是以我後來采取了第二種方式實作。(第一種也嘗試過) 簡而言之,核心就是一個chan interface{},一個無緩沖的channel用來發送資料。這個channel是由Observer傳遞進來的(類似于回調的概念)
type Next chan interface{}
Observable <------Next----- Observer //subscribe
Observable
Next-----data----> Observer //next
複制
被訂閱
當Observable接收到用于發送資料的channel的時候,就是被訂閱的時候。見上圖。
完成事件
利用close一個channel會産生一個事件的方式進行觸發。
Observable close(Next) ------> Observer (complete)
複制
Observer通過對channel讀取操作,如果第二個參數傳回false(channel已經被關閉)代表complete
data,ok:=<-next
if !ok{
//complete
}
複制
error事件
由于golang對異常捕獲目前上不健全,是以暫時就通過next channel發送錯誤對象,在Observer中對資料類型進行類型斷言,如果是error類型,則認為收到了錯誤事件。
被取消訂閱(dispose)
這個事件是由Observer向Observable發出的 我們定義了一個新的channel :chan bool。成為stop channel專門用來做這個事情,這個channel不發送任何資料,隻用來close的時候廣播這個事件。
type Stop chan bool
複制
channel在close的時候,所有等待接受資料的goroutine均能接受到這個關閉事件,這是其他語言不具備的優勢。
Obserable <-------Next、Stop---------- Observer //subscribe
<--------- close stop ----------- Observer //dispose
複制
案例:FromArray
func FromArray(array []interface{}) Observable {
return func(n Next, s Stop) {
for _, item := range array {
select {
case <-s:
return
default:
n <- item
}
}
close(n)
}
}
複制
我們看到FromArray是一個函數,調用FromArray(數組或切片),會傳回一個Observable。Observable是一個函數
type Observable func(Next, Stop)
複制
我們周遊傳入的數組或切片,然後向Next管道傳入數組中的元素(n<-item),假如Stop被關閉,我們也能及時取消資料發送(case <-s:return)。 當所有資料發送完畢我們關閉Next管道,發出complete信号(close(n))。 (未完待續)
李宇翔:用Go語言實作ReactiveX(二)——Deliver李宇翔:用Go語言實作ReactiveX(三)——鍊式程式設計