最近花了不少時間對消息總線進行了重構。重構的重點是在消息總線中加入了Guava的EventBus,并應用于以下兩個場景:
(1)改進廣播通知
(2)業務邏輯串聯,用事件驅動替代責任鍊模式
廣播通知是消息總線提供的功能之一。在重構之前,用戶端接收廣播通知是通過消息總線用戶端SDK的一個API來實作的:
但之前的廣播通知設計并不合理。它受限于之前的基于RabbitMQ的樹形路由拓撲模型:

這個拓撲結構中有些隻發送不接受的“虛拟隊列”并不是真實存在的隊列。這些消息生産者無法接收消息,這是非常大的一個缺陷。我一直在想辦法重新設計它,之前的關注點都集中在RabbitMQ上,想在MQ上找到一種解決方案,但這很難,除非擯棄“虛拟隊列”的設計。于是,我将關注點轉移到消息總線中另一個可以提供pub/sub的元件上(後稱之為pubsuber),該元件目前可以是redis也可以是zookeeper。因為每個client(更準确得說是每個建立client的pool)都會以長連接配接的方式挂在pubsuber上。是以,它本身就是一個很不錯的廣播管道,并且因為它脫離RabbitMQ單獨實作,跟虛拟隊列的設計不相沖突。
上面的思路沒有問題,但語義與實作上并不對等。通知的收發從語義上來說應該是Client
API級别的。而PubSuber接收到的廣播事件卻是Pool級别的,并不依賴client(Pool建立PubSuber以及Client)。我們不應該在Pool層面上接收廣播事件。是以這裡存在一個事件的截獲與二次轉發的過程。這是我們針對EventBus的第一個應用場景:用它轉發PubSuber接收到的廣播通知給client。
PubSuber接收到廣播消息之後通過EventBus 作二次轉發:
事件釋出完了之後,EventBus會将其分發到該事件的訂閱者處理,這裡需要注意的是建立的EventBus是一個異步EventBus的執行個體,它在一個獨立的線程上執行事件處理器方法。而所有的事件處理器都需要通過Client進行注冊:
以上這一步,就将消息通知跟Client關聯起來。而且對多個client注冊不同的事件處理器,還可以起到多點傳播的作用(原來在Pool級别是一個事件,現在在Client級别,多個Client可以應對若幹個處理器)。
EventBus通過注解來解析事件處理器與事件之間的關聯關系,更多的實作細節,請參考之前的文章。下面就是訂閱廣播通知的方式:
僅僅需要一個注解即可。當然最後别忘記移除注冊,如果你不再希望接收通知的話,整個過程如下:
這樣,原先的拓撲結構就不再包含廣播通知的實作了:
但基于同步事件驅動的方式似乎能起到跟責任鍊模式一樣的效果。它通過事件分發來驅動業務邏輯調用。将chain的每一個調用都看做是一個事件處理方法,一個單向通信邏輯(比如produce)對應一個事件處理器(produceEventProcessor)。因為此處的EventBus是同步的(事件處理邏輯在調用線程上執行,執行順序跟事件發生的順序相同),是以隻要編排好事件順序,一一觸發事件,事件處理器也就會一一按照事件觸發的順序執行。
我們以消息生産者來看一下通過EventBus改造後的業務邏輯是什麼樣子。
首先我們定義一個生産消息的事件處理器:
為了使得邏輯關系緊湊,我們将事件以内部類的方式定義在生産消息的事件處理器内部:
定義每個事件的事件處理方法:
在client被調用以生産消息時,首先建立該事件處理器的執行個體,然後向EventBus注冊事件處理器:
隻有注冊了該執行個體,在釋出事件時,才會觸發該執行個體的事件處理方法。注冊完成該執行個體之後,需要初始化事件對象,這裡事件之間以及事件處理器之間沒有必然聯系,我們以一個消息上下文對象的引用來讓它們以共享“記憶體”的方式進行資料交換:
準備工作就緒,現在開始釋出事件。這裡事件的釋出順序跟執行順序是一緻的,是以我們需要根據業務邏輯來編排事件,以形成原先的串聯調用的效果:
這就是重構的整個過程。我們發現這裡不再存在鍊式(遞歸)調用了,各個事件處理器方法之間也沒有耦合性,它們通過MessageContext來共享上下文。如果我們要增加新的業務邏輯,如何擴充?四步走:
(1)定義一個新事件對象
(2)定義一個新的事件處理器方法
(3)執行個體化該事件對象
(4)根據需要插入原先的編排過的事件中去并釋出該事件
跟原先的事件沒有任何關系。
原文釋出時間為:2015-06-30
本文作者:vinoYang