天天看點

觀察者模式和釋出訂閱模式有什麼不同?

作者:大資料與人工智能分享
觀察者模式和釋出訂閱模式有什麼不同?

基于 go 語言和大家探讨設計模式中的觀察者模式. 觀察者模式适用于多對一的訂閱/釋出場景.

  • ”多“:指的是有多名觀察者
  • ”一“:指的是有一個被觀察事物
  • ”訂閱“:指的是觀察者時刻關注着事物的動态
  • ”釋出“:指的是事物狀态發生變化時是透明公開的,能夠正常進入到觀察者的視線

在上述場景中,我們了解到核心對象有兩類,一類是“觀察者”,一類是“被觀察的事物”,且兩者間在數量上存在多對一的映射關系.

在具體作程式設計實作時,上述場景的實作思路可以是百花齊放的,而觀察者模式隻是為我們提供了一種相對規範的設計實作思路,其遵循的核心宗旨是實作“觀察者”與“被觀察對象”之間的解耦,并将其設計為通用的子產品,便于後續的擴充和複用.

學習設計模式時,我們腦海中需要中需要明白,教條是相對刻闆的,而場景和問題則是靈活多變的,在工程實踐中,我們避免生搬硬套,要做到因地制宜,随機應變.

2 代碼實踐

2.1 核心角色

在觀察者模式中,核心的角色包含三類:

  • Observer:觀察者. 指的是關注事物動态的角色
  • Event:事物的變更事件. 其中 Topic 辨別了事物的身份以及變更的類型,Val 是變更詳情
  • EventBus:事件總線. 位于觀察者與事物之間承上啟下的代理層. 負責維護管理觀察者,并且在事物發生變更時,将情況同步給每個觀察者.
觀察者模式和釋出訂閱模式有什麼不同?

觀察者模式的核心就在于建立了 EventBus 的角色. 由于 EventBus 子產品的誕生,實作了觀察者與具體被觀察事物之間的解耦:

  • 針對于觀察者而言,需要向 EventBus 完成注冊操作,注冊時需要聲明自己關心的變更事件類型(調用 EventBus 的 Subscribe 方法),不再需要直接和事物打交道
  • 針對于事物而言,在其發生變更時,隻需要将變更情況向 EventBus 統一彙報即可(調用 EventBus 的 Publish 方法),不再需要和每個觀察者直接互動
  • 對于 EventBus,需要提前維護好每個觀察者和被關注事物之間的映射關系,保證在變更事件到達時,能找到所有的觀察者逐一進行通知(調用 Observer 的 OnChange 方法)

三類角色組織生成的 UML 類圖如下所示:

觀察者模式和釋出訂閱模式有什麼不同?

對應的代碼實作示例展示如下:

type Event struct {
    Topic string
    Val   interface{}
}


type Observer interface {
    OnChange(ctx context.Context, e *Event) error
}


type EventBus interface {
    Subscribe(topic string, o Observer)
    Unsubscribe(topic string, o Observer)
    Publish(ctx context.Context, e *Event)
}           

觀察者 Observer 需要實作 OnChange 方法,用于向 EventBus 暴露出通知自己的“聯系方式”,并且在方法内部實作好當關注對象發生變更時,自己需要采取的處理邏輯.

下面給出一個簡單的觀察者實作示例 BaseObserver:

type BaseObserver struct {
    name string
}


func NewBaseObserver(name string) *BaseObserver {
    return &BaseObserver{
        name: name,
    }
}


func (b *BaseObserver) OnChange(ctx context.Context, e *Event) error {
    fmt.Printf("observer: %s, event key: %s, event val: %v", b.name, e.Topic, e.Val)
    // ...
    return nil
}           

事件總線 EventBus 需要實作 Subscribe 和 Unsubscribe 方法暴露給觀察者,用于新增或删除訂閱關系,其實作示例如下:

type BaseEventBus struct {
    mux       sync.RWMutex
    observers map[string]map[Observer]struct{}
}


func NewBaseEventBus() BaseEventBus {
    return BaseEventBus{
        observers: make(map[string]map[Observer]struct{}),
    }
}


func (b *BaseEventBus) Subscribe(topic string, o Observer) {
    b.mux.Lock()
    defer b.mux.Unlock()
    _, ok := b.observers[topic]
    if !ok {
        b.observers[topic] = make(map[Observer]struct{})
    }
    b.observers[topic][o] = struct{}{}
}


func (b *BaseEventBus) Unsubscribe(topic string, o Observer) {
    b.mux.Lock()
    defer b.mux.Unlock()
    delete(b.observers[topic], o)
}           

針對 EventBus 将事物變更事件同步給每個觀察者的 Publish 流程,可以分為同步模式和異步模式,分别在 2.2 小節和 2.3 小節中展開介紹.

2.2 同步模式

在同步模式的實作中,通過 SyncEventBus 實作了 EventBus 的同步通知版本,對應類圖如下:

觀察者模式和釋出訂閱模式有什麼不同?
觀察者模式和釋出訂閱模式有什麼不同?

在同步模式下,EventBus 在接受到變更事件 Event 時,會根據事件類型 Topic 比對到對應的觀察者清單 observers,然後采用串行周遊的方式分别調用 Observer.OnChange 方法對每個觀察者進行通知,并對處理流程中遇到的錯誤進行聚合,放到 handleErr 方法中進行統一的後處理.

type SyncEventBus struct {
    BaseEventBus
}


func NewSyncEventBus() *SyncEventBus {
    return &SyncEventBus{
        BaseEventBus: NewBaseEventBus(),
    }
}


func (s *SyncEventBus) Publish(ctx context.Context, e *Event) {
    s.mux.RLock()
    defer s.mux.RUnlock()
    subscribers := s.observers[e.Topic]


    errs := make(map[Observer]error)
    for subscriber := range subscribers {
        if err := subscriber.OnChange(ctx, e); err != nil {
            errs[subscriber] = err
        }
    }


    s.handleErr(ctx, errs)
}           

此處對 handleErr 方法的實作邏輯進行建立了簡化,在真實的實踐場景中,可以針對遇到的錯誤建立更完善的後處理流程,如采取重試或告知之類的操作.

func (s *SyncEventBus) handleErr(ctx context.Context, errs map[Observer]error) {
    for o, err := range errs {
        // 處理 publish 失敗的 observer
        fmt.Printf("observer: %v, err: %v", o, err)
    }
}           

2.3 異步模式

在異步模式的實作中,通過 AsyncEventBus 實作了 EventBus 的異步通知版本,對應類圖如下:

觀察者模式和釋出訂閱模式有什麼不同?
觀察者模式和釋出訂閱模式有什麼不同?

在異步模式下,會在 EventBus 啟動之初,異步啟動一個守護協程,負責對接收到的錯誤進行後處理.

在事物發生變更時,EventBus 的 Publish 方法會被調用,此時 EventBus 會并發調用 Observer.OnChange 方法對每個觀察者進行通知,在這個過程中遇到的錯誤會通過 channel 統一彙總到 handleErr 的守護協程中進行處理.

type observerWithErr struct {
    o   Observer
    err error
}




type AsyncEventBus struct {
    BaseEventBus
    errC chan *observerWithErr
    ctx  context.Context
    stop context.CancelFunc
}




func NewAsyncEventBus() *AsyncEventBus {
    aBus := AsyncEventBus{
        BaseEventBus: NewBaseEventBus(),
    }
    aBus.ctx, aBus.stop = context.WithCancel(context.Background())
    // 處理處理錯誤的異步守護協程
    go aBus.handleErr()
    return &aBus
}


func (a *AsyncEventBus) Stop() {
    a.stop()
}


func (a *AsyncEventBus) Publish(ctx context.Context, e *Event) {
    a.mux.RLock()
    defer a.mux.RUnlock()
    subscribers := a.observers[e.Topic]
    for subscriber := range subscribers {
        // shadow
        subscriber := subscriber
        go func() {
            if err := subscriber.OnChange(ctx, e); err != nil {
                select {
                case <-a.ctx.Done():
                case a.errC <- &observerWithErr{
                    o:   subscriber,
                    err: err,
                }:
                }
            }
        }()
    }
}


func (a *AsyncEventBus) handleErr() {
    for {
        select {
        case <-a.ctx.Done():
            return
        case resp := <-a.errC:
            // 處理 publish 失敗的 observer
            fmt.Printf("observer: %v, err: %v", resp.o, resp.err)
        }
    }
}           

2.4 使用示例

下面分别給出同步和異步模式下觀察者模式的使用示例:

func Test_syncEventBus(t *testing.T) {
    observerA := NewBaseObserver("a")
    observerB := NewBaseObserver("b")
    observerC := NewBaseObserver("c")
    observerD := NewBaseObserver("d")


    sbus := NewSyncEventBus()
    topic := "order_finish"
    sbus.Subscribe(topic, observerA)
    sbus.Subscribe(topic, observerB)
    sbus.Subscribe(topic, observerC)
    sbus.Subscribe(topic, observerD)


    sbus.Publish(context.Background(), &Event{
        Topic: topic,
        Val:   "order_id: xxx",
    })
}           

異步測試代碼:

func Test_asyncEventBus(t *testing.T) {
    observerA := NewBaseObserver("a")
    observerB := NewBaseObserver("b")
    observerC := NewBaseObserver("c")
    observerD := NewBaseObserver("d")


    abus := NewAsyncEventBus()
    defer abus.Stop()


    topic := "order_finish"
    abus.Subscribe(topic, observerA)
    abus.Subscribe(topic, observerB)
    abus.Subscribe(topic, observerC)
    abus.Subscribe(topic, observerD)


    abus.Publish(context.Background(), &Event{
        Topic: topic,
        Val:   "order_id: xxx",
    })


    <-time.After(time.Second)
}           

3 工程案例

本章和大家一起梳理一下在工程實踐中對觀察者模式的使用場景.

3.1 MQ 釋出/訂閱

觀察者模式和釋出訂閱模式有什麼不同?

大家耳熟能詳的消息隊列就是對觀察者模式的一種實踐,大家可以采用類比的方式在 MQ (Message Queue)架構中代入觀察者模式中的每一類角色:

  • EventBus:對應的是消息隊列元件,為整個通信架構提供了分布式解耦、流量削峰等能力
  • Event:對應的是消息隊列中的一條消息,有明确的主題 topic,由生産者 producer 提供
  • Observer:對應的是消費者 consumer,對指定事物的動态(topic)進行訂閱,并在消費到對應的變更事件後執行對應的處理邏輯

3.2 ETCD 監聽回調

觀察者模式和釋出訂閱模式有什麼不同?

另一個踐行了觀察者模式的工程案例是基于 golang 編寫的分布式 kv 存儲元件 etcd.

etcd 提供了作用于指定資料範圍的監聽回調功能,能在對應資料狀态發生變更時,将變更通知傳達到每個訂閱者的手中,在這個過程中:

  • EventBus:對應的是 etcd 服務端的 watchableStore 監聽器存儲子產品,該子產品會負責存儲使用者建立的一系列監聽器 watcher,并建立由監聽資料 key 到監聽器集合 watcherGroup 之間的映射關系. 當任意存儲資料發生變化時,etcd 的資料存儲子產品會在一個統一的切面中調用通知方法,将這一資訊傳達到 watchableStore 子產品,watchableStore 則會将變更資料與監聽資料 key 之間進行 join,最終得到一個需要執行回調操作的 watchers 組合,順沿 watcher 中的路徑,向訂閱者發送通知消息
  • Event:對應的是一條 etcd 狀态機的資料變更事件,由 etcd 使用方在執行一條寫資料操作時觸發,在寫操作真正生效後,變更事件會被傳送到 watchableStore 子產品執行回調處理
  • Observer:使用 etcd watch 功能對指定範圍資料建立監聽回調機制的使用方,在 etcd 服務端 watchableStore 子產品會建立監聽器實體 watcher 作為自身的代理,當變更事件真的發生後,watchableStore 會以 watcher 作為起點,沿着傳回路徑一路将變更事件發送到使用方手中.

想了解有關 etcd watch 機制的更多内容,可以閱讀我之前發表的文章:

etcd watch 機制源碼解析——服務端篇 以及 etcd watch 機制源碼解析——用戶端篇.

4 總結

本文和大家一起探讨了設計模式中的觀察者模式:

  • 觀察者模式适用于多對一的訂閱/釋出場景,其實作思路是在觀察者與被觀察對象之間添加收口了釋出訂閱功能的中間層,核心宗旨是實作“觀察者”與“被觀察對象”之間的解耦
  • 通過 UML 類圖結合具體代碼示例,對觀察者模式進行實踐. 根據變更事件的通知模式,觀察者模式可以分為同步和異步兩種模型
  • 本文給出兩個踐行了觀察者模式的工程案例,一個是 Message Queue 的釋出訂閱模式,一個是 ETCD 服務端對 watch 功能的實作思路

繼續閱讀