天天看點

Storm在推薦系統中的應用

Storm簡介

       apache開源社群項目Storm,是一款分布式實時計算系統。它之上的應用易于開發與部署。關于他們的介紹,請移步http://storm.apache.org/,那裡有更官方且全面的介紹。 我們利用Storm擅長基于資料流并行計算的優勢,彌補Hadoop在實時計算方面的缺憾。這些使用日志采集系統(比如基于Kafka或者Scribe)作為輸入源計算出來的實時結果,将為推薦系統所享用。

       我們最早使用基于Hadoop的計算解決方案,由于資料采集方案的限制,最快的計算頻率也隻能是每小時一次。利用Kafka加Storm的解決方案後,計算結果送出頻次縮短到1分鐘一次。根據觀察,我們目前的設定尚未達到Storm叢集和資料庫的極限。這個送出周期仍然存在縮短的空間。

Storm應用

我們在Storm叢集上搭建應用處理線上的幾種次元的線上pv統計。其活動圖如圖1所示:

Storm在推薦系統中的應用

圖1 實時計算應用活動圖

多數的業務邏輯都是這樣的:

1.      不同的日志流來源擁有各自的Spout,負責資料的讀取、整理和簡單校驗。

2.      Spout将這些日志送向處理分發的下一層Bolt節點,做進一步的校驗和整理,然後分流到下一層業務各自的持久化Bolt中。

3.      持久化Bolt負責将合并計算的最終結果持久化到資料庫。

他們的部署圖類似圖2所示:

Storm在推薦系統中的應用

圖2 線上統計系統部署圖

我們在Spout這一層做簡單的日志格式校驗,如果出錯就記錄并彙報。第二層的Bolt處理的結果可能稍微複雜一點:這裡的節點有時候需要緩存部分特别的校驗資訊(來自于緩存或者資料庫)來做資料校驗,同時要做的更關鍵的事情是判斷應該根據業務分發給下層的哪種Bolt。我們的方案是每個資料表使用自己的Bolt。這樣的好處是不同的業務邏輯可以分開處理。線上的pv統計即便丢失掉一部分資料其實也無傷大雅(我們的可接受範圍是丢失率在5%以下),是以我們開辟了記憶體中的空間緩存中間計算結果。事實上,Storm的可靠性相當不錯,每個節點都能運作很久而沒有任何異常。如果有更高的要求,那麼可以求助于外部的緩存方案。

       利用Storm計算得到的線上資料,可以回饋給推薦系統:告訴系統哪些不良的推薦素材正在拉低推薦效果,或者哪些推薦素材推送次數過多。有些時候,這些資料還要作為參考去評定離線統計出來的資料是否有誤,甚至可以快速的定位問題,而無需臨時驅動一個不常用的離線計算任務(甚至需要額外開發)。我們目前離線資料計算和線上資料計算的差異在3%以下。

應用心得:

1. 合理利用流的分發政策

截止發文,Storm官方已經為資料分發實作了八種不同的解決方案:

²  ShuffleGrouping:資料元以随機形式分發的解決方案,保證負載均衡。

²  FieldsGrouping:以指定的Key進行分發的解決方案。

²  AllGrouping:顧名思義,給所有的bolt任務都發送資料。官方友情提示慎用此方案。

²  GlobalGrouping:資料隻會湧向id最小的那個bolt任務。注意,是隻有一個bolt任務會接收到資料。

²  NoneGrouping:本意是讓使用者無需關心具體實作的解決方案。目前其實是ShuffleGrouping。官方聲稱未來可能會有不一樣的實作。

²  DirectGrouping:由資料生産者直接指定要發送到的任務。隻能用于被聲明為DirectStream的流中,并且資料元(Tuple)必須使用emitDirect方法來發送。

²  Localor shuffleGrouping:如果目标bolt有一個或者多個task與源bolt的task在同一個工作程序中,tuple将會被随機發送給這些同程序中的tasks。否則,和普通的ShuffleGrouping行為一緻。意思是當有本地的bolt程序時,隻發送給本地任務,沒有才當成shuffleGrouping使用。 

²  Partial Key grouping:0.10.0版本新增的解決方案,類似FieldsGrouping。考慮到了下遊Bolt的負載均衡問題。

²  如果以上政策還不夠解決問題。那麼可以自定義方案,使用CustomGrouping。

我們使用的Storm版本是0.9.2。為了均衡負載,我們在Spout傳遞到第二層校驗層的時候,使用了shuffleGrouping的方式。根據圖3,我們可以看到各個節點的負載很平均。資料往最底層分發的時候,考慮到最終歸并後的結果集體入庫,除了均衡負載,不應該引入增加資料庫送出次數的問題。是以這裡的分發政策,我們使用了fieldsGrouping的方式:我們使用時間标簽和資料庫唯一鍵複合的形式作為分發的key。這樣的方式,使得在大部分的送出周期裡,資料庫的送出操作并沒有被增加,同時也達到均衡的目的。

Storm在推薦系統中的應用

圖3 Storm叢集的監控網頁

2. 使用簡單對象進行傳遞

在Storm的官方文檔上,我們并沒有發現這樣的注意事項,但是我們卻在實際應用中發現了以下的問題。我們一開始傳遞資料的時候,并不是像官方例子裡使用字元串類型進行傳遞,而是圖友善使用了序列化的對象。經過統計,我們發現資料轉化大概有10%左右的錯誤情況。當修改為使用字元串進行傳遞以後,這樣的問題就沒有了。有心人可以繼續追蹤。

3. 重載fail方法

編寫Spout和Bolt實作類的時候,建議重載fail方法。每次資料發送失敗後,發送方會調用自己的fail方法。是以,這個方法裡不僅能有效的植入一些自己的報警措施,也可以選擇再次發送資料,避免資料的丢失。如果很重要的資料,重複發送失敗之後可以引入離線修複的辦法去完成。

4. 預警措施

在Storm的應用過程中,我們曾經遇到過Storm的雪崩,zookeeper節點硬碟損壞導緻的worker假死。這些現象雖然通過Storm自動重新開機或者人工重新開機來解決。但是觸發頻率與發現時間卻是我們需要注意的問題。如果不能及時發現問題,将導緻損失掉一大部分的資料,并且不利于定位問題。我們考慮一種簡單的預警方法是從資料結果入手。我們認為如果臨近日期中同一時段資料出現了大幅度的波動,将是報警的時機。如果為了快速部署實作,簡單的SQL語句和shell腳本就能夠實作這種方案。

5. 補救措施

預警的政策決定之後,資料的補救方式也應當确定下來。當資料錯誤發生在大跨度的時間裡,我們可能會有數日的資料是缺失的。有兩種方案可以考慮。

第一個是令Kafka的offset進行回調。第二是使用離線計算的方式去補回結果。

第一種方案好處是,Storm系統無需修改。壞處是回灌資料将影響新資料的消費。

第二種方案不會有妨礙繼續消費的問題,不過需要有一份作為基準的離線資料與離線計算系統來支援。由于我們已經有這樣的日志系統,我們目前采用的是第二種政策。