Guava是Google開源的一個Java基礎類庫,它在Google内部被廣泛使用。Guava提供了很多功能子產品比如:集合、并發庫、緩存等,EventBus是其中的一個module,本篇結合EventBus源碼來談談它的設計與實作。
首先,我們先來預覽一下EventBus子產品的全部類圖:

類并不是多而且幾乎沒有太多繼承關系。
下面,我們來看一下各個類的職責:
EventBus:核心類,代表了一個事件總線。Publish事件也由它發起。
AsyncEventBus:在分發事件的時候,将其壓入一個全局隊列的異步分發模式。
Subscriber:對某個事件的處理器抽象,封裝了事件的訂閱者以及處理器,并負責事件處理(該類的類名及其語義有些不明确,後續會談到)。
SubscriberRegistry:訂閱系統資料庫,它用于存儲Subscriber跟Event的對應關系,以便于EventBus在publish一個事件時,可以找到它對應的Subscriber。
Dispatcher:事件分發器,它定義了事件的分發政策。
@Subscribe:用于辨別事件處理器的注解,當EventBus
publish一個事件後,相應的Subscriber将會得到通知并執行事件處理器。
@AllowConcurrentEvents:該注解跟@Subscribe一同使用,辨別該訂閱者的處理方法為線程安全的,該注解還用于辨別該方法将可能會被EventBus在多線程環境下執行。
DeadEvent:死信(沒有訂閱者關注的事件)對象。
SubscribeExceptionHandler:訂閱者抛出異常的處理器。
SubscribeExceptionContext:訂閱者抛出異常的上下文對象。
在對每個類進行分解之前,我們再來看一下各個類之間的關聯關系:
它有這麼幾個字段:
identifier:事件總線的辨別,這說明在一個應用裡是可以有多個EventBus的。如果不指明它的值,它将以“default”作為其預設名稱。
executor:它是Executor接口的執行個體,用于對訂閱者處理事件方法的執行。這裡需要注意的是,該字段的執行個體化是在EventBus内部構造器中,并不是從外部注入進來的,另外真正的執行訂閱者方法的時機也不由EventBus負責,而是由Subscriber負責,是以該字段會被公開給外部通路。
exceptionHandler:它是SubscribeExceptionHandler的執行個體,用于處理訂閱者在執行事件處理方法時抛出的異常。EventBus可以接收一個外部定義的異常處理器,也可以采用内部預設的日志記錄處理器。
subscribers:訂閱者系統資料庫,用于存儲所有的事件以及事件處理器、訂閱對象的對應關系。
dispatcher:事件分發器,用于分發事件給訂閱對象的事件處理器,該對象在EventBus構造方法内部初始化,預設的實作是PerThreadQueuedDispatcher,該分發器将事件存入隊列,并保證在同一個線程上發送的事件能夠按照他們釋出的順序被分發給所有的訂閱者。
EventBus提供了幾個核心方法:
register:注冊subscriber;
unregister:移除注冊過的subscriber;
post:釋出事件;
你可以将EventBus看做是一個代理,這些方法真正的實作者都是上面的這些對象。
一個支援異步釋出模式的EventBus,它覆寫了EventBus的預設構造方法,指定了一個異步的分發器:LegacyAsyncDispatcher,這個分發器基于一個全局的隊列來暫存未釋出的事件。
之前也提到Subscriber的名稱是比較容易混淆的。這個類的名稱看似表示一個訂閱者對象,但其實是用來封裝“一個訂閱者的一個事件處理器”對象。因為當一個訂閱者存在多個處理方法被标注為@Subscribe的時候,那麼每個處理方法都對應于一個獨立的Subscriber對象的執行個體。我個人覺得這個名稱與其具體的實作語義有些混淆。當然也許實作者認為:一個對象以及一個事件處理器就是一個Subscriber的話,那是沒有問題的。是以這裡為了了解友善,你可以将其看做是一個封裝了訂閱者對象以及一個訂閱者處理器方法的實體類。
Subscriber的通路級别是package的,它還承擔了執行事件處理的責任。通過一個create靜态工廠方法建立它:
它接收三個參數:
bus:EventBus的執行個體,通過它來擷取事件的執行器(executor)
listener:真實的訂閱者對象
method:訂閱對象的事件處理方法的Method執行個體
在實作中,它會先判斷該處理器方法上是否被标注有@AllowConcurrentEvents注解,如果有,則執行個體化Subscriber類的一個執行個體;如果沒有,則不允許eventbus在多線程的環境中調用處理器方法,是以這裡專門為此提供了一個同步的訂閱者對象:SynchronizedSubscriber來保證線程安全。
該類的兩個關鍵方法之一:
dispatchEvent:
它調用一個多線程執行器來執行事件處理器方法。
另一個方法:invokeSubscriberMethod以反射的方式調用事件處理器方法。
另外,該類對Object的equals方法進行了override并辨別為final。主要是為了避免同一個對象對某個事件進行重複訂閱,在SubscriberRegistry中會有相應的判等操作。當然這裡Subscriber也override并final了hashCode方法。這是最佳實踐,不必多談,如果不了解的可以去看看《Effective
Java》。
該類還有個内部類,就是我們上面談到的SynchronizedSubscriber,它繼承了Subscriber,與Subscriber唯一的不同就是在invokeSubscriberMethod的執行上做了同步。
針對單個EventBus的訂閱與事件的關系維護。在内部用來存儲訂閱者關系的對象是java并發包下的并發Map:ConcurrentMap,該map以Class對象為鍵,值的類型是CopyOnWriteArraySet<Subscriber>集合類型。
SubscriberRegistry直接依賴EventBus對象,是以在構造器中需要注入EventBus的執行個體。
SubscriberRegistry裡有兩個關鍵的執行個體方法:register/unregister。
接收訂閱者對象作為參數并建立Event跟Subscriber的關聯關系。
我們來看看它的實作:
它首先獲得一個Multimap執行個體(它是Google
Guava集合架構提供的一個多值Map類型,也就是說一個key可以對應多個value),該Multimap用于存儲事件類型對應的該訂閱者内所有關于該事件的處理器方法集合,其key為事件的Class類型。這裡在for循環的中通過asMap擷取其map視圖,即可将Multimap對應的多個值存儲到一個Collection中。
也就是說這裡for循環的每個entry,表示的是一個事件的Class執行個體對應的一組Subscriber的集合,即eventMethodsInListener。
然後根據該事件的Class對象從系統資料庫中擷取對應的存儲Subscriber執行個體的集合,如果不存在則建立該集合,然後将該訂閱者内所有的事件處理器方法都加入到系統資料庫中去。
unregister的實作跟register有些類似,先查找該訂閱者所有的事件類型與處理器的對應關系。然後,周遊所有的事件類型,移除針對目前訂閱者的所有Subscriber執行個體。
register/unregister方法都調用了findAllSubscribers方法,它有一些特别之處,這裡需要單獨拎出來提一下。
findAllSubscribers用于查找事件類型以及事件處理器的對應關系。查找注解需要涉及到反射,通過反射來擷取标注在方法上的注解。因為Guava針對EventBus的注冊采取的是“隐式契約”而非接口這種“顯式契約”。而類與接口是存在繼承關系的,所有很有可能某個訂閱者其父類(或者父類實作的某個接口)也訂閱了某個事件。是以這裡的查找需要順着繼承鍊向上查找父類的方法是否也被注解标注,代碼實作:
同樣涉及這個問題的,還有根據事件類型擷取Subscriber執行個體的方法:getSubscribers。
dispatcher用于分發事件給Subscriber。它内部實作了多個分發器用于提供在不同場景下不同的事件順序性。Dispatcher是一個抽象類,定義了一個核心抽象方法:
該方法用于将一個指定的事件分發給所有的訂閱者。
另外在Dispatcher提供了三個不同的分發器實作:
它比較常用,針對每個線程建構一個隊列用于暫存事件對象。保證所有的事件都按照他們publish的順序從單一的線程上發出。保證從單一線程上發出,沒什麼特别的地方,主要是在内部定義了一個隊列,将其放在ThreadLocal中,用以跟特定的線程關聯。
另一個異步分發器的實作:LegacyAsyncDispatcher,之前在介紹AsyncEventBus的時候提到,它就是用這種實作來分發事件。
它在内部通過一個ConcurrentLinkedQueue<EventWithSubscriber>的全局隊列來存儲事件。從關鍵方法:dispatch的實作來看,它跟PerThreadQueuedDispatcher的差別主要是兩個循環上的差異(這裡基于隊列的緩存事件的方式,肯定會存在兩個循環:循環取隊列裡的事件以及循環發送給Subscriber)。
PerThreadQueuedDispatcher:是兩層嵌套循環,外層是周遊隊列取事件,記憶體是周遊事件的訂閱處理器。
LegacyAsyncDispatcher:是一前一後兩個循環。前面一個是周遊事件訂閱處理器,并建構一個事件實體對象存入隊列。後一個循環是周遊該事件實體對象隊列,取出事件實體對象中的事件進行分發。
其實以上兩個基于中間隊列的分發實作都可以看做是異步模式,而ImmediateDispatcher則是同步模式:隻要有事件發生就會立即分發并被立即得到處理。ImmediateDispatcher從感官上看類似于線性并順序執行,而采用隊列的方式有多線程彙聚到一個公共隊列的由發散到聚合的模型。是以,ImmediateDispatcher的分發方式是一種深度優先的方式,而使用隊列是一種廣度優先的方式。
它是一個實體對象,封裝了沒有訂閱者的事件。DeadEvent由兩個屬性組成:
source:事件源(通常指釋出事件的EventBus對象)
event:事件對象
DeadEvent對象的産生:當通過某個EventBus的執行個體釋出一個事件的時候,沒有找到事件訂閱者并且它本身又不是一個DeadEvent的執行個體時,将由EventBus建構一個DeadEvent類的執行個體。
Guava的EventBus源碼還是比較簡單、清晰的。從源碼來看,它一番常用的Observer的設計方式,放棄采用統一的接口、統一的事件對象類型。轉而采用基于注解掃描的綁定方式。
其實無論是強制實作統一的接口,還是基于注解的實作方式都是在建構一種關聯關系(或者說滿足某種契約)。很明顯接口的方式是編譯層面上強制的顯式契約,而注解的方式則是運作時動态綁定的隐式契約關系。接口的方式是傳統的方式,編譯時确定觀察者關系,清晰明了,但通常要求有一緻的事件類型、方法簽名。而基于注解實作的機制,剛好相反,編譯時因為沒有接口的文法層面上的依賴關系,顯得不那麼清晰,至少靜态分析工具很難展示觀察者關系,但無需一緻的方法簽名、事件參數,至于多個訂閱者類之間的繼承關系,可以繼承接收事件的通知,可以看作既是其優點也是其缺點。
原文釋出時間為:2015-06-01
本文作者:vinoYang