天天看點

如何基于 Apache Doris 與 Apache Flink 快速建構極速易用的實時數倉

作者:SelectDB

随着大資料應用的不斷深入,企業不再滿足離線資料加工計算的時效,實時資料需求已成為資料應用新常态。伴随着實時分析需求的不斷膨脹,傳統的資料架構面臨的成本高、實時性無法保證、元件繁冗、運維難度高等問題日益凸顯。為了适應業務快速疊代的特點,幫助企業提升資料生産和應用的時效性、進一步挖掘實時資料價值,實時數倉的建構至關重要。

本文将分享如何基于 Home - Apache Doris 和 Apache Flink 快速建構一個極速易用的實時數倉,包括資料同步、資料內建、數倉分層、資料更新、性能提升等方面的具體應用方案,在這之前,我們先可以先了解一下傳統的資料架構如何設計的、又存在哪些痛點問題。

# 實時數倉的需求與挑戰

如何基于 Apache Doris 與 Apache Flink 快速建構極速易用的實時數倉

上圖所示為傳統的資料架構,如果我們從資料流的⻆度分析傳統的資料處理架構,會發現從源端采集到的業務資料和日志資料主要會分為實時和離線兩條鍊路:

  • 在實時資料部分,通過 Binlog 的⽅式,将業務資料庫中的資料變更 (CDC,Change Data Capture)采集到實時數倉。同時,通過 Flume-Kafka-Sink 對日志資料進⾏實時采集。當不同來源的資料都采集到實時存儲系統後,便可以基于實時存儲系統來建構實時數倉。在實時數倉的内部,我們仍然會遵守傳統數倉分層理論,将資料分為 ODS 層、DWD 層、DWS 層、 ADS 層以實作最大程度的模型複用。
  • 在離線資料部分,通過 DataX 定時同步的⽅式,批量同步業務庫 RDS 中的資料。當不同來源的資料進⼊到離線數倉後,便可以在離線數倉内部,依賴 Spark SQL 或 Hive SQL 對資料進⾏定時處理,分離出不同層級 (ODS 、DWD 、ADS 等)的資料,并将這些資料存在⼀個存儲媒體上,⼀般會采用如 HDFS 的分布式檔案系統或者 S3 對象存儲上。通過這樣的⽅式,離線數倉便建構起來了。與此同時,為了保障資料的⼀緻性,通常需要資料清洗任務使⽤離線資料對實時資料進⾏清洗或定期覆寫,保障資料最終的⼀緻性。

從技術架構的⻆度對傳統資料技術棧進行分析,我們同樣會發現,為了迎合不同場景的需求,往往會采用多種技術棧,例如在湖倉部分通常使用的是 Hive 、Iceberg 、Hudi 等資料湖;面向湖上資料的 Ad-hoc 查詢一般選擇 Impala 或 Presto;對于 OLAP 場景的多元分析,一般使⽤ Doris 或 Kylin、 Druid。除此之外,為應對半結構化資料的分析需求,例如日志分析與檢索場景,通常會使⽤ ES 進行分析;面對高并發點查詢的 Data Serving 場景會使⽤ HBase;在某些場景下可能還需要對外提供統⼀的資料服務,這時可能會使⽤基于 Presto/Trino 的查詢⽹關,對⽤戶提供統一查詢服務。其中涉及到的資料元件有數十種,高昂的使用成本群組件間相容、維護及擴充帶來的繁重壓力成為企業必須要面臨的問題。

如何基于 Apache Doris 與 Apache Flink 快速建構極速易用的實時數倉

從上述介紹即可知道,傳統的資料架構存在幾個核心的痛點問題:

  • 傳統資料架構元件繁多,維護複雜,運維難度非常高。
  • 計算、存儲和研發成本都較高,與行業降本提效的趨勢背道而馳。
  • 同時維護兩套資料倉庫(實時數倉和離線數倉)和兩套計算(實時資料量和實時計算任務),資料時效性與一緻性無法保證。

在此背景下,我們亟需⼀個“極速、易用、統一、實時”的資料架構來解決這些問題:

  • 極速:更快的查詢速度,最大化提升業務分析人員的效率;
  • 易用:對于使用者側的使用和運維側的管控都提供了極簡的使用體驗;
  • 統⼀:異構資料與分析場景的統一,半結構化和結構化資料可以統⼀存儲,多分析場景可以統一技術棧;
  • 實時:端到端的高時效性保證,發揮實時資料的價值。

# 如何建構極速易用的實時數倉架構

基于以上的需求,我們采取 Apache Doris 和 Apache Flink 來建構極速易用的實時數倉,具體架構如下圖所示。多種資料源的資料經過 Flink CDC 內建或 Flink Job 加⼯處理後,⼊庫到 Doris 或者 Hive/Iceberg 等湖倉中,最終基于 Doris 提供統⼀的查詢服務。

如何基于 Apache Doris 與 Apache Flink 快速建構極速易用的實時數倉

在資料同步上,通過 Flink CDC 将 RDS 的資料實時同步到 Doris;通過 Routine Load 将 Kafka 等消息系統中的資料實時同步到 Doris 。在數倉分層上,ODS 層通常選擇使用明細模型建構,DWD 層可以通過 SQL 排程任務對 ODS 資料抽取并擷取,DWS 和 ADS 層則可以通過 Rollup 和物化視圖進行建構。在資料湖上, Doris ⽀持為 Hive、Iceberg 、Hudi 以及Delta Lake(todo)提供聯邦分析和湖倉加速的能⼒。在資料應用上,Apache Doris 既可以承載批量資料加工處理的需求,也可以承載高吞吐的 Adhoc 和高并發點查詢等多種應⽤場景。

# 解決方案

如何實作資料的增量與全量同步

1. 增量及全量資料同步

在全量資料和增量的同步上,我們采取了 Flink CDC 來實作。其原理非常簡單,Flink CDC 實作了基于 Snapshot 的全量資料同步、基于 BinLog 的實時增量資料同步,全量資料同步和增量資料同步可以⾃動切換,是以我們在資料遷移的過程中,隻需要配置好同步的表即可。當 Flink 任務啟動時,優先進⾏曆史表的資料同步,同步完後⾃動切換成實時同步。

如何基于 Apache Doris 與 Apache Flink 快速建構極速易用的實時數倉

2. 資料一緻性保證

如何保證資料一緻性是大家重點關注的問題之一,那麼在新架構是如何實作的呢?

資料⼀緻性⼀般分為“最多⼀次” 、“⾄少⼀次”和“精确⼀次”三種模型。

最多⼀次(At-Most-Once):發送⽅僅發送消息,不期待任何回複。在這種模型中,資料的⽣産和消費過程中可能出現資料丢失的問題。⾄少⼀次(At-Least-Once):發送⽅不斷重試,直到對⽅收到為⽌。在這個模型中,⽣産和消費過程都可能出現資料重複。精确⼀次(Exactly-Once):能夠保證消息隻被嚴格發送⼀次,并且隻被嚴格處理⼀次。這種資料模型能夠嚴格保證資料⽣産和消費過程中的準确⼀緻性。

  • Flink CDC 通過 Flink Checkpoint 機制結合 Doris 兩階段送出可以實作端到端的 Exactly Once 語義。具體過程分為四步:
  • 事務開啟(Flink Job 啟動及 Doris 事務開啟):當 Flink 任務啟動後, Doris 的 Sink 會發起 Precommit 請求,随後開啟寫⼊事務。
  • 資料傳輸(Flink Job 的運⾏和資料傳輸):在 Flink Job 運⾏過程中, Doris Sink 不斷從上遊算⼦擷取資料,并通過 HTTP Chunked 的⽅式持續将資料傳輸到 Doris。
  • 事務預送出:當 Flink 開始進⾏ Checkpoint 時,Flink 會發起 Checkpoint 請求,此時 Flink 各個算⼦會進⾏ Barrier 對⻬和快照儲存,Doris Sink 發出停⽌ Stream Load 寫⼊的請求,并發起⼀個事務送出請求到 Doris。這步完成後,這批資料已經完全寫⼊ Doris BE 中,但在 BE 沒有進⾏資料釋出前對⽤戶是不可⻅的。
  • 事務送出:當 Flink 的 Checkpoint 完成之後,将通知各個算⼦,Doris 發起⼀次事務送出到 Doris BE ,BE 對此次寫⼊的資料進⾏釋出,最終完成資料流的寫⼊。
如何基于 Apache Doris 與 Apache Flink 快速建構極速易用的實時數倉

綜上可知,我們利用 Flink CDC 結合 Doris 兩階段事務送出保證了資料寫入一緻性。需要注意的是,在該過程中可能遇到一個問題:如果事務預送出成功、但 Flink Checkpoint 失敗了該怎麼辦?針對該問題,Doris 内部支援對寫⼊資料進⾏復原(Rollback),從⽽保證資料最終的⼀緻性。

3. DDL 和 DML 同步

随着業務的發展,部分⽤戶可能存在 RDS Schema 的變更需求。當 RDS 表結構變更時,⽤戶期望 Flink CDC 不但能夠将資料變化同步到 Doris,也希望将 RDS 表結構的變更同步到 Doris,⽤戶則無需擔⼼ RDS 表結構和 Doris 表結構不⼀緻的問題。

Light Schema Change

目前,Apache Doris 1.2.0 已經實作了 Light Schema Change 功能,可滿⾜ DDL 同步需求,快速⽀持 Schema 的變更。

如何基于 Apache Doris 與 Apache Flink 快速建構極速易用的實時數倉

Light Schema Change 的實作原理也比較簡單,對資料表的加減列操作,不再需要同步更改資料檔案,僅需在 FE 中更新中繼資料即可,進而實作毫秒級的 Schema Change 操作,且存在導入任務時效率的提升更為顯著。在這個過程中,由于 Light Schema Change 隻修改了 FE 的中繼資料,并沒有同步給 BE。是以會産⽣ BE 和 FE Schema 不⼀緻的問題。為了解決這種問題,我們對 BE 的寫出流程進⾏了修改,具體包含三個⽅⾯。

  • 資料寫⼊:FE 會将 Schema 持久化到中繼資料中,當 FE 發起導⼊任務時,會把最新的 Schema 一起發給 Doris BE,BE 根據最新的 Schema 對資料進⾏寫⼊,并與 RowSet 進⾏綁定。将該 Schema 持久化到 RowSet 的中繼資料中,實作了資料的各⾃解析,解決了寫⼊過程中 Schema 不⼀緻的問題。
  • 資料讀取:FE ⽣成查詢計劃時,會把最新的 Schema 附在其中⼀起發送給 BE,BE 拿到最新的 Schema 後對資料進⾏讀取,解決讀取過程中 Schema 發⽣不⼀緻的問題。
  • 資料 Compaction:當資料進⾏ Compaction 時,我們選取需要進⾏ Compaction 的 RowSet 中最新的 Schema 作為之後 RowSet 對應的 Schema,以此解決不同 Schema 上 RowSet 的合并問題。

經過對 Light Schema Change 寫出流程的優化後, 單個 Schema Chang 從 310 毫秒降低到了 7 毫秒,整體性能有近百倍的提升,徹底的解決了海量資料的 Schema Change 變化難的問題。

Flink CDC DML 和 DDL 同步

有了 Light Schema Change 的保證, Flink CDC 能夠同時⽀持 DML 和 DDL 的資料同步。那麼是如何實作的呢?

如何基于 Apache Doris 與 Apache Flink 快速建構極速易用的實時數倉
  • 開啟 DDL 變更配置:在 Flink CDC 的 MySQL Source 側開啟同步 MySQL DDL 的變更配置,在 Doris 側識别 DDL 的資料變更,并對其進⾏解析。
  • 識别及校驗:當 Doris Sink 發現 DDL 語句後,Doris Sink 會對表結構進⾏驗證,驗證其是否⽀持 Light Schema Change。
  • 發起 Schema Change :當表結構驗證通過後,Doris Sink 發起 Schema Change 請求到 Doris,從⽽完成此次 Schema Change 的變化。

解決了資料同步過程中源資料⼀緻性的保證、全量資料和增量資料的同步以及 DDL 資料的變更後,一個完整的資料同步⽅案就基本形成了。

如何基于 Flink 實作多種資料內建

如何基于 Apache Doris 與 Apache Flink 快速建構極速易用的實時數倉

除了上文中所提及的基于 Flink CDC 進行資料增量/全量同步外,我們還可以基于 Flink Job 和 Doris 來建構多種不同的資料內建方式:

  • 将 MySQL 中兩個表的資料同步到 Flink 後,在 Flink 内部進⾏多流 Join 完成資料打寬,後将⼤寬表同步到 Doris 中。
  • 對上遊的 Kafka 資料進⾏清洗,在 Flink Job 完成清洗後通過 Doris-Sink 寫⼊ Doris 中。
  • 将 MySQL 資料和 Kafka 資料在 Flink 内部進⾏多流 Join,将 Join 後的寬表結果寫⼊ Doris中。
  • 在 Doris 側預先建立寬表,将上遊 RDS 中的資料根據 Key 寫入, 使⽤ Doris 的部分列更新将多列資料分别寫⼊到 Doris 的⼤寬表中。

如何選擇資料模型

Apache Doris 針對不同場景,提供了不同的資料模型,分别為聚合模型、主鍵模型、明細模型。

如何基于 Apache Doris 與 Apache Flink 快速建構極速易用的實時數倉

AGGREGATE 聚合模型

在企業實際業務中有很多需要對資料進行統計和彙總操作的場景,如需要分析網站和 APP 通路流量、統計使用者的通路總時長、通路總次數,或者像廠商需要為廣告主提供廣告點選的總流量、展示總量、消費統計等名額。在這些不需要召回明細資料的場景,通常可以使用聚合模型,比如上圖中需要根據門店 ID 和時間對每個門店的銷售額實時進行統計。

UNIQUE KEY 主鍵模型

在某些場景下使用者對資料更新和資料全局唯一性有去重的需求,通常使用 UNIQUE KEY 模型。在 UNIQUE 模型中,會根據表中的主鍵進⾏ Upsert 操作:對于已有的主鍵做 Update 操作,更新 value 列,沒有的主鍵做 Insert 操作,比如圖中我們以訂單id為唯一主鍵,對訂單上的其他資料(時間和狀态)進行更新。

DUPLICATE 明細模型

在某些多元分析場景下,資料既沒有主鍵,也沒有聚合需求,Duplicate 資料模型可以滿足這類需求。明細模型主要用于需要保留原始資料的場景,如日志分析,使用者行為分析等場景。明細模型适合任意次元的 Ad-hoc 查詢。雖然同樣無法利用預聚合的特性,但是不受聚合模型的限制,可以發揮列存模型的優勢(隻讀取相關列,而不需要讀取所有 Key 列)。

如何建構數倉分層

由于資料量級普遍較大,如果直接查詢數倉中的原始資料,需要通路的表數量和底層檔案的數量都較多,展現在日常工作中就是 SQL 異常複雜、計算耗時增高。而分層要做的就是對原始資料重新做歸納整理,在不同層級對資料或者名額做不同粒度的抽象,通過複用資料模型來簡化資料管理壓力,利用血緣關系來定位資料鍊路的異常,同時進一步提升資料分析的效率。在 Apache Doris 可以通過以下多種思路來建構資料倉庫分層:

微批排程

通過 INSERT INTO SELECT 可以将原始表的資料進行處理和過濾并寫入到目标表中,這種 SQL 抽取資料的行為一般是以微批形式進行(例如 15 分鐘一次的 ETL 計算任務),通常發生在從 ODS 到 DWD 層資料的抽取過程中,是以需要借助外部的排程工具例如 DolphinScheduler 或 Airflow 等來對 ETL SQL 進行排程。

Rollup 與物化視圖

物化視圖本質是一個預先計算的過程。我們可以在 Base 表上,建立不同的 Rollup 或者物化視圖來對 Base 表進行聚合計算。通常在明細層到彙總層(例如 DWD 層到 DWS 層或從 DWS 層到 ADS 層)的彙聚過程中可以使用物化視圖,以此實作名額的高度聚合。同時物化視圖的計算是實時進行的,是以站在計算的角度也可以将物化視圖了解為一個單表上的實時計算過程。

多表物化視圖

Apache Doris 2.0 将實作多表物化視圖這一功能,可以将帶有 Join 的查詢結果固化以供使用者直接查詢,支援定時自動或手動觸發的方式進行全量更新查詢結果,未來還将進一步支援更加完善的自動增量重新整理。基于多表物化視圖這一功能的實作,我們可以做更複雜的資料流處理,比如資料源側有 TableA、TableB、TableC,在多表物化視圖的情況下,使用者就可以将 TableA 和 TableB 的資料進行實時Join 計算後物化到 MV1 中。在這個角度上來看,多表物化視圖更像一個多流資料實時 Join 的過程。

如何基于 Apache Doris 與 Apache Flink 快速建構極速易用的實時數倉

如何應對資料更新

在實時資料倉庫建構的過程中,還需要面臨高并發寫入和實時更新的挑戰。如何在億級資料中快速找到需要更新的資料,并對其進⾏更新,⼀直都是⼤資料領域不斷追尋的答案。

1. 高并發資料更新

在 Apache Doris 中通過 Unique Key 模型來滿足資料更新的需求,同時通過 MVCC 多版本并發機制來實作資料的讀寫隔離。當新資料寫入時,如果不存在相同 Key 的資料則會直接寫⼊;如果有相同 Key 的資料則增加版本,此時資料将以多個版本的形式存在。背景會啟動異步的 Compaction 程序對曆史版本資料進⾏清理,當⽤戶在查詢時 Doris 會将最新版本對應的資料傳回給⽤戶,這種設計解決了海量資料的更新問題。

如何基于 Apache Doris 與 Apache Flink 快速建構極速易用的實時數倉

在 Doris 中提供了 Merge-on-Read 和 Merge-on-Write 兩種資料更新模式。

如何基于 Apache Doris 與 Apache Flink 快速建構極速易用的實時數倉

在此我們以訂單資料的寫入為例介紹 Merge-on-Read 的資料寫入與查詢流程,三條訂單資料均以 Append 的形式寫⼊ Doris 表中:

  • 資料 Insert:首先我們寫入 ID 為 1,2,3 的三條資料;
  • 資料 Update:當我們将訂單 1 的 Cost 更新為 30 時,其實是寫⼊⼀條 ID 為 1,Cost 為 30 的新版本資料,資料通過 Append 的形式寫⼊ Doris;
  • 資料 Delete:當我們對訂單 2 的資料進⾏删除時,仍然通過 Append ⽅式,将資料多版本寫⼊ Doris ,并将 _DORIS_DELETE_SIGN 字段變為 1 ,則表示這條資料被删除了。當 Doris 讀取資料時,發現最新版本的資料被标記删除,就會将該資料從查詢結果中進⾏過濾。

Merge-on-Read 的特點是寫⼊速度比較快,但是在資料讀取過程中由于需要進⾏多路歸并排序,存在着大量非必要的 CPU 計算資源消耗和 IO 開銷。

是以在 1.2.0 版本中,Apache Doris 在原有的 Unique Key 資料模型上增加了 Merge-on-Write 的資料更新模式。Merge-on-Write 兼顧了寫入和查詢性能。在寫⼊的過程中引⼊了 Delete Bitmap 資料結構,使⽤ Delete Bitmap 标記 RowSet 中某⼀⾏是否被删除,為了保持 Unique Key 原有的語義, Delete Bitmap 也⽀持多版本。另外使⽤了兼顧性能和存儲空間的 Row Bitmap,将 Bitmap 中的 MemTable ⼀起存儲在 BE 中,每個 Segment 會對應⼀個 Bitmap。

如何基于 Apache Doris 與 Apache Flink 快速建構極速易用的實時數倉
  • 寫入流程:
  • DeltaWriter 先将資料 Flush 到磁盤
  • 批量檢查所有 Key,在點查過程中經過區間樹,查找到對應的 RowSet。
  • 在 RowSet 内部通過 BloomFilter 和 index 進行⾼效查詢。

當查詢到 Key 對應的 RowSet 後,便會覆寫 RowSet Key 對應的 Bitmap,接着在 Publish 階段更新 Bitmap,從⽽保證批量點查 Key 和更新 Bitmap 期間不會有新的可⻅ RowSet,以保證 Bitmap 在更新過程中資料的正确性。除此之外,如果某個 Segment 沒有被修改,則不會有對應版本的 Bitmap 記錄。

  • 查詢流程:
  • 當我們查詢某⼀版本資料時, Doris 會從 LRU Cache Delete Bitmap 中查找該版本對應的緩存。
  • 如果緩存不存在,再去 RowSet 中讀取對應的 Bitmap。
  • 使⽤ Delete Bitmap 對 RowSet 中的資料進⾏過濾,将結果傳回。

該模式不需要在讀取的時候通過歸并排序來對主鍵進行去重,這對于高頻寫入的場景來說,大大減少了查詢執行時的額外消耗。此外還能夠支援謂詞下推,并能夠很好利用 Doris 豐富的索引,在資料 IO 層面就能夠進行充分的資料裁剪,大大減少資料的讀取量和計算量,是以在很多場景的查詢中都有非常明顯的性能提升。在真實場景的測試中,通過 Merge-on-Write 可以在保證數萬 QPS 的高頻 Upset 操作的同時實作性能 3-10 倍的提升。

2. 部分列更新

部分列更新是一個比較普遍的需求,例如廣告業務中需要在不同的時間點對同一個廣告行為(展示、點選、轉換等)資料的更新。這時可以通過 Aggregate Key 模型的replace_if_not_null實作。具體建表語句如下:

CREATE TABLE IF NOT EXISTS request_log
(
    `session_id` LARGEINT NOT NULL COMMENT "id",

    `imp_time` DATE REPLACE_IF_NOT_NULL COMMENT "展示",  #展示資料更新
    `imp_data` VARCHAR(20)  REPLACE_IF_NOT_NULL COMMENT "",

    `click_time` DATE REPLACE_IF_NOT_NULL COMMENT "點選",#點選資料更新
    `click_data` VARCHAR(20)  REPLACE_IF_NOT_NULL COMMENT "",

    `conv_time` DATE REPLACE_IF_NOT_NULL COMMENT "轉化",#轉換資料更新
    `conv_data` VARCHAR(20)  REPLACE_IF_NOT_NULL COMMENT ""
)
AGGREGATE KEY(`session_id`)
DISTRIBUTED BY HASH(`session_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
           

具體更新過程如下:

(1)更新展示資料

mysql> insert into request_log(session_id,imp_time,imp_data)VALUES(1,'2022-12-20','imp');
Query OK, 1 row affected (0.05 sec)
{'label':'insert_31a037849e2748f6_9b00b852d106eaaa', 'status':'VISIBLE', 'txnId':'385642'}

mysql> select * from request_log;
+------------+------------+----------+------------+------------+-----------+-----------+
| session_id | imp_time   | imp_data | click_time | click_data | conv_time | conv_data |
+------------+------------+----------+------------+------------+-----------+-----------+
| 1          | 2022-12-20 | imp      | NULL       | NULL       | NULL      | NULL      |
+------------+------------+----------+------------+------------+-----------+-----------+
1 row in set (0.01 sec)
           

(2)更新點選資料

mysql> insert into request_log(session_id,imp_time,imp_data)VALUES(1,'2022-12-20','imp');
Query OK, 1 row affected (0.05 sec)
{'label':'insert_31a037849e2748f6_9b00b852d106eaaa', 'status':'VISIBLE', 'txnId':'385642'}

mysql> select * from request_log;
+------------+------------+----------+------------+------------+-----------+-----------+
| session_id | imp_time   | imp_data | click_time | click_data | conv_time | conv_data |
+------------+------------+----------+------------+------------+-----------+-----------+
| 1          | 2022-12-20 | imp      | NULL       | NULL       | NULL      | NULL      |
+------------+------------+----------+------------+------------+-----------+-----------+
1 row in set (0.01 sec)
           

(3)更新轉化資料

ysql> insert into request_log(session_id,click_time,click_data)VALUES(1,'2022-12-21','click');
Query OK, 1 row affected (0.03 sec)
{'label':'insert_2649087d8dc046bd_a39d367af1f93ab0', 'status':'VISIBLE', 'txnId':'385667'}

mysql> select * from request_log;
+------------+------------+----------+------------+------------+-----------+-----------+
| session_id | imp_time   | imp_data | click_time | click_data | conv_time | conv_data |
+------------+------------+----------+------------+------------+-----------+-----------+
| 1          | 2022-12-20 | imp      | 2022-12-21 | click      | NULL      | NULL      |
+------------+------------+----------+------------+------------+-----------+-----------+
1 row in set (0.01 sec)

mysql>
           

同時部分列更新還可用于支援畫像場景的寬表列實時更新。

如何基于 Apache Doris 與 Apache Flink 快速建構極速易用的實時數倉

另外值得期待的是 Apache Doris 的 Unique Key 模型也即将實作部分列更新的功能,可以通過 Apache Doris GitHub 代碼倉庫及官網,關注新版本或新功能的釋出(相關位址可下滑至文章底部擷取)。

如何進一步提升查詢性能

1. 智能物化視圖

物化視圖除了可以作為高度聚合的彙總層外,其更廣泛的定位是加速相對固定的聚合分析場景。物化視圖是指根據預定義的 SQL 分析語句執⾏預計算,并将計算結果持久化到另一張對使用者透明但有實際存儲的表中,在需要同時查詢聚合資料和明細資料以及比對不同字首索引的場景,命中物化視圖時可以獲得更快的查詢性能。在使用物化視圖時需要建⽴ Base 表并基于此建⽴物化視圖,同⼀張 Base 表可以建構多個不同的物化視圖,從不同的次元進⾏統計。物化視圖在查詢過程中提供了智能路由選擇的能力,如果資料在物化視圖中存在會直接查詢物化視圖,如果在物化視圖中不存在才會查詢 Base 表。對于資料寫入或更新時,資料會在寫入 Base 表的同時寫入物化視圖,從⽽讓 Doris 保證物化視圖和 Base 表資料的完全⼀緻性。

如何基于 Apache Doris 與 Apache Flink 快速建構極速易用的實時數倉

智能路由選擇遵循最⼩比對原則,隻有查詢的資料集⽐物化視圖集合⼩時,才可能⾛物化視圖。如上圖所示智能選擇過程包括選擇最優和查詢改寫兩個部分:

選擇最優

  • 在過濾候選集過程中,被執行的 SQL 語句通過 Where 條件進⾏判斷,Where 條件為advertiser=1。由此可⻅,物化視圖和 Base 表都有該字段,這時的選集是物化視圖和 Base 表。
  • Group By 計算,Group By 字段是 advertiser 和 channel,這兩個字段同時在物化視圖和 Base 表中。這時過濾的候選集仍然是物化視圖和 Base 表。
  • 過濾計算函數,⽐如執⾏ count(distinctuser_id),然後對資料進⾏計算,由于 Count Distinct 的字段 user_id 在物化視圖和 Base 表中都存在,是以過濾結果仍是物化視圖和 Base 表。
  • 選擇最優,通過⼀系列計算,我們發現查詢條件⽆論是 Where 、Group By 還是 Agg Function 關聯的字段,結果都有 Base 表和物化視圖,是以需要進⾏最優選擇。Doris 經過計算發現 Base 表的資料遠⼤于物化視圖,即物化視圖的資料更⼩。

由此過程可⻅,如果通過物化視圖進行查詢,查詢效率更⾼。當我們找到最優查詢計劃,就可以進⾏⼦查詢改寫,将 Count Distinct 改寫成 Bitmap ,從⽽完成物化視圖的智能路由。完成智能路由之後,我們會将 Doris ⽣成的查詢 SQL 發送到 BE 進⾏分布式查詢計算。

2. 分區分桶裁剪

Doris 資料分為兩級分區存儲, 第一層為分區(Partition),目前支援 RANGE 分區和 LIST 分區兩種類型, 第二層為 HASH 分桶(Bucket)。我們可以按照時間對資料進⾏分區,再按照分桶列将⼀個分區的資料進行 Hash 分到不同的桶⾥。在查詢時則可以通過分區分桶裁剪來快速定位資料,加速查詢性能的同時實作高并發。

3. 索引查詢加速

除了分區分桶裁剪, 還可以通過存儲層索引來裁剪需要讀取的資料量,僅以加速查詢:

  • 字首索引:在排序的基礎上快速定位資料
  • Zone Map 索引:維護列中 min/max/null 資訊
  • Bitmap 索引:通過 Bitmap 加速去重、交并查詢
  • Bloom Filter 索引:快速判斷元素是否屬于集合;
  • Invert 反向索引:支援字元串類型的全文檢索;

4. 執行層查詢加速

同時 Apache Doris 的 MPP 查詢架構、向量化執行引擎以及查詢優化器也提供了許多性能優化方式,在此僅列出部分、不做詳細展開:

  • 算子下推:Limit、謂詞過濾等算子下推到存儲層;
  • 向量化引擎:基于 SIMD 指令集優化,充分釋放 CPU 計算能力;
  • Join 優化:Bucket Shuffle Join、Colocate Join 以及 Runtime Filter 等;

行業最佳實踐

截止目前,Apache Doris 在全球範圍内企業使用者規模已超過 1500 家,廣泛應用于數十個行業中。在使用者行為分析、AB 實驗平台、日志檢索分析、使用者畫像分析、訂單分析等方向均有着豐富的應用。在此我們列出了幾個基于 Doris 建構實時資料倉庫的真實案例作為參考:

如何基于 Apache Doris 與 Apache Flink 快速建構極速易用的實時數倉

第 1 個案例是較為典型的基于 Doris 建構實時數倉,下層資料源來自 RDS 業務庫、⽂件系統資料以及埋點日志資料。在資料接⼊過程中通過 DataX 進⾏離線資料同步以及通過 Flink CDC 進⾏實時資料同步,在 Doris 内部建構不同的資料分層;最後在上層建構不同的資料應⽤,⽐如⾃助報表、⾃助資料抽取、資料⼤屏。除此之外,它還結合了⾃⼰的應⽤平台建構了資料開發與治理平台,完成了源資料管理、資料分析等操作。

使用收益:

  • 業務計算耗時從之前的兩⼩時降低到三分鐘。
  • 全鍊路的更新報表的時間從周級别更新到⼗分鐘級别。
  • Doris ⾼度相容 MySQL,報表遷移無壓力,開發周期從周級别降至⾄天級别。
如何基于 Apache Doris 與 Apache Flink 快速建構極速易用的實時數倉

第 2 個案例是在某營運服務商的應用,其架構是通過 Flink CDC 将 RDS 的資料同步到 Doris 中,同時通過 Routine Load 直接訂閱 Kafka 中接入的日志資料,然後在 Doris 内部建構實時數倉。在資料排程時, 通過開源 DolphinScheduler 完成資料排程;使⽤ Prometheus+Grafana 進⾏資料監控。

使用收益: 采⽤ Flink+Doris 架構體系後,架構簡潔、元件減少,解決了多架構下的資料的備援存儲,伺服器資源節省了 30%,資料存儲磁盤占⽤節省了 60%,營運成本⼤幅降低。該案例每天在⽤戶的業務場景上,⽀持數萬次的⽤戶的線上查詢和分析。

如何基于 Apache Doris 與 Apache Flink 快速建構極速易用的實時數倉

第 3 個應用是在供應鍊企業,在過去該企業采取了 Hadoop 體系,使用元件⽐較繁多,有 RDS、HBase、Hive、HDFS、Yarn、Kafka 等多個技術棧,在該架構下,查詢性能無法得到有效快速的提升,維護和開發成本一直居高不下。

使用收益: 引入 Doris 之後,将 RDS 的資料通過 Flink CDC 實時同步到 Doris ⾥,伺服器資源成本得到了很⼤的降低。資料的查詢時間從 Spark 的 2~5 ⼩時,縮短到⼗分鐘,查詢效率也⼤⼤提升。在資料的同步過程中,使⽤了 Flink CDC+MySQL 全量加增量的資料同步⽅式,同時還利⽤ Doris 的 Light Schema Change 特性實時同步 Binlog ⾥的 DDL 表結構變更,實作資料接⼊數倉零開發成本。

# 總結

憑借 Apache Doris 豐富的分析功能和 Apache Flink 強大的實時計算能力,已經有越來越多的企業選擇基于 Apache Doris 和 Flink 建構極速易用的實時數倉架構,更多案例歡迎關注 SelectDB 公衆号以及相關技術部落格。後續我們仍會持續提升 Apache Doris 在實時資料處理場景的能力和性能,包括 Unique 模型上的部分列更新、單表物化視圖上的計算增強、自動增量重新整理的多表物化視圖等,後續研發進展也将在社群及時同步。在建構實時資料倉庫架構中遇到任何問題,歡迎聯系社群進行支援。同時也歡迎加入 Apache Doris 社群,一起将 Apache Doris 建設地更加強大!

作者介紹:

王磊, SelectDB 資深大資料研發專家、Apache Doris Contributor、阿裡雲 MVP,具有超 10 年大資料領域工作經驗,對資料治理、資料湖和實時數倉有深入了解和實踐,人氣技術暢銷書《圖解 Spark 大資料快速分析實戰》、《offer 來了:Java 面試核心知識點精講(原理篇&架構篇)》作者。