天天看點

【轉】Angular—RxJS之Observable

目錄

一、概述

二、詳解

Observable翻譯為可觀察對象,是RxJS中的一個對象,可以用來處理異步事件,例如http請求。實際上,在Angular中,所有的http請求傳回的都是Observable。Observable和promise本質上是相同的,都是生産者主動向消費者push産品,而消費者被動接收。但兩者也有很大差別,Observable可以發送任意多值,并且在被訂閱錢,它是不會執行的,這是promise不具備的特點。

Observable用于在發送方和接收方之間傳輸消息,可以将這些消息看作流。

在建立Observable對象時,需要傳入一個函數作為構造函數的參數,這個函數叫訂閱者函數,是生産者向消費者推送消息的地方。在被消費者subscribe(訂閱)之前,訂閱者函數不會執行,直到subscribe()函數被調用,該函數傳回一個subscription對象,裡面有一個unsubscribe()函數,消費者可以随時拒絕消息的接收。

subscribe()函數接收一個observer(觀察者)對象作為入參。

消息的發送可以是同步的,也可以是異步的。

我們可以使用一系列的RxJS操作符,在這些消息被接收方接收之前,對它進行一些列的處理、轉換,因為這些操作符都是純函數,沒有副作用,可以放心使用,并不會産生期望之外的效果。

observer觀察者

observer為觀察者,可以觀察Observable對象。observer是一個對象,裡面包含三個屬性,分别是next、error和complete,它們都是函數。

  1. (1) next: 以接收的值作為入參, 在正常情況下執行, 可選
  2. (2) error: 出錯的情況下執行, 可選
  3. (3) complete: 傳輸完成的情況下執行, 可選
Observable被觀察者

執行個體

通過RxJS庫中的of方法建立一個Observable對象,代碼如下所示。

  1. onClick() {
  2. let observable = of(1, 2, 3);
  3. observable.subscribe({ next: num => console.log(num) });
  4. // 控制台分别輸出:1 2 3
  5. }

解析

RxJS中的of方法用于建立一個Observable,它會将其參數一個一個的分别發送給接收方。

subscribe函數接收一個Observable對象,但這裡,隻定義了next方法,可以發現next方法接收一個參數,這個參數就是生産者發送過來的值,然後将其列印在控制台上。

訂閱者函數
  1. onClick2() {
  2. const observable = Observable.create(observer => {
  3. try {
  4. observer.next(1);
  5. observer.next(2);
  6. observer.next(3);
  7. } catch (e) {
  8. observer.error(e);
  9. observer.complete();
  10. });
  11. const observer = {
  12. next: num => console.log(num),
  13. error: e => console.log(e),
  14. complete: () => console.log('complete!!!')
  15. observable.subscribe(observer); // 控制台分别輸出:1 2 3 complete!!!
【轉】Angular—RxJS之Observable

以上代碼效果與用of建立的Observable是一樣的,of函數的源碼就是類似這種結構。

subscribe訂閱

在被消費者subscribe(訂閱)之前,訂閱者函數不會執行,直到subscribe()函數被調用,該函數傳回一個subscription對象。裡面有一個unsubscribe()函數,消費者可以随時拒絕消息的接收。也就是說,在Observable調用subscribe函數之前,什麼也不會發生,就像下面這段代碼,控制台什麼輸出内容都沒有。

訂閱observable對象,如下代碼所示。

異步發送消息

以上代碼中生産者發送消息的方式都是同步。如下代碼所示,異步發送消息。

  1. onClick3() {
  2. let time = 0;
  3. const intervalId = setInterval(() => {
  4. console.log(`wait ${++time}s`);
  5. }, 900)
  6. setTimeout(() => {
  7. observer.next(4);
  8. clearInterval(intervalId)
  9. }, 2000);
  10. // observer.complete(); // 注意不能立即調用complete函數,不然會終止消息傳輸
  11. setTimeout(() => observer.complete(), 3000)
  12. observable.subscribe(observer);
【轉】Angular—RxJS之Observable
unsubscribe函數

調用subscribe函數,傳回一個subscription對象,裡面有一個unsubscribe()函數,用于取消訂閱。

  1. onClick4() {
  2. const obs = Observable.create(observer => {
  3. setTimeout(() => observer.next(2), 2000); // 等待兩秒才發送下一個值
  4. let suber = obs.subscribe({
  5. next: x => console.log("接收到:", x)
  6. setTimeout(() => suber.unsubscribe(), 1000); // 在一秒後取消訂閱

運作代碼,最終隻列印了數字1,因為數字2在兩秒後才會發送,而消費者在1秒後就中斷了消息傳輸。

自定義subscribe函數

Observable的subscribe函數傳回一個subscription對象,該對象有一個unsubscribe函數,用于取消訂閱。

如下代碼所示,使用函數模拟訂閱。模拟了Observable的subscribe函數的内部實作(僅僅是模拟而已)。

  1. onClick5() {
  2. function subscribe(observer) {
  3. var intervalID = setInterval(() => {
  4. observer.next('launch.....');
  5. }, 1000);
  6. return {
  7. unsubscribe: () => clearInterval(intervalID)
  8. var subscription = subscribe({ next: (x) => console.log(x) });
  9. setTimeout(() => subscription.unsubscribe(), 5000);
RxJS的map操作符

map操作符可以接收可觀察者對象發送的值,将其轉換成另外的形式,并以一個新的可觀察對象發送這些新值。

如下代碼所示,将原來可觀察對象發送的值全部轉換為hello world,并訂閱map操作符傳回的新的可觀察對象。

  1. import { map } from 'rxjs/operators';
  2. const observable = of(1, 2, 3);
  3. const opt = map(num => 'hello');
  4. const newObservable = opt(observable);
  5. newObservable.subscribe(data => console.log(data));
  6. // 控制台分别輸出:hello hello hello

可以發現map操作符将123轉換成了三個hello,并輸出到控制台上,這種方式代碼結構并不是很清晰,并不推薦這種寫法。

pipe管道、tap操作符
  1. const newObservable = observable.pipe(
  2. tap(num => console.log(num)),
  3. map(num => 'hello world')
  4. );

繼續閱讀