天天看點

RxJS 中一些容易忽略的概念

本文收集了一些在學習 RxJS 過程中發現的容易忽略或比較容易弄錯的概念,并嘗試對其進行解釋。

本文涉及的代碼均在 RxJS v6 版本,其他版本差別不大,基本概念是相同的。

EMPTY 不是空轉

EMPTY

是一個内置的 Observable,很多人看到其名字認為它什麼也不做,但對于 Observable 來說一定是處在下面“狀态”之一的:

  • 正常推送資料
  • 出現錯誤
  • 完成

EMPTY

就是直接到達完成狀态的 Observable:

import { EMPTY } from "rxjs";

EMPTY.subscribe({
  next() {
    console.log("no data will arrived");
  },
  complete() {
    console.log("complete");
  },
});
// OUTPUT: complete
           

empty()

EMPTY

功能一樣,但

empty()

已經被棄用了。

throwError()

throwError()

EMPTY

很類似,都沒有推送資料的狀态,差別隻在于

throwError()

生成的 Observable 會直接到達錯誤狀态。

catchError() 中忽略錯誤不能使源 Observable 繼續

RxJS 和 Ramda 從某些方面看相似度要高于 Lodash,最重要的特點就是操作都不會改變源資料,傳回的都是新的一份資料,也就是 Immutable。是以下面的代碼不會如願輸出 A 和 B:

import { EMPTY, from } from "rxjs";
import { catchError, map } from "rxjs/operators";

from(["a", null, "b"])
  .pipe(
    map((value) => value.toUpperCase()),
    catchError((err) => EMPTY)
  )
  .subscribe({
    next: console.log,
    complete: () => console.log("complete"),
  });
// OUTPUT: A
// OUTPUT: complete
           

讓我們看一下整體的流程:

  1. from()

    操作生成了一個 Observable,這裡将其命名為 Stream1
  2. 對 Stream1 進行

    map()

    操作,新生成了一個 Observable,将其命名為 Stream2,可以說 Stream2 和 Stream1 是沒任何關系的
  3. 和前一步類似,對 Stream2 進行

    catchError()

    操作,又新生成了一個 Observable,将其命名為 Stream3
  4. 此時有 Observer 進行了 subscribe 操作,注意此時這個 Observer 關注的是 Stream3,Stream1 和 Stream2 隻是個過渡,而在

    map()

    對 null 進行 toUpperCase() 操作時會出現異常
  5. 此時

    catchError()

    發揮了作用,傳回了

    EMPTY

    ,并同時結束了 Stream3,上文說到

    EMPTY

    也是一個 Observable,隻不過它會直接到達完成狀态,我們将其命名為 Stream4

是以最終的輸出為 A 和 complete,但是它們的來源不同: A 來自于 Stream3,complete 來自于 Stream4 。

Observable 的轉換過程就像是不同的水管連接配接在一起,每一次水流通過一個水管總會有些變化。

如何了解時間

關于 Observable 有個很好的類比:Observable 像是具有時間特性的數組。那如何了解這裡的“時間”,又怎樣量化地表示時間呢?

在檢視文檔時,經常會遇到類似下面這樣的圖:

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-ioWi9XEA-1609405535081)(/img/posts/rxjs/timer.png)]

下面的帶箭頭的直線就是時間軸,對于

timer(3000, 1000)

來說,從起點到第一個數字的時間長度應該是任意兩個數字之間商檢長度的 3 倍(3000 / 1000)。

而對于時間不敏感的操作,比如類似

from([1, 2, 3])

的操作來說,時間看似是沒什麼作用的,但是也是有時間概念的:1 2 3 和 結束 都在一個時間片段中發出,但對于 Observer 來說仍然是按順序一個個到達的。

RxJS 為我們提供了量化時間的方法: Marble Test,關于這個主題可以看另一篇文章,本文暫不贅述,我寫了一些 Marble Test 用例,有興趣地可以先看下。

如何了解高階 Observable

高階 Observable,也就是 發出 Observable 的 Observable,如下面的例子:

import { from, of } from "rxjs";
import { map } from "rxjs/operators";

from([1, 2, 3]).pipe(
  map(value => of(value))
).subscribe(value$ => { // Observer 1 得到 Observable 1
  value$.subscribe(console.log) // Observer 2 得到 Observab 1 推送的值
})
// OUTPUT: 1
// OUTPUT: 2
// OUTPUT: 3
           

上面的例子隻是個解釋,實際開發中沒人會這樣處理,這裡這是為了說明 Observable 可以推送任意的資料,甚至是其他的 Observable。就像我們熟悉的高階函數一樣,傳回的是函數,也需要我們再次調用這個函數才能得到最終的結果。從這個角度來看,Function 和 Observable 一樣都具有 lazy 的特性。

資料

  • Reactive Program 入門必讀
  • Marble Test

繼續閱讀