天天看點

大衆點評Cat--Server子產品架構分析

cat server基于netty,是典型的reactor模型。

大衆點評Cat--Server子產品架構分析

上圖是網上找的reactor模型示例圖。netty的boss和workder分别映射圖中的mainreactor和subreactor。由boss accept nio channel,之後交由worker read, decode以及handle等。netty的實作和圖中略微有點不一緻,在netty中decode和handle是同步的。

需要說明的是netty handler也可以異步處理,netty支援線程池分發handler thread。

還有另一種方案,由應用程式自身實作延遲隊列做異步處理,handler隻需将消息(事件)放入隊列即可,cat采用的就是這種方案。cat是通過decoder解碼消息後調用handler将消息插入延遲隊列,并沒有向netty注冊handler再由netty在decode完畢後調用相應handler。

cat的傳輸資料對象為messagetree,messagecodec對傳輸消息編碼和解碼,messagehanlder将消息放入隊列。下圖是cat server的靜态結構。

大衆點評Cat--Server子產品架構分析

cathomemodule就是cat server,它包含兩個邏輯子產品,一個是reactor,另一個是延時隊列(period),分别對應上圖的左右半邊。

messageconsumer和tcpsocketreceiver均被cathomemodule依賴,其實是在receiver初始化工程中也相應初始化了這兩個重要元件。同時messageconsumer也是messagehandler聚合屬性,而handler則是receiver的一個内部屬性。結構上看起來有點混亂,但其實module是啟動了receiver的初始化,然後receiver在初始化過程中依賴了handler,而handler又依賴了consumer。而module和consumer之間的依賴關系是一種很弱的關系,隻是為了注冊虛拟機的shutdownhook(消息送出章節會做詳細說明)。

messagetree是經由網絡傳遞的消息封包對象,由messagecodec進行解碼和編碼。在服務端被解碼生成後作為方法參數被messageconsumer消費,最終放入messagequeue等待messageanalyzer處理。

messagequeue被聚合在periodtask内,後者是個daemon線程,不斷輪詢messagequeue,當有隊列裡有消息時就調用messageanalyzer處理,每個task隻對應一個analyzer,analyzer就是隊列的消費者。

一個period代表一個周期,每個周期對應一個持續時間(duration),預設是一小時,且周期是整點時間段,例如1:00-2:00,2:00-3:00,而不是1:01-2:01。每個周期對應多個task,每條消息相應的也會被拷貝成多分分發給每個task。

周期由periodmanager參考周期政策(periodstrategy)的結果生成或結束。

介紹完cat server的實體概念後,再從動态層面看下它的初始化過程以及消息的就緒,消費和送出過程。

在靜态結構視圖裡提到了cathomemodule依賴兩個實體,分别為messagecosumer和tcpsocketreceiver,它們分别承擔了reaactor以及延遲隊列的功能,由cathomemodule的initialize和setup階段被初始化。

首先看下延遲隊列的初始化過程。

大衆點評Cat--Server子產品架構分析

上節提到政策結果會開始一個新的周期或者結束一個老的周期。上圖的步驟5,政策會基于持續時間,提前開始時間(假設為a)以及延遲結束時間(假設為b)生成一個結果(a和b在cat中均預設為3分鐘)。政策結果如果為正則start新周期,為負則end老周期,為0則不做任何動作。cat預設超過duration*2+b的周期會被清理,相應的新周期也會提前a開始。

再看周期啟動周期任務的過程(8-12),周期先回從應用上下文中擷取所有的messageanalyzer(類似spring的getbeansoftype),再循環每個analyzer為每個analyzer生成一個或多個周期任務(視analyzer#getanalyzercount值而定)。

再看下reactor子產品的初始化過程。

大衆點評Cat--Server子產品架構分析

tcpsocketreciver在server啟動過程中被從上下文中lookup出來,并且執行初始化過程,在初始化的過程中通過依賴注入将messageconsumer注入了messagehandler中。相應的也将messagehandler和messagecodec注入給自己作為聚合屬性。

大衆點評Cat--Server子產品架構分析

tcpsocketreciver是個netty server,它監聽socket請求,由關聯的messagedecoder解析生成messagetree,再交由messagehandler處理。messagehandler将messagetree交由messageconsumer(延遲隊列)消費,consumer基于messagetree的時間戳找到相應的period(步驟8),将其放入相應的periodtask中。cat延時隊列不支援主題,是以每條消息會預設被所有task消費。

較長的描述下步驟8和步驟10:

1. periodmanager維護了一個m_periods的list,find的過程就是輪詢該清單找出duration包含messagetree#timestamp的唯一周期。

2. period裡會儲存m_tasks映射,結構是(analyzer類型全限定名,list)。在上節提過analyzer會對應一到多個task,對這種重複task select過程會通過messagetree#domain進行hash取模隻選出1個的。之是以要支援analyzer和task一對多的關系模型應該是考慮到有些analyzer處理會比較耗時,對這種就需要設定多個task,保證處理效率。

大衆點評Cat--Server子產品架構分析

消費過程相對簡單,周期任務通過守護線程不斷使用messageanalyzer對messagequeue進行分析,如果messagequeue#poll資料不為空則對poll出的messagetree進行業務處理。每種messageanalyzer針對具體應用場景做相應處理,使用者也可以自定義analyzer,隻需要被容器感覺就行。

消息被消費後并不會立即持久化,而是放在記憶體裡(analyzer的map屬性),結構是(duration, (domain, t)),t是個業務次元的報表對象,例如eventreport,heartbeatreport等。在周期結束或者jvm shutdown時會觸發消息持久化。

大衆點評Cat--Server子產品架構分析

1. 上圖是正常周期結束的處理過程。之前介紹過periodmanager會預設停掉duration*2+extratime時間前的周期,被停的周期會循環周期内的所有task做finish,相應的task調用其聚合屬性messageanalyzer的docheckpoint做消息持久化。

consumer#docheckpoint會通過periodmanager#findperiod查找目前時間點對應的周期,并對該周期進行finish,對應上圖步驟9及以後。

3. 需要強調的一點是,messageanalyzer對messagetree并不是單純的做儲存,而是基于業務做名額度量。可以把messagetree想象成度量名額,而messageanalyzer則是對名額作分析出報表,docheckpoint持久化的一般都是報表資料,報表裡包含了部分甚至全部messagetree資訊。