1
背景
得物供應鍊業務是紛繁複雜的,我們既有JIT的現貨模式中間夾着這大量的倉庫作業環節,又有到倉的寄售,品牌業務,有非常複雜的逆向鍊路。在這麼複雜的業務背後,我們需要精細化關注人貨場車的效率和成本,每一單的及時履約情況,要做到這一點我們需要各粒度和次元的資料來支撐我們的精細化管理。
1.1 業務早期
業務早期,業務回報我們背景管理系統某些報表查詢慢。查詢代碼可知,如下圖:
這種現象一般表現為:
- 大表JOIN,rdbms不擅長做資料聚合,查詢響應慢,調優困難;
- 多表關聯,索引優化,子查詢優化,加劇了複雜度,大量索引,讀庫磁盤空間膨脹過快;
- 資料量大,多元分析困難,跨域取數,自助拉到實時資料困難等。
一方面原因是系統設計之初,我們主要關注業務流程功能設計,事務型業務流程資料模組化,對于未來核心名額的落地,特别是關鍵實時名額落地在業務快速增長的情況下如何做到非常好的支撐。mysql在此方面越來越捉襟見肘。
另外一方面原因是mysql這種oltp資料庫是無法滿足實時資料分析需求的,我們需要探索一套實時資料架構,拉通我們的履約,倉儲,運配等各域的資料,做有效串聯,是以我們開始了我們的實時資料架構探索,下圖是我們一些思考。
附:資料視角的架構設計也是系統架構設計的重要組成部分。
2
架構演變
2.1 原始階段
2.1.1 通過Adb(AnalyticDB for MySQL)完成實時join
通過阿裡雲DTS同步直接将業務庫單表實時同步到Adb,通過Adb強大的join能力和完全相容mysql文法,可以執行任意sql,對于單表大資料量場景或者單表和一些簡單維表的join場景表現還是不錯的,但是在業務複雜,複雜的sql rt很難滿足要求,即使rt滿足要求,單個sql所消耗的記憶體,cpu也不盡人意,能支撐的并發量很有限。
2.1.2 通過Otter完成大寬表的建設
基于Canal開源産品,擷取資料庫增量日志資料并下發,下遊消費增量資料直接生成大寬表,但是寬表還是寫入mysql資料庫,實作單表查詢,單表查詢速度顯著提升,無olap資料庫的常見做法,通過寬表減少join帶來的性能消耗。
但是存在以下幾個問題:
- 雖然otter有不錯的封裝,通過資料路由能做一些簡單的資料拼接,但在調試上線複雜度上依然有不小的複雜度;
- otter僞裝mysql從庫同時要去做etl邏輯,把cdc幹的活和實時ETL的活同時幹了,耦合度較高
- 。
2.2 實時架構1.0
2.2.1 flink+kafka+ClickHouse
在上述調研嘗試後都沒有解決根本的問題,我們開始把目标建立标準的實時數倉的思路上來,在20年olap沒有太多的可選項,我們把目标放在clickhouse上。
- 為了保證順序append每次寫入都會生成一個part檔案,滿足一定條件背景定時合并。
- 非常弱的update delete,不能保證原子性和實時性。
- clickhouse隻适合資料量大,業務模型簡單,更新場景少的場景。
- 存算不分離,複雜查詢影響clickhouse寫入。
因為clickhouse的這些特性,尤其是不支援upsert的情況下,我們通常需要提前把大寬表的資料提前在flink聚合好,并且供應鍊資料生命周期長,作業流程也長如:
- 貨物的生命周期較短時長為一周,長周期時長超過1個月;
- 庫内環節異常的多,從賣家發貨到收貨、分揀、質檢、拍照、鑒别、防僞、複查、打包、出庫、買家簽收等十幾個甚至更多的環節,一張以商品實物id為主鍵的大寬表,需要join幾十張業務表;
- 供應鍊系統早期設計沒有每張表都會備援唯一單号(入庫單,作業單,履約單)這樣的關鍵字段,導緻沒辦法直接簡單的join資料。
在這樣一個架構下,們的flink在成本上,在穩定性維護上,調優上做的非常吃力。
附:
clickhouse不支援标準的upsert模式,可以通過使用AggregatingMergeTree 引擎字段類型使用SimpleAggregateFunction(anyLast, Nullable(UInt64)) 合并規則取最後一條非null資料可以實作upsert相似的功能,但讀時合并性能有影響。
2.3 實時架構2.0
2.3.1 flink+kafka+hologres
是以我們迫切的希望有支援upsert能力的olap資料庫,同時能搞定供應鍊寫多少的場景,也能搞定我們複雜查詢的場景,我們希望的olap資料至少能做到如下幾點:
- 有upsert能力,能對flink大任務做有效拆分;
- 存算分離,複雜業務計算,不影響業務寫入,同時能平滑擴縮容;
- 有一定的join能力帶來一些靈活度;
- 有完善的分區機制,熱資料查詢性能不受整體資料增長影響;
- 完善的資料備份機制。
這樣一個行列混合的olap資料庫,支援upsert,支援存算分離,還是比較符合我們的預期。
目前這樣一套架構支援了供應鍊每天數千人的報表取數需求,以及每天10億資料量的導出,通路量在得物所有to B系統中排名靠前。
2.3.2 我們遇到的一些問題
多時間問題
如何設定segment_key,選擇哪個業務字段作為segment_key供應鍊幾十個環節都有操作時間,在不帶segment_key的情況下性能如何保障,困擾了我們一段時間。
設定合理的segment_key如有序的時間字段,可以做到完全順序寫。每個segment檔案都有個min,max值,所有的時間字段過來隻需要去比較下在不在這個最小值最大值之間(這個動作開銷很低),不在範圍内直接跳過,在不帶segment_key查詢的條件下,也能極大的降低所需要過濾的檔案數量。
批流融合
背景:業務快速發展過程中,持續疊代實時任務成為常态。供應鍊業務複雜,環節多,流程往往長達一個月周期之久,這就導緻state ttl設定周期長。job的operator變化(sql修改),checkpoint無法自動恢複,savepoint恢複機制無法滿足,比如增加group by和join。重新消費曆史資料依賴上遊kafka存儲時效,kafka在公司平台一般預設都是存儲7天,不能滿足一個月資料回刷需求場景。
方案:通過批流融合在source端實作離線 + 實時資料進行資料讀取、補齊。
(1)離線按key去重,每個key隻保留一條,減少消息量下發。
(2)離線和實時資料合并,使用last_value取相同主鍵最新事件時間戳的一條資料。
(3)使用union all + group by方式是可作為代替join的一個選擇。
(4)實時資料取當日資料,離線資料取曆史資料,防止資料漂移,實時資料需前置一小時。
Join算子亂序
- 問題分析
由于join算子是對join鍵做hash後走不同的分片處理資料,開啟了2個并發後,再因為header_id字段的值變化,detail表2次資料流走到了2個不同的taskmanage,而不同的線程是無法保證輸出有序性的,是以資料有一定的機率會亂序輸出,導緻期望的結果不正确,現象是資料丢失。
- 解決辦法
通過header inner join detail表後,拿到detail_id,這樣再次通過detail_id join就不會出現(join鍵)的值會從null變成非null的情況發生了,也就不會亂序了。
insert into sink
Select detail.id,detail.header_id,header.id
from detail
left join (
Select detail.id AS detail_id,detail.header_id,header.id
from header
inner join detail
on detail.header_id = header.id
) headerNew
on detail.id = headerNew.detail_id
2.3.3 Hologres or starrocks
這裡也聊聊大家比較關注的hologres和starrocks,starrocks從開源開始也和我們保持了密切聯系,也做了多次的深入交流,我們也大緻列了兩者之間的一些各自優勢和對于我們看來一些不足的地方。
3
其他做的一些事情
3.1 開發提效工具——flink代碼生成器
參考MyBatis gennerator一些思想,利用模闆引擎技術,定制化模闆來生成flink sql。可以解決代碼規範,和提升開發效率。基本可以通過代碼配置來生成flink sql。
3.2 開發提效工具——可視化平台
直接通過配置的方式,線上寫sql,直接生成頁面和接口,一鍵釋出,同時引入緩存,鎖排隊機制解決高峰通路性能問題。
動态配置接口,一鍵生成rpc服務:
動态配置報表:
4
未來規劃
目前架構依然存在某種程度的不可能三角,我們需要探索更多的架構可能性:
(1)利用寫在holo,計算在mc避免holo這種記憶體資料庫,在極端查詢記憶體被打爆的問題,利用mc的計算能力可以搞定一些事實表join的問題提升一些靈活度。
(2) 借助apache hudi推進湖倉一體,hudi做批流存儲統一,flink做批流計算統一,一套代碼,提供5-10分鐘級的準實時架構,緩解部分場景隻需要準時降低實時計算成本。
作者:newbie & zero.x
來源:微信公衆号:得物技術
出處:https://mp.weixin.qq.com/s/4StnjSWpd4rO_GiajmPA6Q