天天看點

angular2 rxjs 實作強類型自定義事件

    事件(event)與rxjs中的可觀察對象(Observable+Observer)本質相同,在github上找到一個庫typed-rx-emitter封裝好了Emitter,讀index.ts代碼可以看出,Emitter通過Messages實作自定義事件的強類型(事件名由Messages的Key限制,參數的強類型由Messages[Key]來給定),為Messages的每種事件(對應一個Key)分别維護了Observable集合和Observer集合,并提供on和emit兩個函數來分别用于訂閱和觸發事件,這樣事件的消費者和生産者就清閑了,隻需要去調用on, emit。

    核心代碼(搬運自index.ts)

import { Observer } from 'rxjs'
import { Observable } from 'rxjs/internal/Observable'

export type ALL = '__ALL__'
const ALL: ALL = '__ALL__'

interface State<Messages extends object> {
  callChain: Set<keyof Messages | ALL>
  observables: Map<keyof Messages | ALL, Observable<any>[]>
  observers: Map<keyof Messages | ALL, Observer<any>[]>
  options: Options<Messages>
}

export type Options<Messages> = {
  onCycle(chain: (keyof Messages | ALL)[]): void
  isDevMode: boolean
}

export class Emitter<Messages extends object> {

  private emitterState: State<Messages>

  constructor(options?: Partial<Options<Messages>>) {

    let DEFAULT_OPTIONS: Options<Messages> = {
      isDevMode: false,
      onCycle(chain) {
        console.error(
          '[typed-rx-emitter] Error: Cyclical dependency detected. '
          + 'This may cause a stack overflow unless you fix it. '
          + chain.join(' -> ')
        )
      }
    }

    this.emitterState = {
      callChain: new Set,
      observables: new Map,
      observers: new Map,
      options: {...DEFAULT_OPTIONS, ...options}
    }
  }

  /**
   * Emit an event (silently fails if no listeners are hooked up yet)
   */
  emit<K extends keyof Messages>(key: K, value: Messages[K]): this {
    let { isDevMode, onCycle } = this.emitterState.options
    if (isDevMode) {
      if (this.emitterState.callChain.has(key)) {
        onCycle(Array.from(this.emitterState.callChain).concat(key))
        return this
      } else {
        this.emitterState.callChain.add(key)
      }
    }
    if (this.hasChannel(key)) {
      this.emitOnChannel(key, value)
    }
    if (this.hasChannel(ALL)) {
      this.emitOnChannel(ALL, value)
    }
    if (isDevMode) this.emitterState.callChain.clear()
    return this
  }

  /**
   * Subscribe to an event
   */
  on<K extends keyof Messages>(key: K): Observable<Messages[K]> {
    return this.createChannel(key)
  }

  /**
   * Subscribe to all events
   */
  all(): Observable<Messages[keyof Messages]> {
    return this.createChannel(ALL)
  }

  / privates /

  private createChannel<K extends keyof Messages>(key: K | ALL) {
    if (!this.emitterState.observers.has(key)) {
      this.emitterState.observers.set(key, [])
    }
    if (!this.emitterState.observables.has(key)) {
      this.emitterState.observables.set(key, [])
    }
    const observable: Observable<Messages[K]> = Observable
      .create((_: Observer<Messages[K]>) => {
        this.emitterState.observers.get(key)!.push(_)
        return () => this.deleteChannel(key, observable)
      })
    this.emitterState.observables.get(key)!.push(observable)
    return observable
  }

  private deleteChannel<K extends keyof Messages>(
    key: K | ALL,
    observable: Observable<Messages[K]>
  ) {
    if (!this.emitterState.observables.has(key)) {
      return
    }
    const array = this.emitterState.observables.get(key)!
    const index = array.indexOf(observable)
    if (index < 0) {
      return
    }
    array.splice(index, 1)
    if (!array.length) {
      this.emitterState.observables.delete(key)
      this.emitterState.observers.delete(key)
    }
  }

  private emitOnChannel<K extends keyof Messages>(
    key: K | ALL,
    value: Messages[K]
  ) {
    this.emitterState.observers.get(key)!.forEach(_ => _.next(value))
  }

  private hasChannel<K extends keyof Messages>(key: K | ALL): boolean {
    return this.emitterState.observables.has(key)
  }
}
           

    Event$封裝 

    把Emitter重新封裝到類的static方法後,就可以實作簡單的調用了,封裝類命名為 Event$ 以差別于dom的 Event,代碼中的Messages根據需要可以不斷擴充,Event$不要動了。

import { Emitter } from './emitter'
import { Observable } from 'rxjs';

type Messages = {
    ADD: { sender: any, data:string }
}

export class Event$ {
    private static emitter = new Emitter<Messages>();
    /** subscribe event */
    static on<K extends keyof Messages>(key: K): Observable<Messages[K]> {
        return Event$.emitter.on(key);
    }

    /** fire events */
    static fire<K extends keyof Messages>(key: K, value: Messages[K]): void {
        Event$.emitter.emit(key, value);
    }
}
           

    調用  

//生産者調用.
Event$.fire('ADD',{sender:this, data:'hello world'});

//消費者調用.
Event$
  .on('ADD')
  .pipe(
    takeUntil(this.ngDestroyed$)//見另一篇部落格:"angular2 rxjs 取消訂閱的最佳實踐".
  )
  .subscribe((_,data)=>console.log(data));
           

    TODO 

    由于Event$成了所有自定義事件的中介者,可能會帶來性能瓶頸。

    typed-rx-emitter改進?

    由于Subject兼具IObservable和IObserver,可以簡化index.ts的實作如下。

import { Subject } from 'rxjs'
import { Observable } from 'rxjs/internal/Observable'

export type ALL = '__ALL__'
const ALL: ALL = '__ALL__'

interface State<Messages extends object> {
  callChain: Set<keyof Messages | ALL>
  subjects:Map<keyof Messages | ALL, Subject<any>>
  options: Options<Messages>
}

export type Options<Messages> = {
  onCycle(chain: (keyof Messages | ALL)[]): void
  isDevMode: boolean
}

export class Emitter<Messages extends object> {

  private emitterState: State<Messages>

  constructor(options?: Partial<Options<Messages>>) {

    let DEFAULT_OPTIONS: Options<Messages> = {
      isDevMode: false,
      onCycle(chain) {
        console.error(
          '[typed-rx-emitter] Error: Cyclical dependency detected. '
          + 'This may cause a stack overflow unless you fix it. '
          + chain.join(' -> ')
        )
      }
    }

    this.emitterState = {
      callChain: new Set,
      subjects: new Map,
      options: {...DEFAULT_OPTIONS, ...options}
    }
  }

  /**
   * Emit an event (silently fails if no listeners are hooked up yet)
   */
  emit<K extends keyof Messages>(key: K, value: Messages[K]): this {
    let { isDevMode, onCycle } = this.emitterState.options
    if (isDevMode) {
      if (this.emitterState.callChain.has(key)) {
        onCycle(Array.from(this.emitterState.callChain).concat(key))
        return this
      } else {
        this.emitterState.callChain.add(key)
      }
    }
    if (this.hasSubject(key)) {
      this.emitSubject(key, value)
    }
    if (this.hasSubject(ALL)) {
      this.emitSubject(ALL, value)
    }
    if (isDevMode) this.emitterState.callChain.clear()
    return this
  }

  /**
   * Subscribe to an event
   */
  on<K extends keyof Messages>(key: K): Observable<Messages[K]> {
    return this.getSubject(key)
  }

  /**
   * Subscribe to all events
   */
  all(): Observable<Messages[keyof Messages]> {
    return this.getSubject(ALL)
  }

  / privates /

  private getSubject<K extends keyof Messages>(key: K | ALL) {
    if(!this.emitterState.subjects.has(key)){
      this.emitterState.subjects.set(key, new Subject());
    }
    return this.emitterState.subjects.get(key);
  }

  private emitSubject<K extends keyof Messages>(
    key: K | ALL,
    value: Messages[K]
  ) {
    this.emitterState.subjects.get(key)!.next(value);
  }

  private hasSubject<K extends keyof Messages>(key: K | ALL): boolean {
    return this.emitterState.subjects.has(key);
  }
}
           

繼續閱讀