作者:夏軍@小米
小米業務線衆多,從資訊流,電商,廣告到金融等覆寫了衆多領域,小米流式平台為小米集團各業務提供一體化的流式資料解決方案,主要包括資料采集,資料內建和流式計算三個子產品。目前每天資料量達到 1.2 萬億條,實時同步任務 1.5 萬,實時計算的資料 1 萬億條。
伴随着小米業務的發展,流式平台也經曆三次大更新改造,滿足了衆多業務的各種需求。最新的一次疊代基于 Apache Flink,對于流式平台内部子產品進行了徹底的重構,同時小米各業務也在由 Spark Streaming 逐漸切換到 Flink。
背景介紹
小米流式平台的願景是為小米所有的業務線提供流式資料的一體化、平台化解決方案。具體來講包括以下三個方面:
- 流式資料存儲:流式資料存儲指的是消息隊列,小米開發了一套自己的消息隊列,其類似于 Apache kafka,但它有自己的特點,小米流式平台提供消息隊列的存儲功能;
- 流式資料接入和轉儲:有了消息隊列來做流式資料的緩存區之後,繼而需要提供流式資料接入和轉儲的功能;
- 流式資料處理:指的是平台基于 Flink、Spark Streaming 和 Storm 等計算引擎對流式資料進行處理的過程。

下圖展示了流式平台的整體架構。從左到右第一列橙色部分是資料源,包含兩部分,即 User 和 Database。
- User 指的是使用者各種各樣的埋點資料,如使用者 APP 和 WebServer 的日志,其次是 Database 資料,如 MySQL、HBase 和其他的 RDS 資料。
- 中間藍色部分是流式平台的具體内容,其中 Talos 是小米實作的消息隊列,其上層包含 Consumer SDK 和 Producer SDK。
- 此外小米還實作了一套完整的 Talos Source,主要用于收集剛才提到的使用者和資料庫的全場景的資料。
Talos Sink 和 Source 共同組合成一個資料流服務,主要負責将 Talos 的資料以極低的延遲轉儲到其他系統中;Sink 是一套标準化的服務,但其不夠定制化,後續會基于 Flink SQL 重構 Talos Sink 子產品。
下圖展示了小米的業務規模。在存儲層面小米每天大概有 1.2 萬億條消息,峰值流量可以達到 4300 萬條每秒。轉儲子產品僅 Talos Sink 每天轉儲的資料量就高達 1.6 PB,轉儲作業目前将近有 1.5 萬個。每天的流式計算作業超過 800 個,Flink 作業超過 200 個,Flink 每天處理的消息量可以達到 7000 億條,資料量在 1 PB 以上。
小米流式平台發展曆史
小米流式平台發展曆史分為如下三個階段:
- Streaming Platform 1.0:小米流式平台的 1.0 版本建構于 2010 年,其最初使用的是 Scribe、Kafka 和 Storm,其中 Scribe 是一套解決資料收集和資料轉儲的服務。
- Streaming Platform 2.0:由于 1.0 版本存在的種種問題,我們自研了小米自己的消息隊列 Talos,還包括 Talos Source、Talos Sink,并接入了 Spark Streaming。
- Streaming Platform 3.0:該版本在上一個版本的基礎上增加了 Schema 的支援,還引入了 Flink 和 Stream SQL。
Streaming Platform 1.0 整體是一個級聯的服務,前面包括 Scribe Agent 和 Scribe Server 的多級級聯,主要用于收集資料,然後滿足離線計算和實時計算的場景。離線計算使用的是 HDFS 和 Hive,實時計算使用的是 Kafka 和 Storm。雖然這種離線加實時的方式可以基本滿足小米當時的業務需求,但也存在一系列的問題。
- 首先是 Scribe Agent 過多,而配置和包管理機制缺乏,導緻維護成本非常高;
- Scribe 采用的 Push 架構,異常情況下無法有效緩存資料,同時 HDFS / Kafka 資料互相影響;
- 最後資料鍊級聯比較長的時候,整個全鍊路資料黑盒,缺乏監控和資料檢驗機制。
為了解決 Streaming Platform 1.0 的問題,小米推出了 Streaming Platform 2.0 版本。該版本引入了 Talos,将其作為資料緩存區來進行流式資料的存儲,左側是多種多樣的資料源,右側是多種多樣的 Sink,即将原本的級聯架構轉換成星型架構,優點是友善地擴充。
- 由于 Agent 自身數量及管理的流較多(具體資料均在萬級别),為此該版本實作了一套配置管理和包管理系統,可以支援 Agent 一次配置之後的自動更新和重新開機等。
- 此外,小米還實作了去中心化的配置服務,配置檔案設定好後可以自動地分發到分布式結點上去。
- 最後,該版本還實作了資料的端到端監控,通過埋點來監控資料在整個鍊路上的資料丢失情況和資料傳輸延遲情況等。
Streaming Platform 2.0 的優勢主要有:
- 引入了 Multi Source & Multi Sink,之前兩個系統之間導資料需要直接連接配接,現在的架構将系統內建複雜度由原來的 O(M*N) 降低為 O(M+N);
- 引入配置管理和包管理機制,徹底解決系統更新、修改和上線等一系列問題,降低運維的壓力;
- 引入端到端資料監控機制,實作全鍊路資料監控,量化全鍊路資料品質;
- 産品化解決方案,避免重複建設,解決業務運維問題。
下圖詳細介紹一下 MySQL 同步的案例,場景是将 MySQL 的一個表通過上述的機制同步到消息隊列 Talos。具體流程是 Binlog 服務僞裝成 MySQL 的 Slave,向 MySQL 發送 Dump binlog 請求;MySQL 收到 Dump 請求後,開始推動 Binlog 給 Binlog 服務;Binlog 服務将 binlog 以嚴格有序的形式轉儲到 Talos。之後會接入 Spark Streaming 作業,對 binlog 進行解析,解析結果寫入到 Kudu 表中。目前平台支援寫入到 Kudu 中的表的數量級超過 3000 個。
Agent Source 的功能子產品如下圖所示。其支援 RPC、Http 協定,并可以通過 File 來監聽本地檔案,實作記憶體和檔案雙緩存,保證資料的高可靠。平台基于 RPC 協定實作了 Logger Appender 和 RPC 協定的 SDK;對于 Http 協定實作了 HttpClient;對于檔案實作了 File Watcher 來對本地檔案進行自動地發現和掃描,Offset Manager 自動記錄 offset;Agent 機制與 K8S 環境深度整合,可以很容易地和後端的流式計算等相結合。
下圖是 Talos Sink 的邏輯流程圖,其基于 Spark Streaming 來實作一系列流程。最左側是一系列 Talos Topic 的 Partition 分片,基于每個 batch 抽象公共邏輯,如 startProcessBatch() 和 stopProcessBatch(),不同 Sink 隻需要實作 Write 邏輯;不同的 Sink 獨立為不同的作業,避免互相影響;Sink 在 Spark Streaming 基礎上進行了優化,實作了根據 Topic 流量進行動态資源排程,保證系統延遲的前提下最大限度節省資源。
下圖是平台實作的端到端資料監控機制。具體實作是為每個消息都有一個時間戳 EventTime,表示這個消息真正生成的時間,根據 EventTime 來劃分時間視窗,視窗大小為一分鐘,資料傳輸的每一跳統計目前時間視窗内接受到的消息數量,最後統計出消息的完整度。延遲是計算某一跳 ProcessTime 和 EventTime 之間的內插補點。
Streaming Platform 2.0 目前的問題主要有三點:
- Talos 資料缺乏 Schema 管理,Talos 對于傳入的資料是不了解的,這種情況下無法使用 SQL 來消費 Talos 的資料;
- Talos Sink 子產品不支援定制化需求,例如從 Talos 将資料傳輸到 Kudu 中,Talos 中有十個字段,但 Kudu 中隻需要 5 個字段,該功能目前無法很好地支援;
- Spark Streaming 自身問題,不支援 Event Time,端到端 Exactly Once 語義。
基于 Flink 的實時數倉
為了解決 Streaming Platform 2.0 的上述問題,小米進行了大量調研,也和阿裡的實時計算團隊做了一系列溝通和交流,最終決定将使用 Flink 來改造平台目前的流程,下面具體介紹小米流式計算平台基于Flink的實踐。
使用 Flink 對平台進行改造的設計理念如下:
- 全鍊路 Schema 支援,這裡的全鍊路不僅包含 Talos 到 Flink 的階段,而是從最開始的資料收集階段一直到後端的計算處理。需要實作資料校驗機制,避免資料污染;字段變更和相容性檢查機制,在大資料場景下,Schema 變更頻繁,相容性檢查很有必要,借鑒 Kafka 的經驗,在 Schema 引入向前、向後或全相容檢查機制。
- 借助 Flink 社群的力量全面推進 Flink 在小米的落地,一方面 Streaming 實時計算的作業逐漸從 Spark、Storm 遷移到 Flink,保證原本的延遲和資源節省,目前小米已經運作了超過 200 個 Flink 作業;另一方面期望用 Flink 改造 Sink 的流程,提升運作效率的同時,支援 ETL,在此基礎上大力推進 Streaming SQL;
- 實作 Streaming 産品化,引入 Streaming Job 和 Streaming SQL 的平台化管理;
- 基于 Flink SQL 改造 Talos Sink,支援業務邏輯定制化
下圖是 Streaming Platform 3.0 版本的架構圖,與 2.0 版本的架構設計類似,隻是表達的角度不同。具體包含以下幾個子產品:
- 抽象 Table:該版本中各種存儲系統如 MySQL 和 Hive 等都會抽象成 Table,為 SQL 化做準備。
- Job 管理:提供 Streaming 作業的管理支援,包括多版本支援、配置與Jar分離、編譯部署和作業狀态管理等常見的功能。
- SQL 管理:SQL 最終要轉換為一個 Data Stream 作業,該部分功能主要有 Web IDE 支援、Schema 探查、UDF/維表 Join、SQL 編譯、自動建構 DDL 和 SQL 存儲等。
- Talos Sink:該子產品基于 SQL 管理對 2.0 版本的 Sink 重構,包含的功能主要有一鍵建表、Sink 格式自動更新、字段映射、作業合并、簡單 SQL 和配置管理等。前面提到的場景中,基于 Spark Streaming 将 Message 從 Talos 讀取出來,并原封不動地轉到 HDFS 中做離線數倉的分析,此時可以直接用 SQL 表達很友善地實作。未來希望實作該子產品與小米内部的其他系統如 ElasticSearch 和 Kudu 等進行深度整合,具體的場景是假設已有 Talos Schema,基于 Talos Topic Schema 自動幫助使用者建立 Kudu 表。
- 平台化:為使用者提供一體化、平台化的解決方案,包括調試開發、監控報警和運維等。
Job 管理
Job 管理提供 Job 全生命周期管理、Job 權限管理和 Job 标簽管理等功能;支援Job 運作曆史展示,友善使用者追溯;支援 Job 狀态與延遲監控,可以實作失敗作業自動拉起。
SQL 管理
主要包括以下四個環節:
- 将外部表轉換為 SQL DDL,對應 Flink 1.9 中标準的 DDL 語句,主要包含 Table Schema、Table Format 和 Connector Properities。
- 基于完整定義的外部 SQL 表,增加 SQL 語句,既可以得到完成的表達使用者的需求。即 SQL Config 表示完整的使用者預計表達,由 Source Table DDL、Sink Table DDL 和 SQL DML語句組成。
- 将 SQL Config 轉換成 Job Config,即轉換為 Stream Job 的表現形式。
- 将 Job Config 轉換為 JobGraph,用于送出 Flink Job。
外部表轉換成 SQL DDL 的流程如下圖所示。
- 首先根據外部表擷取 Table Schema 和 Table Format 資訊,後者用于反解資料,如對于 Hive 資料反序列化;
- 然後再後端生成預設的 Connector 配置,該配置主要分為三部分,即不可修改的、帶預設值的使用者可修改的、不帶預設值的使用者必須配置的。
不可修改的配置情況是假設消費的是 Talos 元件,那麼 connector.type 一定是 talos,則該配置不需要改;而預設值是從 Topic 頭部開始消費,但使用者可以設定從尾部開始消費,這種情況屬于帶預設值但是使用者可修改的配置;而一些權限資訊是使用者必須配置的。
之是以做三層配置管理,是為了盡可能減少使用者配置的複雜度。Table Schema、Table Format 和 Connector 1 其他配置資訊,組成了SQL DDL。将 SQL Config 傳回給使用者之後,對于可修改的需要使用者填寫,這樣便可以完成從外部表到 SQL DDL 的轉換,紅色字型表示的是使用者修改的資訊。
SQL 管理引入了一個 External Table 的特性。假設使用者在平台上選擇消費某個 Topic 的時候,該特性會自動地擷取上面提到的 Table 的 Schema 和 Format 資訊,并且顯示去掉了注冊 Flink Table 的邏輯;擷取 Schema 時,該特性會将外部表字段類型自動轉換為 Flink Table 字段類型,并自動注冊為 Flink Tab 了。同時将 Connector Properties 分成三類,參數帶預設值,隻有必須項要求使用者填寫;所有參數均采用 Map 的形式表達,非常便于後續轉化為 Flink 内部的 TableDescriptor。
上面介紹了 SQL DDL 的建立過程,在已經建立的 SQL DDL 的基礎上,如 Source SQL DDL 和 Sink SQL DDL,要求使用者填寫 SQL query 并傳回給後端,後端會對 SQL 進行驗證,然後會生成一個 SQL Config,即一個 SQL 語句的完整表達。
SQL Config 轉換為 Job Config 的流程如下圖所示。
- 首先在 SQL Config 的基礎上增加作業所需要的資源、Job 的相關配置(Flink 的 state 參數等);
- 然後将 SQLConfig 編譯成一個 Job Descriptor,即 Job Config 的描述,如 Job 的 Jar 包位址、MainClass 和 MainArgs 等。
下圖展示了 Job Config 轉換為 Job Graph 的過程。對于 DDL 中的 Schema、Format 和 Property 是和 Flink 中的 Table Descriptor 是一一對應的,這種情況下隻需要調用 Flink 的相關内置接口就可以很友善地将資訊轉換為 Table Descriptor,如 CreateTableSource()、RegistorTableSource() 等。通過上述過程,DDL 便可以注冊到 Flink 系統中直接使用。對于 SQL 語句,可以直接使用 TableEnv 的 sqlUpdate() 可以完成轉換。
SQL Config 轉換為一個 Template Job 的流程如下所示。前面填寫的 Jar 包位址即該 Template 的 Jar 位址,MainClass 是該 Template Job。假設已經有了 SQL DDL,可以直接轉換成 Table Descriptor,然後通過 TableFactorUtil 的 findAndCreateTableSource() 方法得到一個 Table Source,Table Sink 的轉換過程類似。完成前兩步操作後,最後進行 sqlUpdate() 操作。這樣便可以将一個 SQL Job 轉換為最後可執行的 Job Graph 送出到叢集上運作。
Talos Sink 采用了下圖所示的三種模式:
- Row:Talos 的資料原封不動地灌到目标系統中,這種模式的好處是資料讀取和寫入的時候無需進行序列化和反序列化,效率較高;
- ID mapping:即左右兩邊字段進行 mapping,name 對應 field_name,timestamp 對應 timestamp,其中 Region 的字段丢掉;
- SQL:通過 SQL 表達來表示邏輯上的處理。
未來規劃
小米流式平台未來的計劃主要有以下幾點:
- 在 Flink 落地的時候持續推進 Streaming Job 和平台化建設;
- 使用 Flink SQL 統一離線數倉和實時數倉;
- 在 Schema 的基礎上資料血緣分析和展示,包括資料治理方面的内容;
- 持續參與 Flink 社群的建設。
作者介紹:
夏軍,小米流式平台負責人,主要負責流式計算,消息隊列,大資料內建等系統的研發工作,主要包括 Flink,Spark Streaming,Storm,Kafka 等開源系統和一系列小米自研的相關系統。