天天看點

大資料實時處理:百分點實時計算架構和算法

當今時代,資料不再昂貴,但從海量資料中擷取價值變得昂貴,而要及時擷取價值則更加昂貴,這正是大資料實時計算越來越流行的原因。以百 分點公司為例,在高峰期每秒鐘會有近萬HTTP請求發送到百分點伺服器上,這些請求包含了使用者行為和個性化推薦請求。如何從這些資料中快速挖掘使用者興趣偏 好并作出效果不錯的推薦呢?這是百分點推薦引擎面臨的首要問題。本文将從系統架構和算法兩方面全介紹百分點公司在實時計算方面的經驗和心得體會,供讀者參 考。

a) 實時計算架構

大資料實時處理:百分點實時計算架構和算法

圖 1百分點大資料平台原理示意圖

工欲善其事,必先利其器。一個穩定可靠且高效的底層架構是實時計算的必要基礎。圖 1給出了百分點資料大平台的總體架構,如圖所示,大資料平台包含資料存儲和資料處理兩個層次。

存儲服務層提供了資料處理層需要的各類分布式存儲,包括分布式檔案系統(Hadoop HDFS)、分布式SQL資料庫(MySQL)、分布式 NoSQL資料庫(Redis、MongoDB、HBase)、分布式消息隊列(Apache Kafka)、分布式搜尋引擎(Apache Solr) 以及必不可少的Apache Zookeeper。

資料處理層由四個部分組成。其中Web應用雲包含了所有直接面對使用者的Web服務,每個Web應用都會産生Web日志以及其他實時資料,這些資料一 方面會及時交由實時計算架構進行處理,另一方面也會定期同步至離線計算架構;實時計算架構會處理接收到的實時資料,并将處理結果輸出到資料查詢架構或者離 線計算架構;離線計算架構則定期對資料進行處理,并将處理結果輸出至資料查詢架構;資料查詢架構提供了一系列應用接口供程式調取需要的各項資料,同時提供 了一些Web工具幫助業務人員對海量資料進行統計、彙總和分析。

在百分點大資料平台中,與實時計算密切相關的有實時計算架構和資料查詢架構,這部分的元件架構和資料流如圖 2所示。

大資料實時處理:百分點實時計算架構和算法

圖 2實時計算架構和資料查詢架構示意

從圖上可以看出,資料采集服務會将收集到的實時資料推送到消息隊列Kafka中;Kafka中的資料會被兩個處理平台 BDM CEP(Big Data Management Complex Event Processing)和Storm消費并處理。Storm是當 下比較流行的開源流處理架構,百分點公司在2013年中開始使用Storm進行資料清洗、統計和一部分分析任務。在引入Storm之前,百分點所有的實時 計算都是基于BDM CEP進行的,它是我們基于中間件ICE開發的一套流處理平台。BDM CEP包含有四類元件:dispatcher負責從 Kafka中讀取消息,根據消息内容分發給相應的worker;worker複雜處理接收到的消息,并将處理結果傳遞給其他worker或者輸出到各類存 儲服務中;config負責維護dispatcher和worker的互動關系和配置資訊,并在互動關系或配置更新時及時通知dispatcher和 worker;monitor負責監控dispatcher和worker的運作情況,把監控資訊送出給Ganglia,monitor還負責系統異常時 的報警,以及dispatcher和worker發生故障時進行重新開機和遷移。資料查詢架構由圖中最下層的三個元件組成,其中 BDM DS(Data Source)封裝了一系列的資料查詢邏輯并以REST API和ICE服務的形式供各種應用調 用;BDM OLAP(Online Analytical Processing)提供了實時查詢使用者行為和标簽明細,以及近實時的使用者多元度統計、彙 總和分析功能,這些功能是以REST API和Web應用方式提供的;BDM Search是對Solr和HBase的一次封裝,以REST API和 ICE服務方式對外提供近實時搜尋功能。

百分點公司的主要服務都是運作在這套架構上的,它擁有良好的穩定性和擴充性,一般來說隻需要增加水準擴充結點即可提高資料處理能力,這為百分點業務的穩定發展奠定了技術基礎。

b) 實時計算算法

要真正實作大資料實時計算,光有架構是不行的,還必須針對特定業務開發特定的處理流程和算法。相比較離線計算而言,實時計算在算法方面需要考慮的更 多,這是因為實時計算能夠用到的存儲資源遠不如離線,而且處理過程的時間限制要比離線計算嚴格,這都要求實時計算算法必須做相當多的優化。在這一節中,筆 者将以海量計數問題為例介紹百分點公司在實時計算算法方面的經驗。

目前,百分點資料平台上包含了近千萬的電商單品資料,實時追蹤這些單品的浏覽和交易資料是必須的,這也是做個性化推薦、商品畫像、銷量預測和使用者畫 像等業務的必要前提。我們的問題是:如何設計一種算法使得我們可以實時檢視任意單品最近24小時的浏覽量?這個問題描述起來很簡單,但稍加思索就會發現做 起來并不容易。下面我們先給出一個簡單方案,而後按照一定的原則逐漸精化到最佳方案。

c) 簡單方案

大資料實時處理:百分點實時計算架構和算法

圖 3按秒計數方案

看到這個問題時,大部分讀者會很快想到如圖 3所示的算法方案。圖中紅色、藍色和綠色的方塊分别表示不同的單品。在這個方案中,我們為每個單品儲存一份浏覽資訊,它包含兩個資料結構:

d) 曆史浏覽量清單(簡稱曆史),一個清單,清單中每個元素包含一個時間戳和一個整數,分别代表過去24小時中的某一秒及這一秒鐘的浏覽量,按時 間順序排序。這個清單的最長會包含24*3600=86400個元素,但一般情況下極少有單品時時刻刻都被浏覽,我們可以假設這個清單的平均長度不超過 10000。

e) 累計浏覽量(簡稱累計量),一個整數,代表截止到最後一次通路時的浏覽量。

如圖所示,假設藍色單品對應的資料是 [(t1, a1), (t2, a2), …, (tn, an)]和A。這表示t1時刻的該單品浏覽量是a1,t2時刻是a2,tn是最後一次記錄到浏覽該單品的時刻,浏覽量是an。截止到tn,該單品的總浏覽量是A。

當單品浏覽源源不斷進入到消息隊列時,處理程序(或線程)P1,P2…會實時讀取到這些資訊,并修改對應單品的資料資訊。例如,P1讀取到t時刻對藍色單品的浏覽記錄時,會進行下面的操作:

f) 得到目前時刻ct;

g) 對資料庫中藍色單品資料加鎖,加鎖成功後讀取出資料,假設曆史是[(t1, a1), (t2, a2), …, (tn, an)],累計量是A;

h) 累計量遞增,即從A修改為A+1

i) 如果ct=tn,則更新曆史為[(t1, a1), (t2, a2), …, (tn, an+1)],否則更新為 [(t1, a1), (t2, a2), …, (tn, an), (ct,1)];最後删除時間戳小于ct-24*3600的清單元素,删除的同時 從累計量中減去對應時刻的浏覽量,例如隻有元素t1> ct-24*3600,則操作完成後的浏覽量為A+1-a1;

j) 将新的曆史和累計量輸出至資料庫,釋放鎖。

不難驗證這個方案是可以正确得出每個單品24小時内的浏覽量的,并且隻要在資源(計算、存儲和網絡)充足的情況下,資料庫中單品的浏覽量是實時更新的。這個方案也是分布式實時計算中最簡單最常見的一種模式。

k) 避免鎖

大資料實時處理:百分點實時計算架構和算法

圖 4不包含鎖的方案

第一個方案中需要對資料庫加鎖,無論加鎖粒度多細,都會嚴重影響計算效率。雖然像Redis一類的記憶體資料庫提供了incr這樣的原子操作,但這種操作多數情況下隻适用于整型資料,并不适合本問題的曆史資料。

要想提高實時處理效率,避免鎖是非常重要的。一種常見的做法是将并行操作串行化,就像MapReduce中的Reduce階段一樣,将key相同的 資料交由同一個reducer處理。基于這個原理,我們可以将方案改造為如圖 4所示,我們新增一個資料分發處理過程,它的作用是保證同一個單品的所有數 據都會發送給同一個處理程式。例如将藍色單品交由P1處理,紅色交由P2處理,綠色交由P3處理。這樣P1在處理過程中不需要對資料庫加鎖,因為不存在資 源競争。這樣可以極大的提高計算效率,于是整個計算過程變為:

l) 得到目前時刻ct;

m) 讀取資料庫中藍色單品資訊,假設曆史是[(t1, a1), (t2, a2), …, (tn, an)],累計量是A;

n) 累計遞增,即從A修改為A+1

  • o) 如果ct=tn,則更新曆史為[(t1, a1), (t2, a2), …, (tn, an+1)],否則更新為 [(t1, a1), (t2, a2), …, (tn, an), (ct,1)];最後删除時間戳小于ct-24*3600的清單元素,删除的同時 從累量中減去對應時刻的浏覽量;

p) 将新的曆史和累計量輸出至資料庫。

步驟b)和e)省去了鎖操作,整個系統的并發性和吞吐量會得到大大提高。當然,沒有免費的午餐,這種方案的缺點在于存在單點隐患,例如一旦P1由于 某些原因挂掉了,那麼藍色單品的資料将得不到及時處理,計數結果将無法保證明時。這種計算過程對系統監控和故障轉移會有很高的要求。

q) 資料分層

大資料實時處理:百分點實時計算架構和算法

圖 5帶有本地緩存的方案

方案二已經可以大大提高計算效率,但這還不夠,我們可以看到在計算步驟b)和e)中總是要把曆史和累計量同時從資料庫中讀出或寫入,實際上這是沒有 必要的,因為隻有累計量才是外部必須使用的資料,而曆史隻是算法的中間資料。這樣,我們可以差別對待曆史和累計量,我們将曆史和累計量都緩存在計算程序 中,定期更新曆史至資料庫,而累計量則實時更新。新的方案如圖 5所示,計算過程變為:

r) 得到目前時刻ct;

s) 如果本地沒有藍色單品的資訊,則從資料庫中讀取藍色單品資訊;否則直接使用本地緩存的資訊。假設曆史是[(t1, a1), (t2, a2), …, (tn, an)],累計量是A;

t) 累計量遞增,即從A修改為A+1

u) 如果ct=tn,則更新曆史為[(t1, a1), (t2, a2), …, (tn, an+1)],否則更新為 [(t1, a1), (t2, a2), …, (tn, an), (ct,1)];最後删除時間戳小于ct-24*3600的清單元素,删除的同時 從累計量中減去對應時刻的浏覽量;

v) 将新的累計量輸出至資料庫;如果滿足一定的條件(例如上次輸出時間足夠久遠,或者處理的消息量達到一定數量),則将曆史輸出至資料庫。

這種方案可以大大降低資料庫壓力、資料IO和序列化反序列化次數,進而提高整個系統的處理效率。資料分層實際上是計算機中一種常用的路數,例如硬體 中的高速緩存/記憶體/磁盤,系統IO中的緩沖區/磁盤檔案,資料庫的記憶體索引、系統DNS緩存等等。我們使用的開源搜尋引擎Solr就使用了同樣的思路達 到近實時索引。Solr包含磁盤全量索引和實時增加的記憶體增量索引,并引入了“soft送出”的方式更新新索引。新資料到達後,Solr會使用 “soft”送出的方式更新記憶體增量索引,在檢索的時候通過同時請求全量索引和增量索引并合并的方式獲得到最新的資料。之後會在伺服器空閑的時 候,Solr會把記憶體增量索引合并到磁盤全量索引中保證資料完整。

當然,這種方案也對系統的穩定性提出了更高的要求,因為一旦P1挂掉那麼它緩存的資料将丢失,及時P1及時重新開機,這些資料也無法恢複,那麼在一段時間内我們将無法得到準确的實時浏覽量。

w) 模糊化

現在,我們來考慮存儲資源問題。假設時間戳和整型都用long類型(8位元組)儲存,那麼按照方案一中的估計,我們對每個單品的需要記錄的資料大小約 為10000×(8+8)+8=16008位元組≈156KB,1000萬單品的資料總量将超過1T,如果考慮到資料庫和本地緩存因素,那麼整個系統需要的 存儲量至少是2T!這對于計數這個問題而言顯然是得不償失的,我們必須嘗試将資料量降低,在這個問題中可行的是降低曆史的存儲精度。我們将曆史定義為小時 級别精度,這樣每個單品的曆史至多有24個,資料量最多392位元組,1000萬單品的資訊總量将變為3.6G,系統總的存儲量不超過8G,這是可以接受 的。如果考慮用int類型代替long類型存儲時間(小時數),則存儲量可以進一步降低到不足6G。這樣新的計算過程變為:

x) 得到目前時刻精确到小時的部分ct;

y) 如果本地沒有藍色單品的資訊,則從資料庫中讀取藍色單品資訊;否則直接使用本地緩存的資訊。假設曆史是[(t1, a1), (t2, a2), …, (tn, an)],累計量是A;

z) 累計量遞增,即從A修改為A+1

aa) 如果ct=tn,則更新曆史為[(t1, a1), (t2, a2), …, (tn, an+1)],否則更新為 [(t1, a1), (t2, a2), …, (tn, an), (ct,1)];最後删除小時數小于ct-24的清單元素,删除的同時從累計量中 減去對應時刻的浏覽量;

ab) 将新的浏覽量輸出至資料庫;如果滿足一定的條件,則将曆史輸出至資料庫。

在這種方案下,資料庫中存儲的并不是過去24小時内的浏覽量,而是過去23小時多一點内的。例如在1月2日12:15時資料庫中的浏覽量實際上是1月1日13:00到1月2日12:15的浏覽量!

這種降低資料精度的方法我們可以稱之為模糊化,它是用資源換效率的一種方法。在對資料精确性不是特别敏感的領域,這種方法可以大大降低系統資源使用 量、提高系統的處理效率。利用模糊化的實時算法快速得到近似結果,而後用離線算法慢慢修正結果的精确度,是百分點在大資料進行中經常使用的招數。

ac) 局部精化

大資料實時處理:百分點實時計算架構和算法

圖 6局部精華示意圖

有時候,模糊化會掩蓋掉一些重要的細節資訊,達不到業務需求的要求。例如,電商有很多的秒殺活動,此時必須及時監測單品浏覽量,如果我們還按小時維 度進行計算,那顯然不能滿足要求。這種情況下我們就必須對局部資料進行細化,它是模糊化的逆操作,我們稱之為局部精化。如圖 6所示,第k小時的資料是很 敏感的,我們希望它的資料能更實時一些,那我們可以将第k小時的資料切分的更細,對它做10分鐘、分鐘甚至秒級别的計算,而其他時間段仍舊采用小時精度。

這種方案會增加系統的設計和開發難度,而且必須有靈活的配置才能滿足多變的業務需求。

ad) 資料模組化

除了局部細化,還有一種方法可以提高資料的精确度,這就是資料模組化。在方案四中我們提到在小時精度下,實際上隻能得到23小時多一點之前的浏覽量, 有一部分資料丢失了沒有用到。實際上我們可以将丢棄掉的資料利用起來得到更好的結果。最簡單思路是假設同一小時内單品的浏覽量是線性增加的,那麼我們顯然 可以利用相鄰兩個小時的浏覽曆史推算出任意時刻的浏覽量。回到方案四中的例子,1月2日12:15的實時浏覽量可以通過下面的公式計算得出:

[a0 + (a1-a0)×(60-15)/60] + a1 + … + a24

其中a0代表1月1日12:00到13:00之間的浏覽量,依次類推,a24代表1月2日12:00到12:15之間的浏覽量。公式中的 a0 + (a1-a0)×(60-15)/60 估計了1月1日12:15-13:00之間的浏覽量,這樣就得出了從1月1日12:15到1月2日 12:15之間24小時内的浏覽量。

大資料實時處理:百分點實時計算架構和算法

圖 7某單品的全天浏覽分布

我們還可以利用更複雜的浏覽量分布模型得出精度更高的估計,圖 7給出了某單品一天的浏覽分布曲線,這個分布适用于絕大多數的商品以及絕大多數的時 間。是以,我們完全可以利用這個分布來更精确的估計每個單品的浏覽量,利用這個模型我們甚至不需要記錄浏覽曆史,隻需要知道當天0:00到目前的浏覽總量 就可以計算出前24小時内的浏覽量,甚至預測接下來的浏覽量情況!

當然,模型也不是萬能的,模型本身的建立和更新也是有代價的,如果模組化方法不恰當或者模型更新不及時,很有可能得出的結果會很差。

ae) 小結

本文首先介紹了百分點公司大資料平台的基本原理,并詳細說明了其中與實時計算相關部分,實時計算架構和資料查詢架構,的系統架構、處理流程和應用。 而後,我們以海量資料計數問題為例,深入淺出的介紹了在百分點公司在實時計算算法中常用的方法和技巧,以及它們适用的場景和可能帶來的問題。這些方法和技 巧具有普遍性和通用性,被廣泛應用于百分點個性化推薦引擎的各個子產品,包括使用者意圖預測、使用者畫像、個性化推薦評分、商品分類等等。如果能在實際業務中靈 活運用這些方法和技巧,則能夠大大提高實時計算的資料規模和處理效率,幫助業務快速發展。希望本文的介紹能夠幫助讀者更好的了解大資料實時計算的方方面 面。

繼續閱讀