天天看點

用Go語言實作ReactiveX(一)——Observable

用Go語言實作ReactiveX有很大的挑戰,Go語言本身沒有類的繼承,是以無法采用基類來做一些封裝操作。不過好在Go語言是有閉包和匿名函數。是以可以實作ReactiveX https://github.com/langhuihui/GoRx

影響設計ReactiveX的要素

  • 沒有類的繼承
  • 有匿名函數
  • 有閉包
  • 強類型,沒有泛型
  • goroutine代替異步

實作生産者Observable

  1. 發送資料
  2. 完成事件
  3. error事件
  4. 被訂閱
  5. 被取消訂閱

發送資料功能

有兩種方式可以實作,一種是直接調用回調函數,和javascript一樣。這種方式的局限性在于代碼相對啰嗦,因為golang的函數定義必須是有類型的,會涉及到更多的類型斷言的操作,匿名函數使用起來也比javascript的要更麻煩一些。第二種方式是采用channel來傳遞資料,這種方式更加go方式一點。是以我後來采取了第二種方式實作。(第一種也嘗試過) 簡而言之,核心就是一個chan interface{},一個無緩沖的channel用來發送資料。這個channel是由Observer傳遞進來的(類似于回調的概念)

type Next chan interface{}
Observable <------Next----- Observer     //subscribe
Observable
      Next-----data----> Observer       //next           

複制

被訂閱

當Observable接收到用于發送資料的channel的時候,就是被訂閱的時候。見上圖。

完成事件

利用close一個channel會産生一個事件的方式進行觸發。

Observable  close(Next)  ------> Observer              (complete)           

複制

Observer通過對channel讀取操作,如果第二個參數傳回false(channel已經被關閉)代表complete

data,ok:=<-next
if !ok{
//complete
}           

複制

error事件

由于golang對異常捕獲目前上不健全,是以暫時就通過next channel發送錯誤對象,在Observer中對資料類型進行類型斷言,如果是error類型,則認為收到了錯誤事件。

被取消訂閱(dispose)

這個事件是由Observer向Observable發出的 我們定義了一個新的channel :chan bool。成為stop channel專門用來做這個事情,這個channel不發送任何資料,隻用來close的時候廣播這個事件。

type Stop chan bool           

複制

channel在close的時候,所有等待接受資料的goroutine均能接受到這個關閉事件,這是其他語言不具備的優勢。

Obserable <-------Next、Stop---------- Observer  //subscribe
                  <--------- close stop ----------- Observer  //dispose           

複制

案例:FromArray

func FromArray(array []interface{}) Observable {
    return func(n Next, s Stop) {
        for _, item := range array {
            select {
            case <-s:
                return
            default:
                n <- item
            }
        }
        close(n)
    }
}           

複制

我們看到FromArray是一個函數,調用FromArray(數組或切片),會傳回一個Observable。Observable是一個函數

type Observable func(Next, Stop)           

複制

我們周遊傳入的數組或切片,然後向Next管道傳入數組中的元素(n<-item),假如Stop被關閉,我們也能及時取消資料發送(case <-s:return)。 當所有資料發送完畢我們關閉Next管道,發出complete信号(close(n))。 (未完待續)

李宇翔:用Go語言實作ReactiveX(二)——Deliver李宇翔:用Go語言實作ReactiveX(三)——鍊式程式設計