一、TDBank接入hive資料的痛點和挑戰
資料接入到Hive是TDW資料接入中應用最廣泛的場景,整體的資料流向路徑如下所示:

圖1 資料接入到TDW Hive的流向路徑
資料從源側發送,經過TDBus後存入MQ,然後由TDSort消費并根據業務規則進行分揀處理後存入中轉的hdfs目錄,再由配置的統一排程任務定時将資料以分區為機關寫入hive倉庫。可以看出,整個系統資料流經的環節較多,對運維和使用者具有如下的痛點:
難以保證明時入庫。資料多次流轉、統一排程本身排程的延遲、hdfs性能的抖動、gaia資源的競争(統一排程會通過hive生成gaia應用執行實際入庫邏輯)等都會導緻入庫延遲。
接入品質無法衡量。由于缺少入庫資料的對賬環節,導緻往往難以在第一時間感覺到資料接入品質的好壞。
接入和運維成本高。整個過程需要額外準備hdfs存儲資源、統一排程資源、hive資源、gaia計算資源,需維護這些資源和服務的可用性。這裡面僅僅是統一排程的入庫任務就占其總任務量的一半左右,給統一排程也帶來了巨大的計算量。
整個過程需要額外的物力和人力投入,且還無法保證入庫的及時性(不考慮資料遲到話入庫延遲一般在30分鐘到幾小時之間)。
除此之外,大資料接入還有如下的挑戰:
高流量和易運維性
目前tdbank接入的hive表總數為153978,日均的接入量為30萬億左右,其中最大的業務日均接入量達8萬億+。一方面流量巨大使得接入中斷或重新開機的成本非常高,一方面需接入的hive表和業務規則衆多,而我們需要根據業務規則把資料按照相應的格式落地到對應的hive。而這裡的接入資料和業務規則往往會動态變化,故我們需要靈活高效的适應業務規則的變動。
接入延遲和資料碎片
接入延遲和資料碎片是一對沖突體。追求低接入延遲會導緻産生資料碎片,不利于HDFS的存儲,并降低資料查詢的效率。而高接入延遲在某些場景下無法被使用者接受,在實際中需要權衡。
異常處理和資料一緻性
流式資料處理過程中随時可能因為機器、磁盤、人為、軟體等故障原因中斷或重新開機,這種情況下必然有一部分資料是on the fly的,進而導緻了不一緻性,在大資料流量場景下會更加明顯。Flink作為流式資料處理領域最流行的架構為我們提供了分布式系統流式資料處理時具有exactly_once語義的checkpoint機制,以幫助解決異常恢複問題,但應用仍然需要自己處理source和sink的狀态儲存和恢複,其中sink側的處理尤其具有挑戰性。
名額統計
從業務和運維角度,需要按表分區的次元統計名額資料。分布式系統中名額統計會面臨兩個問題:一是如何對名額按所需次元做彙聚;二是異常恢複時如何對名額進行復原。
資料(負載)傾斜
TDSort運作在gaia上,gaia目前隻支援對CPU和記憶體進行管控,而流式資料進行中IO資源,尤其是網絡IO也是一種寶貴的資源。在大資料流量場景下極易發生因節點流量不均勻而導緻的資料傾斜。
故障轉移
大流量下,流式資料處理應用啟停的代價相對較高,而機器、磁盤等經常會因為一些原因發生故障,這時需要有便利的手段使得運維人員可以進行剔除gaia節點、切換gaia叢集等操作。
Sink(HDFS)性能抖動
HDFS性能抖動或故障除了導緻資料無法寫入、吞吐降低外,還會導緻TDSort做checkpoint時因逾時而失敗。
二、接入實時性優化和功能增強
TDBus可以幫助收斂MQ的producer連接配接數并提供一個業務次元名額統計的切入點,MQ是資料暫存并可削峰平谷、解耦資料發送和資料處理,TDSort作為類似ETL或者data pipeline的角色承載了主要的資料接入邏輯,從業務角度審視都有其存在的必要性。入庫任務主要承擔如下功能:
根據排程配置定期去中轉的hdfs目錄上檢查某個分區的資料是否已準備就緒;
準備就緒後建立hive外表,然後通過執行sql将資料從中轉目錄插入到實際的hive分區目錄,這個過程是統一排程送出sql到hive server,hive server再在gaia上送出并運作任務完成的,中間涉及到的資料格式的轉換也都是gaia上的任務來完成的。
基于以上分析,我們做了如下優化:
去除了統一排程任務入庫的邏輯,業務資料由TDSort直接寫入hive庫。為了做到直接入庫,TDSort除了需要擷取到hive庫表、分區等相關資訊外,還需要支援将源資料轉換為所需要的hive檔案格式、壓縮類型等。
提供了高效的分區入庫狀态查詢服務TDLedger
增加了端到端對賬的支援,同樣由TDLedger承載。
對checkpoint的全面支援。
通過oceanus平台啟停TDSort應用。Oceanus為我們提供了友善的任務啟停、checkpoint儲存、曆史checkpoint點管理和恢複、資源管理和審批等功能,讓我們可以聚焦于業務本身。
優化後的資料流向圖如下所示:
圖2 優化後的hive資料接入流向
除了資料流向本身的優化外,圖中同時新增了入口名額流和出口名額流的統計計算,并在TDLedger側進行對賬,這對使用者和運維側也是非常重要的功能。
三、接入實時性優化效果
以日均接入6000億、gzip壓縮、文本格式接入的業務為例,下面為優化前後的對比:
入庫延遲可以滿足TP99<15min
圖3 優化後hive入庫延遲時間分布
有效降低了成本和資源的投入,包括hdfs存儲資源、統一排程資源、hive資源、gaia計算資源等。
很自然地解決了資料遲到問題,不論遲到多久的資料都可以安全入庫,同時也允許其他管道來源的資料寫入。
降低了系統複雜度,入庫不再需要統一排程的支援,不再依賴運維側的一些腳本。
通過oceanus統一管理了曆史checkpoint、資源、權限、任務啟停等,并将TDSort運作在gaia上,進而更便于運維和維護。
四、其他接入挑戰的解決實踐
1. 高流量和易運維性
對topic内的資料抽象了tid的概念,每個tid和一個hive表關聯,每條資料歸屬于一個tid,這樣就可以在一個topic内接入多個hive表的資料。
基于zookeeper做了配置服務,這樣可以動态的下發配置和感覺變動,并動态的接入新的topic。
接入服務TDSort基于流式資料處理領域最流行的flink開發,采用如下的拓撲結構:
圖4 TDSort拓撲結構
2. 接入延遲和資料碎片
定義單個檔案最大大小和最大資料延遲兩個次元,業務根據需要進行配置。
對接入延遲容忍度較低的業務,通過小檔案壓縮任務定期對小檔案進行合并。
3. 異常處理和資料一緻性
Source側:Checkpoint時儲存MQ的offset資訊,這樣異常時就可以從前一個成功的checkpoint進行恢複。
Sink側:對落地的HDFS檔案名進行特意設計,這樣我們從checkpoint恢複進行rollback時才能知道哪些檔案是可以被安全删除的。這裡不能根據檔案的修改時間戳進行判斷,因為每個gaia節點的時鐘并不一定是完全一緻的,而HDFS的性能也會有抖動導緻上傳檔案有延遲。
需要優先確定服務的可用性,而異常復原是一個耗時的操作,故設計為異步的,保證資料的最終一緻性。
運維下發停止指令後可以停止MQ消費,并将on the fly的資料排幹後再停止應用,這樣可以有效降低下次啟動時巨大的checkpoint恢複成本。
遇到HDFS故障時可以将本地磁盤作為暫存,這樣可以避免checkpoint因逾時失敗的問題,并有效降低下次啟動時巨大的checkpoint恢複成本。
4. 名額統計
如圖4所示,TDSort由source、writer、checker三級vertex構成,其中checker按照期望的次元對名額進行彙聚(相當于sql中的group by後組内進行sum),進而可得到相應的名額資料。
存儲每條名額資料時,同時存儲checkpointId和名額發送時間,這樣在rollback時根據checkPointId和名額發送時間删除相應記錄即可。
5. 資料(負載)傾斜
仔細觀察會發現,流量傾斜主要發生在Source和Writer節點之間。如下圖所示,對于每個gaia叢集,我們引入了稱之為Router的協調者。每個節點會定期上報IO相關的負載資訊到Router,Router會根據最近一段時間的流量情況判斷是否有機器的IO高于設定的門檻值,如果是的話則找出發資料過來的Source節點,從中找出流量最大的資料通道并進行分裂,将資料分發到負載較低的節點上去,實作IO的負載均衡。
圖5 Router調整資料路由過程
6. 故障轉移
TDSort可根據運維下發的指令動态停止某一些Source或者Writer而又不用重新開機整個應用,這在某些機器故障的情況下非常有用,可以避免成本較高的應用啟停,并實作人為控制下的故障和流量轉移。
7. Sink(HDFS)性能抖動
使用本地磁盤作為暫存,在HDFS性能抖動時将資料存入本地磁盤,不堵塞資料接入,并使checkpoint快速通過。
每個節點通過常駐的uploader上傳檔案,這樣可在sort停止後将殘留檔案也上傳到HDFS,確定不丢失資料。
五、總結
新的接入方案在接入成本、接入延遲上都有了較為明顯的優化效果,減輕了對統一排程系統的負載壓力,并具備了端到端的業務對賬能力。在公司開源協同的大背景下,TDBank的hive資料實時接入方案已經應用在pcg資料的接入中,并将逐漸替換pcg現有的基于atta的資料接入。對TEG信安資料的接入目前也在進行中,後續我們還計劃對現網存量的TDBank資料接入任務也進行遷移。
作者:馬中斌
來源:騰訊大講堂 微信公衆号
原文連結:
https://mp.weixin.qq.com/s/Ori1oWJNsJpxRj4YX_DZcw