天天看點

異步并行批處理架構設計的一些思考

  随着網際網路資訊技術日新月異的發展,一個海量資料爆炸的時代已經到來。如何有效地處理、分析這些海量的資料資源,成為各大技術廠商争在激烈的競争中脫穎而出的一個利器。可以說,如果不能很好的快速處理分析這些海量的資料資源,将很快被市場無情地所淘汰。當然,處理分析這些海量資料目前可以借鑒的方案有很多:首先,在分布式計算方面有Hadoop裡面的MapReduce并行計算架構,它主要針對的是離線的資料挖掘分析。此外還有針對實時線上流式資料處理方面的,同樣也是分布式的計算架構Storm,也能很好的滿足資料實時性分析、處理的要求。最後還有Spring Batch,這個完全面向批處理的架構,可以大規模的應用于企業級的海量資料處理。

  在這裡,我就不具體展開說明這些架構如何部署、以及如何開發使用的詳細教程說明。我想在此基礎上更進一步:我們能否借鑒這些開源架構背後的技術背景,為服務的企業或者公司,量身定制一套符合自身資料處理要求的批處理架構。

  首先我先描述一下,目前我所服務的公司所面臨的一個使用者資料存儲處理的一個現狀背景。目前移動公司一個省内在網使用者資料規模達到幾千萬的規模數量級,而且每個省已經根據地市區域對使用者資料進行劃分,我們把這批資料存儲在傳統的關系型資料庫上面(基于Oracle,地市是分區)。移動公司的計費結算系統會根據使用者手機話費的餘額情況,實時的通知業務處理系統,給手機使用者進行停機、複機的操作。業務處理系統收到計費結算系統的請求,會把要處理的使用者資料往具體的交換機網元上派發不同的交換機指令,這裡簡單的可以稱為Hlr停複機指令(下面開始本文都簡稱Hlr指令)。目前面臨的現狀是,在日常情況下,傳統的C++多程序的背景處理程式還能勉強的“準實時”地處理這些資料請求,但是,如果一旦到了每個月的月初幾天,要處理的資料量往往會暴增,而C++背景程式處理的效率并不高。這時問題來了,往往會有使用者投訴,自己繳費了,為什麼沒有複機?或者某些使用者明明已經欠費了,但是還沒有及時停機。這樣的結果會直接降低客戶對移動營運商支撐的滿意度,于此同時,移動營運商本身也可能流失這些客戶資源。

  自己認真評估了一下,造成上述問題的幾個瓶頸所在。

一個省所有的使用者資料都放在資料庫的一個實體表中,資料庫伺服器,滿打滿算達到頂級小型機配置,也可能無法滿足月初處理量激增的性能要求,可以說頻繁的在一台伺服器上讀寫IO開銷非常巨大,整個伺服器處理的性能低下。

處理這些資料的時候,會同步地往交換機實體裝置上發送Hlr指令,在交換機沒有處理成功這個請求指令的時候,隻能阻塞等待,進一步造成後續待處理資料的積壓。

  針對上述的問題,本人想到了幾個優化方案。

資料庫中的實體表,能不能根據使用者的歸屬地市進行表實體的拆分。即把一台或者幾台伺服器的壓力,進行水準拆分。一台資料庫伺服器就重點處理某一個或者幾個地市的資料請求?降低IO開銷。

由于交換機處理Hlr指令的時候,存在阻塞操作,我們能不能改成:通過異步傳回處理的方式,把處理任務隊列中的任務先下達通知給交換機,然後交換機通過異步回調機制,反向通知處理子產品,彙報任務的執行情況。這樣處理子產品就從主動的任務輪詢等待,變成等待交換機執行結果的異步通知,這樣它就可以專注地進行處理資料的派發,不會受到某幾個任務處理時長的限制,進而影響到後面整批次的資料處理。

資料庫的實體表由于進行水準拆解,能不能做到并行加載?這樣就會大大節約串行資料加載的處理時長。

并行加載出來的待處理資料最好能放到一個批處理架構裡面,批處理架構能很好地根據要處理資料的情況,進行配置參數調整,進而很好地滿足實時性的要求。比如月初期間,可以加大處理參數的值,提高處理效率。平常的時候,可以适當降低處理參數的取值,降低系統的CPU/IO開銷。

  基于以上幾點考慮,得出如下圖所示的設計方案的元件圖:

  

  下面就具體說明一下,其中關鍵子產品如何協同工作的。

異步并行查詢加載子產品BatchQueryLoader:支援傳入多個資料源對象,同時利用google-guava庫中對于Future接口的擴充ListenableFuture,來實作批量查詢資料的并行加載。Future接口主要是用來表示異步計算的結果,并且計算完成的時候,隻能用get()方法擷取結果,get方法裡面其中有一個方法是可以設定逾時時間的。在并行加載子產品裡面,批量并行地加載多個資料源裡面的實體表中的資料,并最終回報加載的結果集合。并行資料加載和串行資料加載所用的耗時可以簡單用下面的圖例來說明:串行加載的總耗時是每個資料源加載耗時的總和。而并行加載的總耗時,取決于最大加載的那個資料源耗時時長。(注:我們把每天要進行停複機處理的使用者資料通過采集程式,分地市分布采集到水準分庫的notify_users提醒使用者表)                 

并行異步批處理子產品BatchTaskReactor:内部是通過線程池機制來實作的,接受異步并行查詢加載子產品BatchQueryLoader得到的加載結果資料,放入線程池中進行任務的異步派發,它最終就是通過Hlr派單指令異步任務執行HlrBusinessEventTask子產品下發指令任務,然後自己不斷的從阻塞隊列中擷取,待執行的任務清單進行任務的分派。與此同時,他通過Future接口,異步得到HlrBusinessEventTask派發指令的執行回報結果。

批量處理線程池運作參數配置加載BatchTaskConfigurationLoader:加載線程池運作參數的配置,把結果通知并行異步批處理子產品BatchTaskReactor,配置檔案batchtask-configuration.xml的内容如下所示。

其中corePoolSize表示保留的線程池大小,workQueueSize表示的是阻塞隊列的大小,maxPoolSize表示的是線程池的最大大小,keepAliveTime指的是空閑線程結束的逾時時間。其中建立線程池方法ThreadPoolExecutor裡面有個參數是unit,它表示一個枚舉,即keepAliveTime的機關。說了半天,這幾個參數到底什麼關系呢?我舉一個例子說明一下,當出現需要處理的任務的時候,ThreadPoolExecutor會配置設定corePoolSize數量的線程池去處理,如果不夠的話,會把任務放入阻塞隊列,阻塞隊列的大小是workQueueSize,當然這個時候還可能不夠,怎麼辦。隻能叫來“臨時工線程”幫忙處理一下,這個時候“臨時工線程”的數量是maxPoolSize-corePoolSize,當然還會繼續不夠,這個時候ThreadPoolExecutor線程池會采取4種處理政策。

現在具體說一下是那些處理政策。首先是ThreadPoolExecutor.AbortPolicy 中,處理程式遭到拒絕将抛出運作時 RejectedExecutionException。然後是ThreadPoolExecutor.CallerRunsPolicy 中,線程調用運作該任務的 execute 本身。此政策提供簡單的回報控制機制,能夠減緩新任務的送出速度。其次是,ThreadPoolExecutor.DiscardPolicy 中,不能執行的任務将被删除。最後是ThreadPoolExecutor.DiscardOldestPolicy 中,如果執行程式尚未關閉,則位于工作隊列頭部的任務将被删除,然後重試執行程式(如果再次失敗,則重複此過程)。如果要處理的任務沒有那麼多了,ThreadPoolExecutor線程池會根據keepAliveTime設定的時間機關來回收多餘的“臨時工線程”。你可以把keepAliveTime了解成專門是為maxPoolSize-corePoolSize的“臨時工線程”專用的。

線程池參數的設定。正常情況下我們要如何設定線程池的參數呢?我們應該這樣設定:I、workQueueSize阻塞隊列的大小至少大于等于corePoolSize的大小。II、maxPoolSize線程池的大小至少大于等于corePoolSize的大小。III、corePoolSize是你期望處理的預設線程數,個人覺得線程池機制的話,至少大于1吧?不然的話,你這個線程池等于單線程處理任務了,這樣就失去了線程池設計存在的意義了。

JMX(Java Management Extensions)批處理任務監控子產品BatchTaskMonitor:實時地監控線程池BatchTaskReactor中任務的執行處理情況(具體就是任務成功/失敗情況)。

  介紹完畢了幾個核心子產品主要的功能,那下面就依次介紹一下主要子產品的詳細設計思路。

我們把每天要進行停複機處理的使用者資料通過采集程式,采集到notify_users表。首先定義的是,我們要處理采集的通知使用者資料對象的結構描述,它對應水準分庫的表notify_users的JavaBean對象。notify_users的表結構為了示範起見,簡單設計如下(基于Oracle資料庫):

create table notify_users

(

home_city number(3) /*手機使用者的歸屬地市編碼*/,

msisdn number(15) /*手機号碼*/,

user_id number(15) /*手機使用者的使用者辨別*/

);

對應JavaBean實體類NotifyUsers,具體代碼定義如下:

異步并行查詢加載子產品BatchQueryLoader的類圖結構:                                                                                                             我們通過并行查詢加載子產品BatchQueryLoader調用異步并行查詢執行器BatchQueryExecutor,來并行地加載不同資料源的查詢結果集合。StatementWrapper則是對JDBC裡面Statement的封裝。具體代碼如下所示:

定義兩個并行加載的異常類BatchQueryInterruptedException、BatchQueryExecutionException

再抽象出一個批量查詢接口,主要是為了後續能擴充在不同的資料庫之間進行批量加載。接口類BatchQuery定義如下

好了,現在封裝一個異步并行查詢執行器BatchQueryExecutor

最後的并行查詢加載子產品BatchQueryLoader直接就是調用上面的異步并行查詢執行器BatchQueryExecutor,完成不同資料源的資料并行異步加載,代碼如下

批量處理線程池運作參數配置加載BatchTaskConfigurationLoader子產品,主要從負責從batchtask-configuration.xml中加載線程池的運作參數。BatchTaskConfiguration批處理線程池運作參數對應的JavaBean結構

當然了,你進行參數配置的時候,還可以指定多個線程池,于是要設計一個:批處理線程池工廠類BatchTaskThreadFactoryConfiguration,來依次循環儲存若幹個線程池的參數配置

剩下的是,加載運作時參數配置子產品BatchTaskConfigurationLoader

上面的這些子產品主要是針對線程池的運作參數可以調整而設計準備的。

并行異步批處理子產品BatchTaskReactor主要類圖結構如下                                                                                                   BatchTaskRunner這個接口,主要定義了批處理架構要初始化和回收資源的動作。

我們還要重新實作一個線程工廠類BatchTaskThreadFactory,用來管理我們線程池當中的線程。我們可以把線程池當中的線程放到線程組裡面,進行統一管理。比如線程池中的線程,它的運作狀态監控等等處理,你可以通過重新生成一個監控線程,

來運作、跟蹤線程組裡面線程的運作情況。當然你還可以重新封裝一個JMX(Java Management Extensions)的MBean對象,通過JMX方式對線程池進行監控處理,本文的後面,有給出運用JMX技術,進行批處理線程池任務完成情況監控的實作,實作線程池中線程運作狀态的監控可以參考一下。這裡就不具體給出,線程池線程狀态監控的JMX子產品代碼了。言歸正傳,線程工廠類BatchTaskThreadFactory的實作如下

下面是關鍵子產品:并行異步批處理子產品BatchTaskReactor的實作代碼,主要還是對ThreadPoolExecutor進行地封裝,考慮使用有界的數組阻塞隊列ArrayBlockingQueue,還是為了防止:生産者無休止的請求服務,導緻記憶體崩潰,最終做到記憶體使用可控

采取的措施。

下面設計實作的是:交換機Hlr指令處理任務子產品。當然,在後續的業務發展過程中,還可能出現,其他類型指令的任務處理,是以根據“開閉”原則的定義,要抽象出一個接口類:BusinessEvent

然後具體的Hlr指令發送任務子產品HlrBusinessEvent要實作這個接口類的方法,完成使用者停複機Hlr指令的派發。代碼如下:

實際運作情況中,我們可能要監控一下指令發送的時長,于是再設計一個:針對Hlr指令發送任務子產品HlrBusinessEvent,切面嵌入代理的Hlr指令時長計算代理類:HlrBusinessEventAdvisor,具體的代碼如下:

剩下的,我們由于是要,異步并行計算得到執行結果,于是我們設計一個:批處理Hlr任務執行子產品HlrBusinessEventTask,它要實作java.util.concurrent.Callable接口的方法call,它會傳回一個異步任務的執行結果。

接下來,我們要把并行異步加載的查詢結果,和并行異步處理任務執行的子產品,給它組合起來使用,故重新封裝一個,通知使用者批處理任務管理類子產品:NotifyUsersBatchTask。它的主要功能是:批量并行異步加載查詢待停複機的手機使用者,然後把它放入并行異步處理的線程池中,進行異步處理。然後我們列印出,本次批處理的任務一共有多少,成功數和失敗數分别是多少(當然,本文還給出了另外一種JMX方式的監控)。NotifyTaskSuccCounter類,主要是統計派發的任務中執行成功的任務的數量,而與之相對應的類NotifyTaskFailCounter,是用來統計執行失敗的任務的數量。具體的代碼如下

異步處理任務執行送出子產品BatchTaskPredicate,主要是從線程池中采集異步送出要處理的任務,然後根據異步的執行結果,回報給線程池:這個任務執行成功還是執行失敗了。具體代碼如下:

最後,我們通過,通知使用者批處理任務管理類NotifyUsersBatchTask,它構造的時候,可以通過指定資料庫連接配接池,批量加載多個資料源的資料對象。這裡我們假設并行加載cms/ccs兩個資料源對應的notify_users表的資料,它的spring配置batchtask-multidb.xml配置内容如下:

我們再來實作一種,通過JMX方式進行線程池批處理任務完成情況的監控子產品。首先定義一個MBean接口,它根據計數器的名稱,傳回計數結果。

我們再來實作這個接口,于是設計得到BatchTaskMonitor子產品

其中,計數器的名稱,我已經在NotifyUsersBatchTask子產品中已經指定了。批處理任務執行成功計數器叫做:String NOTIFYTASKSUCCCOUNTER = "TASKSUCCCOUNTER"。批處理任務執行失敗計數器叫做String NOTIFYTASKFAILCOUNTER = "TASKFAILCOUNTER"。這樣我們就可以通過JConsole實作,監控線程池任務的運作處理情況了。

最終,我們要把上面所有的子產品全部“組裝”起來。用戶端調用方式的參考代碼,樣例如下所示

  我們再來運作一下,看下結果如何?先在資料庫中分别插入福州591、廈門592一共80條的待處理資料(實際上,你可以插得更多,越多越能展現出這種異步并行批處理架構的價值)。運作截圖如下:

  正如我們所預想地那樣。很好。

  現在,我們再通過JMX技術,檢視監控一下,并行批處理異步線程池任務的完成情況吧。我們先連接配接上我們的MBean對象BatchTaskMonitor。

  發現裡面有個暴露的操作方法getBatchTaskCounter(根據計數器名稱傳回計數結果)。我們在上面紅圈的輸入框内,輸入統計失敗任務個數的計數器TASKFAILCOUNTER,然後點選确定。最後運作結果如下所示:

     發現我們批處理任務,目前已經處理失敗了196個啦!正如我們希望的那樣,可視化實時監控的感覺非常好。

  寫在最後

  最終,我們通過并行異步加載技術和線程池機制設計出了一個精簡的批處理架構。上面的代碼雖然不算多,但是,有它很獨特的應用場景,麻雀雖小五髒俱全。相信它對于其他的同行朋友,還是很有借鑒意義的。況且現在的伺服器都是多核、多CPU的配置,我們要很好地利用這一硬體資源。對于IO密集型的應用,可以根據上面的思路,加以改良,相信一定能收到不錯的效果!

  好了,不知不覺地寫了這麼多的内容和代碼。本文的前期準備、編碼、調試、文章編寫工作,也消耗了本人大量的腦力和精力。不過還是挺開心的,想着能把自己的一些想法通過部落格的方式沉澱下來,對别人有借鑒意義,而對自己則是一種“學習和總結”。路漫漫其修遠兮,吾将上下而求索。故在此,抛磚引玉。如果本人有說地不對的地方,希望各位園友批評指正!不吝賜教!