天天看點

實時離線融合在唯品會的進展:在實時技術、資料、業務中尋找平衡

實時大資料分析是指對規模巨大的資料進行分析,利用大資料技術高效的快速完成分析,達到近似實時的效果,更及時的反映資料的價值和意義。

所有人都能了解資料的時效性對于資料的價值至關重要。以唯品會為例,唯品會已經有一整套非常成熟的離線資料倉庫系統。這套系統對于業務有非常大的指導意義,但目前碰到的問題是如何将各種計算、報表加速,從原來天級别、小時級别,加速到近實時來。

這是我們開始實時離線融合這個項目的緣由。該工作我們是從 2016 年下半年開始的,到目前為止它仍然隻是一個半成品,是以這裡面包含的很多内容并不是最終的結論,在多數情況下,它僅僅是以唯品會的特點為基礎,而不一定能無縫地适用于其他公司産品。我們希望抛磚引玉,對大家有所俾益。

1. 時效性與大資料

第一個問題是:什麼是實時(real-time)? 什麼是離線(offline)?很多時候,我們會當然的把實時等同于流處理(stream processing),等同于 storm、spark streaming。但其實所謂實時和離線的差別其實是從時延(latency)的角度出發,如果時延短的就是實時,時延長的就是離線。

而時延就是從資料産生到計算出結果的時間差,時延是從端到端的,不僅僅是 query 的執行時間。采用簡單的式子表示即為:時延 = 資料準備時間 + 查詢計算時間。

實時、近實時 (near realtime)、離線一般是以時延的時間長短為區分标準。實時表示毫秒、秒級時延;近實時主要是分鐘級時延;而離線是時延超過十分鐘。

實時離線融合在唯品會的進展:在實時技術、資料、業務中尋找平衡

而何為批處理、流處理?批處理,也常被稱為 “離線”,即資料以一個完整的資料集被處理可以重複計算,資料在落盤之後定時或者按需啟動計算。一般情況下,批處理一次處理的資料量大,延遲較大,經常需要全量計算。流處理,也常被稱為 “實時”,即資料以流式的方式(增量)被處理,它與批處理的特點完成相反。

實時離線融合在唯品會的進展:在實時技術、資料、業務中尋找平衡

然而實時計算并不等同于流式計算,即使大多數實時計算是流式計算,但很多也可以采用批處理來實作。同時,雖然在流式計算中實時或者準實時計算結果占了較大比例,流式計算也完全可能需要較長時間才能出結果,比如說 30 分鐘的 window,window 結束才輸出結果等。

實時離線融合在唯品會的進展:在實時技術、資料、業務中尋找平衡

是以說,實時計算并不等同于流式計算。業務的實時化并不一定要借助于流式計算來實作。下面我們來看看目前資料進行中之是以實時化要流式計算的瓶頸在何處。

2. 現狀及問題  

唯品會是電子商務網站,資料可以分成兩大類: 行為埋點資料和交易類資料。下圖是交易類資料的一條典型處理鍊路,行為類資料的處理與之非常類似。

實時離線融合在唯品會的進展:在實時技術、資料、業務中尋找平衡

這張圖其實代表了目前大資料處理的一種典型架構。對于實時和離線而言,這兩條路徑是從源頭開始就完全分離的。

對于離線 / 批處理而言,資料層層加工。使用者可以簡易地使用 sql,使用門檻低,并且其工具、理論、系統完備。然而它的延遲性高,并且不可控制(特别是在大促時)。

對于流式 / 實時計算而言,一切以時效性為目标,鍊路短,資料無層次,大量的應用直接處理 raw data。是以它的唯一優處在于它的時效性。但是它的開發難度高,邏輯複雜,資源需求很大,并且很難保證其資料品質。同時,需要為每個應用單獨去開發其應用邏輯,無法通用化。

對于實時應用(特别是報表)來說,對數是最痛苦的一件事情。典型場景是利用實時報表提供結果,但仍需要定時和離線報表去比對其正确性。一般普遍認為離線應用的精度要高于實時應用,但實時和離線的處理方法是完全不同的,其開發方式、方法,處理邏輯、資料來源都不一緻,導緻對數非常困難。而這其中最根本的是因為實時和離線從最本源開始就是兩條計算路徑。要在這完全不同的兩條路徑上對數,難度就非常非常大了。

我們也一直在反思怎麼樣才能更好的支援業務的實時化。因為業務方總是會在抱怨資料不準,和離線對不上,口徑沒更新,開發效率低下,周期時間長等狀況,明明我們也在努力加班,努力滿足業務方要求,卻發現總是不能滿足業務的需求。

3. 實時離線融合  

目前的實時化方法真的是正确的打開方式嗎? 對于這個問題,我們的了解是:

業務需要的是近實時。絕大部分業務隻需要時延在分鐘、甚至 5~10 分鐘級别就可以了。并不需要秒級的時延。是以用 storm/spark streaming 這樣的流式計算去實作,其實是一種殺雞用牛刀的行為。

業務方需要近實時,但目前隻有實時團隊才有能力實時化。這個的原因是流式計算的開發門檻太高。但其實業務方是希望以他們容易掌控的方式實作近實時,而不是交給實時團隊去排期開發。

基于上面的了解,我們開展了實時離線融合這個項目。這個項目的目的就是:

讓業務方以他們熟悉的批處理方法來實作近實時的計算。

讓實時團隊專注于系統和平台,而不是業務。

時延 = 資料準備時間 + 查詢時間。目前之是以無法用批處理方法實作近實時的計算就是因為這兩個步驟各自花的時間太長了。如果資料準備速度足夠快,并且計算速度也足夠靈活,那麼批處理也可以達到近實時的時延。

對于批處理而言,資料準備時間 = 定時排程時間 + 資料準備計算時間。隻有在兩者都很小的情況下,資料準備時間才能大幅度地縮短。是以對于資料準備來說,使用流式處理來實作資料的實時準備是非常合理的想法。同時,因為這種資料準備的一般是基礎資料,和業務邏輯關系不大,是以也是很适合用流式的方法來實作的。

實時離線融合在唯品會的進展:在實時技術、資料、業務中尋找平衡

實時離線融合鍊路圖

在這個鍊路中,流式計算、批處理共享相同的資料準備步驟(清洗、打寬)。這些步驟保證資料是在毫秒級别就能處理完成的。處理完成的資料會落地到 hive 中去(時延控制在分鐘級别)。這樣,hive 中就有了近實時的已經準備好的基礎資料。需要近實時的應用就可以去通路這些資料了。

實時資料落地 hive, 即将大批量資料實時處理之後存入 hive 中,提供給後端業務系統進行處理。目前我們的做法是每 5 分鐘一個 hive 分區,資料按照 event time 落到相應的 hive 分區,等待一定時間後關閉這個分區(這裡我們借鑒了流進行中的 watermark 概念)。同時為了與現有的 hive 分區保持相容(即對于一個已關閉分區的兩次查詢應該得到相同的結果),也為了保證分區能及時關閉,規定若其資料在分區關閉後才到達,那麼該資料将會落地到下一個分區。

對于那些不關心分區是否已關閉,而時效性要求高的應用,其可以在分鐘級通路到資料(未關閉的分區);而對于大部分應用而言,可以選擇分區關閉後再查詢(資料準備的時延就在 5~6 分鐘左右)。

這種資料高頻落地也是存在着一些問題的。 

第一,小檔案過多(為了保證落地時延,必須增加并發),會導緻查詢變慢。

第二,以普通磁盤為主的 hdfs(hadoop 分布式檔案系統)時延不穩定(每個分區的資料快的幾秒就完成,慢的需要幾分鐘)。這就對資料落地的 spark streaming 任務帶來了挑戰。

為了改善這些情況,我們對曆史分區 compact 以減少其檔案數; 将普通磁盤為主的 hdfs 替換為 alluxio 和以 ssd 為主的 hdfs 以減少其落地波動。資料放在高速檔案系統中,不僅對落地波動情況有所改善,也可提高讀取速率。

對于和離線系統的無縫對接,我們目前的做法是在每個分區關閉後,向離線排程系統發信号說這個分區資料準備完成了,這樣離線排程系統就可以正常排程依賴這個分區的下遊任務了。

實時離線融合在唯品會的進展:在實時技術、資料、業務中尋找平衡

當資料準備實時化了後,如何縮短離線查詢時間呢?查詢時間 = 定時排程時間 + 查詢計算時間。要達到近實時,必須減少其排程時間與查詢計算時間來提高離線應用。那麼我們需要将高頻排程定時為五分鐘甚至小于五分鐘,并且合理地控制資源使用量,在查詢計算時,保證其中間結果不落地,使用 spark sql、presto 替代 hive,并且使用 elasticsearch、druid、kylin 等做預計算,進而減少計算量,加速查詢計算。

實時離線融合在唯品會的進展:在實時技術、資料、業務中尋找平衡

如上圖所示。離線應用的三個次元,分别是對 nrt 的要求(業務自身的屬性),實作最小時延的代價(人力資源、機器資源),對資料精度的要求。每個應用在實時化都要考慮如何在 3 者之間取得一個平衡。

這種平衡就決定了存在着三種模式。

第一種是零代價加速,通過實時資料落地,可以透明地享受 30-50 分鐘的加速;

第二種追求極緻的近實時,應用越實時越好,不惜一切代價,投入大量人力物力完全地重新實作邏輯;

第三種介于兩者之間,追求在資源有限情況下去加速,但盡量不增加其計算負擔。

實時離線融合在唯品會的進展:在實時技術、資料、業務中尋找平衡

在實時離線融合的場景下,es、druid、kylin 等的作用會越來越重要。因為如果應用能夠使用這些帶預計算的存儲來實作的話,那麼查詢計算時間就可以基本忽略不計。同時,因為這些存儲并沒有 hive 那樣的分區概念,是以清洗打寬完的資料其實是可以流式的落到這些存儲中去的(秒級)。那麼,使用者就可以以類似離線 sql 的方式實作秒級的資料查詢。

4. 實時離線融合帶來的挑戰  

實時離線融合并不是免費的午餐。它也帶來了一系列新的問題和挑戰。

對于實時 / 流式計算而言,它變成了所有大資料處理的一個前置。這就要求其作為平台具有很高的穩定性、可靠性、可管理性、資料品質、sla 保證。特别是現有的在流處理系統(storm、spark streaming、flink)在理論上還沒有完全實作 end-to-end exactly once 的情況下。一般認為批處理系統(hive、spark)是非常可靠的,且支援 exactly once 語義。将基礎資料準備從批處理系統替換為流處理系統,怎麼保證其可靠性不降低是一個非常大的挑戰。

如何確定 hive 中資料的品質,目前我們的做法是多方着手:

1. 全鍊路監控,保證資料品質;  2. 考慮各種極端場景的處理方法; 3. 發現問題時,如何重寫整個 hive 分區; 4. 保留目前的離線小時抽數邏輯用于對數。 5. 改造目前的流架構來提供更好的處理語義保證。

對于離線(hive、spark)來說,應用要實時化,就必須高頻排程。這也帶來了一系列挑戰。如何提高排程效率?如何處理在上一次排程沒執行完情況下下一個批次的排程問題(資料積壓)?如何防止過度占用系統資源?這需要對于排程系統和應用都進行改造。另外,我們需要區分熱資料和冷資料。熱資料使用單獨的 ssd 或者 alluxio 叢集,而冷資料存儲在普通的 hdfs 中。

實時離線融合我們目前也隻是完成了很多基礎資料的實時化,目前已經能夠比較明顯的看到效果。但這個任務是長期的。因為使用者一般更加喜歡使用天表等很寬的表,而目前實時化的更多是小時表等基礎表,如何實時化(或者加速)天表等寬表是我們目前在推進的一項工作。隻有等這部分工作完成後,我們才能說實時離線融合真正成功了。

作者介紹

姜偉華 博士,國内最早的 hadoop 發行版:idh 的産品開發經理。主要研究方向集中于對大資料開發,從事大資料開源工作,曾經在 intel 期間 2 年之内團隊培養出 10 位 committer,建立了上海大資料流處理 meetup,建立 2 個新的 apache 項目。目前在唯品會負責實時平台。

====================================分割線================================

本文作者:ai研習社