天天看點

Blink 有何特别之處?菜鳥供應鍊場景最佳實踐miniBatch/microBatch攢批的間隔時間防止OOM,每個批次最多緩存多少條資料開啟LocalGlobal開啟PartialFinal

阿裡妹導讀:菜鳥供應鍊業務鍊路長、節點多、實體多,使得技術團隊在建設供應鍊實時數倉的過程中,面臨着諸多挑戰,如:如何實作實時變Key統計?如何實作實時逾時統計?如何進行有效地資源優化?如何提升多實時流關聯效率?如何提升實時作業的開發效率? 而 Blink 能否解決這些問題?下面一起來深入了解。

背景

菜鳥從2017年4月開始探索 Blink(即 Apache Flink 的阿裡内部版本),2017年7月開始線上上環境使用 Blink,作為我們的主流實時計算引擎。

為什麼短短幾個月的探索之後,我們就選擇Blink作為我們主要的實時計算引擎呢?

在效率上,Blink 提供 DataStream、TableAPI、SQL 三種開發模式,強大的 SQL 模式已經滿足大部分業務場景,配合半智能資源優化、智能傾斜優化、智能作業壓測等功能,可以極大地提升實時作業的開發效率;在性能上,諸如MiniBatch&MicroBatch、維表 Async&Cache、利用 Niagara 進行本地狀态管理等内部優化方案,可以極大地提升實時作業的性能;在保障上,Blink 自帶的 Failover 恢複機制,能夠實作線程級的恢複,可以做到分鐘級恢複,配合 Kmonitor 監控平台、烽火台預警平台,可以有效地實作實時作業的資料保障。

接下來,我将結合供應鍊業務的一些業務場景,簡要說明,Blink 如何解決我們遇到的一些實際問題。

回撤機制

訂單履行是供應鍊業務中最常見的物流場景。什麼是訂單履行呢?當商家 ERP 推單給菜鳥之後,菜鳥履行系統會實時計算出每筆訂單的出庫、攬收、簽收等節點的預計時間,配送公司需要按照各節點的預計時間進行訂單的配送。為了保證訂單的準點履約,我們經常需要統計每家配送公司每天各個節點的預計單量,便于配送公司提前準備産能。

看似很簡單的實時統計加工,我們在開發過程中遇到了什麼問題呢?履行重算!當物流訂單的上遊某個節點延遲時,履行系統會自動重算該筆訂單下遊所有節點的預計時間。比如某個物流訂單出庫晚點後,其後的預計攬收時間、預計簽收時間都會重算。而對于大部分的實時計算引擎來說,并不能很友好的支援這種變 Key 統計的問題。以前,資料量沒那麼大的時候,還可以通過 OLAP 資料庫來解決這類場景,當量上來後, OLAP 方案的成本、性能都是很大的問題。

除了 OLAP 方案,我們提倡采用 Blink 已經内置的 Retraction 機制,來解決這類變 Key 統計的問題,這也是我們在2017年初就開始嘗試 Blink 的重要原因。Blink 的Retraction 機制,使用 State 在記憶體或者外部儲存設備中對資料進行統計處理,當上遊資料源對某些彙總 Key 的資料做更新時,Blink 會主動給下遊下發一個删除消息進而“撤回”之前的那條消息,并用最新下發的消息對表做更新操作。

下面是一個簡化後的案例,供了解Blink Retraction的内部計算過程:

Blink 有何特别之處?菜鳥供應鍊場景最佳實踐miniBatch/microBatch攢批的間隔時間防止OOM,每個批次最多緩存多少條資料開啟LocalGlobal開啟PartialFinal

對于上述案例,可以通過 Blink 提供的強大的、靈活的、簡易的 SQL 開發模式來實作,隻需要幾行 SQL 即可完成。

select   plan_tms_sign_time
       ,sum(1) as plan_tms_sign_lgtord_cnt
from
       (select   lg_order_code
                ,last_value(plan_tms_sign_time) as plan_tms_sign_time
        from     dwd_csn_whc_lgt_fl_ord_ri
        group by lg_order_code
        ) ss
group by plan_tms_sign_time
;           

維表關聯

供應鍊業務的實體角色非常多(倉、配、分撥、站點、小件員、貨主、行業、地區等),實體繁多,這意味着我們在建設實時明細中間層的時候,會使用大量的維表關聯,這對 Blink 在維表關聯的性能上提出了更高的要求——如何提升大量的大小維表的關聯性能?Blink 從來沒讓使用者失望,Blink SQL 模式在維表關聯的性能上,也做了大量的優化:

優化1:Async IO,有一些實時計算引擎,維表關聯是采用同步通路的方式,即來一條資料,去資料庫查詢一次,等待傳回後輸出關聯結果。這種方式,可以發現網絡等待時間極大地阻礙了吞吐和延遲。而 Blink 采用了異步通路的模式,可以并發地處理多個請求和回複,進而連續地請求之間不需要阻塞等待,吞吐量大大提升。

優化2:緩存,維表關聯涉及到大量的維表查詢請求,其中可能存在大量相同 Key 的重複請求。Blink SQL 模式提供了緩存的機制,并提供 LRU 和 ALLCache 兩種緩存方案。

使用者可以通過配置 Cache='LRU' 參數,開啟 LRU 緩存優化。開啟後,Blink 會為每個 JoinTable 節點建立一個 LRU 本地緩存。當每個查詢進來的時候,先去緩存中查詢,如果存在則直接關聯輸出,減少了一次 IO 請求。如果不存在,再發起資料庫查詢請求,請求傳回的結果會先存入緩存中以備下次查詢。

如果維表資料不大,使用者可以通過配置 Cache='ALL' 參數,對維表進行全量緩存。這樣,所有對該維表的查詢操作,都會直接走本地緩存模式,幾乎沒有 IO,關聯的性能非常好。

優化3:緩存無效 Key,如果維表很大,無法采用 ALLCache 的方案,而在使用 LRU 緩存時,會存在不少維表中不存在的 Key 。由于命中不了緩存,導緻緩存的收益較低,仍然會有大量請求發送到資料庫,并且LRU模式下緩存裡的key不會永久保留,可以通過調整參數,設定保留時間。

優化4:Distribute By 提高緩存命中率,預設情況下,維表關聯的節點與上遊節點之間是 Chain 在一起,不經過網絡。這在緩存大小有限、Key 總量大、熱點不明顯的情況下, 緩存的收益可能較低。這種情況下可以将上遊節點與維表關聯節點的資料傳輸改成按 Key 分區。這樣通常可以縮小單個節點的 Key 個數,提高緩存的命中率。

除了上述幾點優化,Blink SQL 模式還在嘗試引入 SideInput、Partitioned ALL Cache 等優化方案,相信在随後開源的 Blink 版本中,維表關聯的性能會越來越好。

下面是一張來自 Flink Committer 雲邪 異步查詢的流程圖,供了解與同步請求的差異。

Blink 有何特别之處?菜鳥供應鍊場景最佳實踐miniBatch/microBatch攢批的間隔時間防止OOM,每個批次最多緩存多少條資料開啟LocalGlobal開啟PartialFinal

資料傾斜

無資料不傾斜,我們在實時數倉建設過程中,也當然會遇到資料傾斜問題。在統計賣家的單量時,有些賣家單量大,有些賣家單量小,單量超大的賣家,就會産生資料傾斜;在統計行業的單量時,有些行業單量大,有些行業單量小,單量超大的行業,就會産生資料傾斜;在統計貨品的庫存流水情況時,有些貨品庫存流水頻繁,一些貨品庫存流水較少,庫存流水超頻繁的貨品就會産生資料傾斜……

我們應該如何處理資料傾斜問題呢?以統計賣家的單量為例,以前我們會先把訂單這個 Key 作 Hash,先針對 Hash 之後的值做一次去重的聚合操作,再在此基礎上,再做一次針對原 Key 去重的聚合操作。兩次類似的聚合操作,導緻代碼寫起來比較複雜,體力勞動比較多。

2017年,我們的實時資料開始全面切換到 Blink 上,Blink 在資料傾斜這塊,又給我們提供了什麼的方案呢?Blink 給出的答案是:MiniBatch/MicroBatch+LocalGlobal+PartialFinal。

MiniBatch/MicroBatch,可以實作微批處理,進而減少對 State 的通路,提升吞吐。因為微批處理會導緻一定的延遲,最好結合 Blink 提供的允許延遲的相關參數來使用。

LocalGlobal,分為 Local 和 Global 兩個階段,有點類似 MapReduce 中的Combine 和 Reduce 兩個階段。LocalGlobal 可以很好地處理非去重類的聚合操作,但對 Count Distinct 的優化效果一般,因為在 Local 階段,可能 Distinct Key的去重率并不會很高,進而導緻後續的 Global 階段,仍然會有熱點。

PartialFinal,可以很好地解決 Count Distinct 帶來的資料傾斜問題。PartialFinal 可以将 Distinct Key 自動打散,先聚合一次,在此基礎上,再聚合一次,進而實作打散熱點的作用。PartialFinal 跟手動 Hash 再聚合兩次的效果一緻,通過 Blink 提供的 PartialFinal 參數,可以自動實作,不再需要人為手工編寫 Hash 再聚合兩次的代碼。

由上可以看出,Blink 在資料傾斜的處理上,已經實作了自動化,以前人為編寫的打散熱點方案,現在幾個參數就能全部搞定,大大提升了代碼的編寫效率。

下面是相關參數,使用者可以直接在 Blink 的作業參數中進行配置。

miniBatch/microBatch攢批的間隔時間

blink.miniBatch.allowLatencyMs=5000

blink.microBatch.allowLatencyMs=5000

防止OOM,每個批次最多緩存多少條資料

blink.miniBatch.size=20000

開啟LocalGlobal

blink.localAgg.enabled=true

開啟PartialFinal

blink.partialAgg.enabled=true

逾時統計

上架是倉儲業務的重要組成部分。上架,顧名思義,就是要把到倉的貨品,上到倉庫的存儲貨架上。上架一般分為采購上架、銷退上架、調撥上架等。及時上架是對倉庫的重要考核項之一,無論哪一種類型的上架,我們經常需要針對到貨後超過 x 小時未上架的訂單進行預警。

但是,Blink 的計算是消息機制,需要上遊發送消息才能觸發下遊計算,而上述的場景中,未上架就說明不會有上架的消息流入 Blink,進而無法完成下遊的計算。

對于這種實時逾時統計的問題,應該如何來解呢?我們嘗試了幾種方案,供參考:

方案1:針對部分 Source Connector,Blink 提供了"延時下發"的功能,使用者可以通過指定 DataDeliveryDelayMs 參數,實作消息延遲下發。正常的消息正常流入,正常消息也可以通過配置該參數,使其按照自己的需求延時流入。這樣,通過正常流入的消息關聯延時流入的消息,可以觸發 Blink 在消息正常流入時計算一次,在延時消息流入時再觸發計算一次。這種方案,可以實作我們的業務需求,但是這種方案會把所有消息重新發送一遍,而不僅僅是到貨後超過x小時未上架的消息,這樣會造成計算資源的浪費,我們不建議在資料量很大的場景下使用該方案。

方案2:如果有第三方的消息中間件,而這個消息中間件又能支援配置逾時下發的規則,這将是一個比較好的方案。據了解,Kafka 的最新版本已經能夠根據業務需求,配置消息逾時下發的規則。我們隻需要在 Blink 中,通過正常流入的消息流關聯關鍵Kafka 逾時下發的消息流,就可以觸發 Blink 進行逾時消息的統計。這樣,除了Blink,我們需要同時保障 Kafka 的穩定性。Kafka的逾時消息訂閱,可以參見:[1]。

方案3:我們能夠很自然的想到 CEP,而 Blink 也已經提供了 CEP 的功能,且已經SQL化。使用者可以通過 Blink CEP 完成上述業務需求的統計。在實操過程中,我們發現,通過 Blink CEP 統計的結果,往往與真實結果(明細彙總統計)有一定的出入。什麼原因呢?原來到貨時間,被回傳了多次,有可能開始回傳的是9點,但是後面發現回傳錯了,改成了8點,而 CEP 的 Watermark 是全局地向前走的,對于這種場景,無法很好的适配。

方案4:Flink 的 ProcessFunction,是一個 Low-Level 的流處理操作。通過改寫其中的 ProcessElement 方法,可以告訴 Blink的State 裡面存什麼,以及如何更新State;通過改寫 OnTimer 方法,可以告訴 State 何時下發逾時消息。通過對上述幾種方案的原理對比及性能壓測,我們最終選擇的也是這套方案。由于逾時場景,在供應鍊業務中非常常見,我們已經将該方案沉澱下來,同樣的場景,通過 1min 配置下相關參數,即可完成類似場景逾時消息的下發。

下面是方案4簡化後的實作架構圖,供了解相關實作及優勢。

Blink 有何特别之處?菜鳥供應鍊場景最佳實踐miniBatch/microBatch攢批的間隔時間防止OOM,每個批次最多緩存多少條資料開啟LocalGlobal開啟PartialFinal

零點起跳

每次大促,大屏上零點時刻雙十一的零點時刻一直是大家關注的焦點,為了在零點一過就讓各項名額盡快在大屏上展現出來,我們進行了一些端到端的優化,供參考。

優化1:合理調整 Blink 讀取上遊消息源的 FlushInterval 。我們知道 Blink 是以Block 的形式傳輸資料,如果 Block 一直積攢不滿,Block 可能一直等待無法下發。這種情況,我們可以通過調整 FlushInterval 參數,直接控制多長時間往下遊 sink 一次。這樣,Block 積滿或間隔達到滿足其中一個條件,Block 就會往下流。

優化2:合理調整 MiniBatch/MicroBatch的size 和 AllowLatency 參數。前文提到,MiniBatch/MicroBatch 是微批處理模式,都會帶來一定的延遲,可以通過合理控制 Size 和 AllowLatency 參數,來控制該模式帶來的延遲。與優化1一樣,兩者滿足其一,就會往下繼續執行。

優化3:合理控制寫 Checkpoint 的方式以及 Checkpoint 的大小。利用 Checkpoint 實作 Exactly Once 的容錯方式一直是 Flink 作為流引擎的一個亮點。但是過于複雜的運算和網絡環境有可能導緻 checkpoint 的對齊時間過長,進而導緻整個 Job 的延遲變長。同時,Exactly Once 模式下做 Checkpoint 的時間間隔與整個任務中資料流的延遲也是一個 Trade Off。是以我們在處理特别複雜的 Job 時也将這個因素考慮了進去,并沒有使用預設的 Exactly Once 方式,而是依舊實際需求采用了 At Least Once 。同時,将 Checkpoint 的周期設定為了60s,盡可能的保證了任務在延遲較小的情況下,在 Failover 的情形下仍然能做到快速恢複。

優化4:除了 Blink 端,在資料服務端,大屏上的實時資料,我們建議采用查詢性能優異的 Hbase 作為存儲引擎,可以保證零點一過,三秒内便能實作大屏資料的跳動。

……

未來展望

Blink 在不斷快速地發展,不僅僅是流處理,目前也開始支援批處理,使用者隻需要寫一套代碼就可以同時實作批和流的資料開發,目前在日志型的資料場景上,我們也正在探索利用 Blink 直接實作批流混合模式;不僅僅是半智能資源調優,目前開始内測智能資源調優,Blink 可以根據吞吐量、算子複雜度等因素,對線上作業的資源配置進行全智能自适應調優,再也不用在大促前手動更改資源配置;不僅僅是 Java,更期望有 Python 等多語言生态,來描述計算邏輯,相信開發效率又會上一個新的台階;不僅僅是 ETL,更期望有更廣闊的大資料算法內建,可以實作複雜的大資料AI場景……未來已來,我們相信,Blink 已經做好了迎接未來的準備。

參考資料:

[1]

https://ketao1989.github.io/2016/01/02/delayed-message-consume-service-use-kafka/

原文釋出時間為: 2019-05-23

本文作者: 晨笙、緣橋

本文來自雲栖社群合作夥伴“

阿裡技術

”,了解相關資訊可以關注“

”。