天天看點

淺淡 RxJS WebSocket

引言

中背景儀表盤是一個非常複雜,特别是當需要全面屏運用時,資料的實時性需求非常高。WebSocket 不管在什麼環境中使用其實都是非常簡單,各現代浏覽器實作标準都很統一,而且

接口

也足夠簡單。

即便是在 Angular 也是如此,隻需要簡單幾行代碼就能使用 WebSocket。

```const ws = new WebSocket('wss://echo.websocket.org');

ws.onmessage = (e) => {

console.log('message', e);

}```

若需要向服務端發送消息,則:

```ws.send(`content`);```

在 Angular 裡絕大多數的人都會根據上述代碼進一步拓展,比如統一消息解析、錯誤處理、多路複用等,并最終将其封裝成一個服務類。

事實上,RxJS 也包裹了一個 WebSocket Subject,位于

rxjs/websocket

如何使用

假如将上面的示例使用 RxJS 來寫,則:

```import { webSocket, WebSocketSubject } from 'rxjs/webSocket';

const ws = webSocket(‘wss://echo.websocket.org’);

ws.subscribe(res => {

console.log(‘message’, res);

});

ws.next(

content

);```

webSocket

是一個工廠函數,所生産出來的

WebSocketSubject

對象可被多次訂閱,若未訂閱或取消最後一個訂閱時都會導緻 WebSocket 連接配接中斷,當再一次訂閱時會重新自動連接配接。

WebSocketSubjectConfig

webSocket

除了接收字元串(WebSocket服務遠端位址)外,還允許指定更複雜的配置項。

預設情況下,消息是使用

JSON.parse

JSON.stringify

對消息格式序列化和反序列化操作,是以不管消息發送或接收都以 JSON 為準,可通過 serializer、deserializer 屬性來改變。

若需要關心 WebSocket 什麼時候開始或結束(

closeObserver

),則:

```const open$ = new Subject();

const ws = webSocket({

url: 'wss://echo.websocket.org',

openObserver: open$

// 訂閱打開事件

open$.subscribe(() => {});```

消息

WebSocketSubject

也是

Subject

的變體之一,是以訂閱它表示接收消息,反之則利用

next

complete

error

來維護消息的推送。

  • 使用

    next

    來發送消息
  • complete

    會嘗試檢測是否最後一個訂閱,若是将會關閉連接配接
  • error

    相當于原始

    close

    方法且必須提供

    { code: number, reason?: string}

    參數,注意

    code

    務必遵守 取值範圍

可被重放

調用

next

發送消息時若 WebSocket 連接配接中斷(例如:沒人訂閱時),消息會被緩存當下一次重新連接配接以後會按順序發送。這對于異步世界裡非常友善,我們隻需要確定 Angular 啟動前初始化好 WebSocket 不管什麼時候訂閱接收消息,都可以随時發送也無須等待。

事實上這一點是 RxJS WebSocket 預設情況下是通過

webSocket

所生産的

WebSocketSubject

其本質上是

ReplaySubject

的“重放”能力。當然你可以通過

webSocket

的第二個參數改變這種行為。

多路複用

一般來說我們不太可能隻會一個 Web Socket 服務完成所有的事,然而也不太可能針對每一個業務執行個體建立一個

webSocket

。往往我們會增加一層網關并将這些業務 WebSocket 進行彙總,對于前端始終隻需要一個連接配接,這就是多路複用存在的意義。

而核心是必須要讓後端知道,什麼時候發送什麼消息給什麼樣的服務。

首先必須先使用

multiplex

方法來建立

Observable

以便訂閱某一路消息,它有三個參數來幫助我們區分消息:

  • subMsg

    告知正在訂閱哪一路消息
  • unsubMsg

    告知取消訂閱哪一路消息
  • messageFilter

    過濾消息,使訂閱者隻接收哪一路消息

```const ws = webSocket('wss://echo.websocket.org');

const user$ = this.ws.multiplex(

() => ({ type: 'subscribe', tag: 'user' }),

() => ({ type: 'unsubscribe', tag: 'user' }),

message => message.type === 'user'

);

user$.subscribe(message => console.log(message));

const todo$ = this.ws.multiplex(

() => ({ type: ‘subscribe’, tag: ‘todo’ }),

() => ({ type: ‘unsubscribe’, tag: ‘todo’ }),

message => message.type === ‘todo’

todo$.subscribe(message => console.log(message));```

user$

流和

todo$

流他們共用一個 WebSocket 連接配接,這便是多路複用。

雖然訂閱是通過

multiplex

建立的,然後消息的推送依然還是需要使用

ws.next()

總結

這原本是對内部一個簡單教育訓練,然而我發現竟然極少人會讨論 RxJS 裡面 Web Socket 的實作。

其實一直有想着要給

ng-alain

内置 WebSocket,隻是就封裝角度來講完全沒有價值,因為已經足夠優雅。