天天看點

趣頭條基于 Flink+ClickHouse 建構實時資料分析平台

作者:王金海@趣頭條

摘要:本文由趣頭條資料平台負責人王金海分享,主要介紹趣頭條 Flink-to-Hive 小時級場景和 Flink-to-ClickHouse 秒級場景,内容分為以下四部分:

  • 一、業務場景與現狀分析
  • 二、Flink-to-Hive 小時級場景
  • 三、Flink-to-ClickHouse 秒級場景
  • 四、未來發展與思考

趣頭條查詢的頁面分為離線查詢頁面和實時查詢頁面。趣頭條今年所實作的改造是在實時查詢中接入了 ClickHouse 計算引擎。根據不同的業務場景,實時資料報表中會展現資料名額曲線圖和詳細的資料名額表。目前資料名額的采集和計算為每五分鐘一個時間視窗,當然也存在三分鐘或一分鐘的特殊情況。資料名額資料全部從 Kafka 實時資料中導出,并導入 ClickHouse 進行計算。

趣頭條基于 Flink+ClickHouse 建構實時資料分析平台

1.小時級實作架構圖

如下圖所示,Database 中的 Binlog 導出到 Kafka,同時 Log Server 資料也會上報到 Kafka。所有資料實時落地到 Kafka 之後,通過 Flink 抽取到 HDFS。下圖中 HDFS 到 Hive 之間為虛線,即 Flink 并非直接落地到 Hive,Flink 落地到 HDFS 後,再落地到 Hive 的時間可能是小時級、半小時級甚至分鐘級,需要知道資料的 Event time 已經到何時,再觸發 alter table,add partition,add location 等,寫入其分區。

這時需要有一個程式監控目前 Flink 任務的資料時間已經消費到什麼時候,如9點的資料,落地時需要檢視 Kafka 中消費的資料是否已經到達9點,然後在 Hive 中觸發分區寫入。

趣頭條基于 Flink+ClickHouse 建構實時資料分析平台

2.實作原理

趣頭條主要使用了 Flink 高階版本的一個特性——StreamingFileSink。StreamingFileSink 主要有幾點功能。

  • 第一, forBulkFormat 支援 avro、parquet 格式,即列式存儲格式。
  • 第二, withBucketAssigner 自定義按資料時間分桶,此處會定義一個EventtimeBucket,既按資料時間進行資料落地到離線中。
  • 第三, OnCheckPointRollingPolicy,根據 CheckPoint 時間進行資料落地,在一定的 CheckPoint 時間内資料落地并回穩。按照 CheckPoint 落地還有其它政策,如按照資料大小。
  • 第四, StreamingFileSink 是 Exactly-Once 語義實作。

Flink 中有兩個 Exactly-Once 語義實作,第一個是 Kafka,第二個是 StreamingFileSink。下圖為 OnCheckPointRollingPolicy 設計的每10分鐘落地一次到HDFS檔案中的 demo。

趣頭條基于 Flink+ClickHouse 建構實時資料分析平台

■ 如何實作 Exactly-Once

下圖左側為一個簡單的二 PC 模型。Coordinator 發送一個 prepare,執行者開始觸發 ack 動作,Coordinator 收到 ack 所有消息後,所有 ack 開始觸發 commit,所有執行者進行落地,将其轉化到 Flink 的模型中,Source 收到 checkpoint barrier 流時,開始觸發一個 snapshot。

每個算子的 CheckPoint、snapshot 都完成之後,CheckPoint 會給 Job Manager 發送 notifyCheckpointComplete。下圖中二階段模型和 Flink 模型左側三條線部分是一緻的。是以用 Flink 可以實作二階段送出協定。

趣頭條基于 Flink+ClickHouse 建構實時資料分析平台

■ 如何使用 Flink 實作二階段送出協定

首先,StreamingFileSink 實作兩個接口,CheckpointedFunction 和CheckpointListener。CheckpointedFunction 實作 initializeState 和 snapshotState 函數。CheckpointListener 是 notifyCheckpointComplete 的方法實作,是以這兩個接口可以實作二階段送出語義。

  • initializeState

initializeState 在任務啟動時會觸發三個動作。第一個是 commitPendingFile。實時資料落地到 Hdfs 上有三個狀态。第一個狀态是 in-progress ,正在進行狀态。第二個狀态是 pending 狀态,第三個狀态是 finished 狀态。

initializeState 在任務啟動時還會觸發 restoreInProgressFile,算子實時寫入。如果 CheckPoint 還未成功時程式出現問題,再次啟動時 initializeState 會 commit PendingFile,然後采用 Hadoop 2.7+ 版本的 truncate 方式重置或截斷 in-progress 檔案。

  • invoke

實時寫入資料。

  • snapshotState

觸發 CheckPoint 時會将 in-progress 檔案轉化為 pending state,同時記錄資料長度(truncate 方式需要截斷長度)。snapshotState 并非真正将資料寫入 HDFS,而是寫入 ListState。Flink 在 Barrier 對齊狀态時内部實作 Exactly-Once 語義,但是實作外部端到端的 Exactly-Once 語義比較困難。Flink 内部實作 Exactly-Once 通過 ListState,将資料全部存入 ListState,等待所有算子 CheckPoint 完成,再将 ListState 中的資料刷到 HDFS 中。

  • notifyCheckpointComplete

notifyCheckpointComplete 會觸發 pending 到 finished state 的資料寫入。實作方法是 rename,Streaming 不斷向 HDFS 寫入臨時檔案,所有動作結束後通過 rename 動作寫成正式檔案。

趣頭條基于 Flink+ClickHouse 建構實時資料分析平台

3.跨叢集多 nameservices

趣頭條的實時叢集和離線叢集是獨立的,離線叢集有多套,實時叢集目前有一套。通過實時叢集寫入離線叢集,會産生 HDFS nameservices 問題。在實時叢集中将所有離線叢集的 nameservices 用 namenode HA 的方式全部打入實時叢集并不合适。那麼如何在任務中通過實時叢集送出到各個離線叢集?

如下圖所示,在 Flink 任務的 resource 下面,在 HDFS 的 xml 中間加入 。在 PropertyHong Kong 中添加 nameservices,如 stream 是實時叢集的 namenode HA 配置,data 是即将寫入的離線叢集的 namenode HA 配置。那麼兩個叢集中間的 HDFS set 不需要互相修改,直接可以在用戶端實作。

趣頭條基于 Flink+ClickHouse 建構實時資料分析平台

4.多使用者寫入權限

實時要寫入離線 HDFS,可能會涉及使用者權限問題。實時送出的使用者已經定義好該使用者在所有程式中都是同一個使用者,但離線中是多使用者的,是以會造成實時和離線使用者不對等。趣頭條在 API 中添加了 withBucketUser 寫 HDFS。配置好 nameservices後,接下來隻需要知道該 HDFS 路徑通過哪個使用者來寫,比如配置一個 stream 使用者寫入。

API 層級的好處是一個 Flink 程式可以指定多個不同的 HDFS 和不同的使用者。多使用者寫入的實作是在 Hadoop file system 中加一個 ugi.do as ,代理使用者。以上為趣頭條使用 Flink 方式進行實時資料同步到 Hive 的一些工作。其中可能會出現小檔案問題,小檔案是背景程式進行定期 merge,如果 CheckPoint 間隔時間較短,如3分鐘一次,會出現大量小檔案問題。

趣頭條基于 Flink+ClickHouse 建構實時資料分析平台

1.秒級實作架構圖

趣頭條目前有很多實時名額,平均每五分鐘或三分鐘計算一次,如果每一個實時名額用一個 Flink 任務,或者一個 Flink SQL 來寫,比如消費一個 Kafka Topic,需要計算其日活、新增、流程等等當使用者提出一個新需求時,需要改目前的 Flink 任務或者啟動一個新的 Flink 任務消費 Topic。

是以會出現 Flink 任務不斷修改或者不斷起新的 Flink 任務的問題。趣頭條嘗試在 Flink 後接入 ClickHouse,實作整體的 OLAP。下圖為秒級實作架構圖。從 Kafka 到 Flink,到 Hive,到 ClickHouse 叢集,對接外部 Horizon(實時報表),QE(實時 adhoc 查詢),千尋(資料分析),使用者畫像(實時圈人)。

趣頭條基于 Flink+ClickHouse 建構實時資料分析平台

2.Why Flink+ClickHouse

  • 名額實作 sql 化描述:分析師提出的名額基本都以 SQL 進行描述。
  • 名額的上下線互不影響:一個 Flink 任務消費 Topic,如果還需要其它名額,可以保證名額的上下線互不影響。
  • 資料可回溯,友善異常排查:當日活下降,需要回溯排查是哪些名額口徑的邏輯問題,比如是報的資料差異或是資料流 Kafka 掉了,或者是因為使用者沒有上報某個名額導緻日活下降,而 Flink 則無法進行回溯。
  • 計算快,一個周期内完成所有名額計算:需要在五分鐘内将成百上千的所有次元的名額全部計算完成。
  • 支援實時流,分布式部署,運維簡單:支援 Kafka 資料實時流。

目前趣頭條 Flink 叢集有 100+ 台 32 核 128 G 3.5T SSD,日資料量 2000+ 億,日查詢量 21w+ 次,80% 查詢在 1s 内完成。下圖為單表測試結果。ClickHouse 單表測試速度快。但受制于架構,ClickHouse 的 Join 較弱。

趣頭條基于 Flink+ClickHouse 建構實時資料分析平台

下圖是處理相對較為複雜的 SQL,count+group by+order by,ClickHouse 在 3.6s内完成 26 億資料計算。

趣頭條基于 Flink+ClickHouse 建構實時資料分析平台

3.Why ClickHouse so Fast

ClickHouse 采用列式存儲 +LZ4、ZSTD 資料壓縮。其次,計算存儲結合本地化+向量化執行。Presto 資料可能存儲在 Hadoop 叢集或者 HDFS 中,實時拉取資料進行計算。而 ClickHouse 計算存儲本地化是指每一台計算機器存在本地 SSD 盤,隻需要計算自己的資料,再進行節點合并。同時,LSM merge tree+Index。将資料寫入 ClickHouse 之後,會在背景開始一個線程将資料進行 merge,做 Index 索引。如建常見的 DT 索引和小時級資料索引,以提高查詢性能。第四,SIMD+LLVM 優化。SIMD 是單指令多資料集。第五,SQL 文法及 UDF 完善。ClickHouse 對此有很大需求。在資料分析或者次元下拽時需要更高的特性,如時間視窗的一部分功能點。

  • Merge Tree:如下圖所示。第一層為實時資料寫入。背景進行每一層級資料的merge。merge 時會進行資料排序,做 Index 索引。
  • ClickHouse Connector:ClickHouse 有兩個概念,Local table 和Distributed table。一般是寫 Local table ,讀 Distributed table。ClickHouse 一般以 5~10w一個批次進行資料寫入,5s一個周期。趣頭條還實作了 RoundRobinClickHouseDataSource。
  • BalancedClickHouseDataSource :MySQL 中配置一個 IP 和端口号就可以寫入資料,而 BalancedClickHouseDataSource 需要寫 Local 表,是以必須要知道該叢集有多少個 Local 表,每一個 Local 表的 IP 和端口号。如有一百台機器,需要将一百台機器的 IP 和端口号全部配置好,再進行寫入。BalancedClickHouseDataSource 有兩個 schedule。scheduleActualization和 scheduleConnectionsCleaning 。配置一百台機器的 IP 和端口号,會出現某些機器不連接配接或者服務不響應問題,scheduleActualization 會定期發現機器無法連接配接的問題,觸發下線或删除 IP 等動作。scheduleConnectionsCleaning 會定期清理 ClickHouse 中無用的 http 請求。
趣頭條基于 Flink+ClickHouse 建構實時資料分析平台
  • RoundRobinClickHouseDataSource:趣頭條對BalancedClickHouseDataSource 進行加強的結果,實作了三個語義。testOnBorrow 設定為 true,嘗試 ping 看能否擷取連接配接。用 ClickHouse 寫入時是一個 batch,再将 testOnReturn 設定為 false,testWhileIdel 設定為true,填入官方 scheduleActualization 和 scheduleConnectionsCleaning 的功能。ClickHouse 背景不斷進行 merge,如果 insert 過快使背景 merge 速度變慢,跟不上 insert,出現報錯。是以需要盡量不斷往下寫,等寫完目前機器,再寫下一個機器,以5s間隔進行寫入,使 merge 速度能夠盡量與 insert 速度保持一緻。
趣頭條基于 Flink+ClickHouse 建構實時資料分析平台

4.Backfill

Flink 導入 ClickHouse,在資料查詢或展示報表時,會遇到一些問題,比如 Flink 任務出現故障、報錯或資料反壓等,或 ClickHouse 叢集出現不可響應,zk 跟不上,insert 過快或叢集負載等問題,這會導緻整個任務出現問題。

如果流資料量突然暴增,啟動 Flink 可能出現一段時間内不斷追資料的情況,需要進行調整并行度等操作幫助 Flink 追資料。但這時已經出現資料積壓,若還要加大 Flink 并發度處理資料,ClickHouse 限制 insert 不能過快,否則會導緻惡性循環。是以當 Flink 故障或 ClickHouse 叢集故障時,等待 ClickHouse 叢集恢複後,Flink 任務從最新資料開始消費,不再追過去一段時間的資料,通過 Hive 将資料導入到 ClickHouse。

由于之前已經通過 Kafka 将資料實時落地到 Hive,通過 Hive 将資料寫入 ClickHouse 中。ClickHouse 有分區,隻需要将上一個小時的資料删除,導入 Hive 的一小時資料,就可以繼續進行資料查詢操作。Backfill 提供了 Flink 任務小時級容錯以及 ClickHouse 叢集小時級容錯機制。

趣頭條基于 Flink+ClickHouse 建構實時資料分析平台

未來發展與思考

1.Connector SQL 化

目前, Flink-to-Hive 以及 Flink-to-ClickHouse 都是趣頭條較為固化的場景,隻需指定 HDFS 路徑以及使用者,其餘過程都可以通過 SQL 化描述。

2.Delta lake

Flink 是流批一體計算引擎,但是沒有流批一體的存儲。趣頭條會用 HBase、Kudu、Redis 等能夠與 Flink 實時互動的 KV 存儲進行資料計算。如計算新增問題,目前趣頭條的方案是需要将 Hive 曆史使用者刷到 Redis 或 HBase 中,與 Flink 進行實時互動判斷使用者是否新增。

但因為 Hive 中的資料和 Redis 中的資料是存儲為兩份資料。其次 Binlog 抽取資料會涉及 delete 動作,Hbase,Kudu 支援資料修改,定期回到 Hive 中。帶來的問題是 HBase,Kudu 中存在資料,Hive 又儲存了一份資料,多出一份或多份資料。如果有流批一體的存儲支援上述場景,當 Flink 任務過來,可以與離線資料進行實時互動,包括實時查詢 Hive 資料等,可以實時判斷使用者是否新增,對資料進行實時修改、更新或 delete,也能支援 Hive 的批的動作存儲。

未來,趣頭條考慮對 Flink 做流批的存儲,使 Flink 生态統一為流批結合。

作者介紹:

王金海,10 年網際網路曆練,先後在唯品會負責使用者畫像系統,提供人群的個性化營銷服務;餓了麼擔任架構師,負責大資料任務排程、中繼資料開發、任務畫像等工作;現為趣頭條資料中心平台負責人,負責大資料基礎計算層(spark、presto、flink、clickhouse)、平台服務層(libra 實時計算、kepler 離線排程)、資料産品層(qe即時查詢、horizon 資料報表、metadata 中繼資料、資料權限等)、以及團隊建設。