天天看點

實時數倉模型(持續更新ing)

背景

都在說實時資料架構,你了解多少?​mp.weixin.qq.com

實時數倉模型(持續更新ing)

早期資料倉庫建構主要指的是把企業的業務資料庫如 ERP、CRM、SCM 等資料按照決策分析的要求模組化并彙總到資料倉庫引擎中,其應用以報表為主,目的是支援管理層和業務人員決策(中長期政策型決策)。随着業務和環境的發展,這兩方面都在發生着劇烈變化。

  • 随着IT技術走向網際網路、移動化,資料源變得越來越豐富,在原來業務資料庫的基礎上出現了非結構化資料,比如網站 log,IoT 裝置資料,APP 埋點資料等,這些資料量比以往結構化的資料大了幾個量級,對 ETL 過程、存儲都提出了更高的要求;
  • 網際網路的線上特性也将業務需求推向了實時化,随時根據目前客戶行為而調整政策變得越來越常見,比如大促過程中庫存管理,營運管理等(即既有中遠期政策型,也有短期操作型);同時公司業務網際網路化之後導緻同時服務的客戶劇增,有些情況人工難以完全處理,這就需要機器自動決策。比如欺詐檢測和使用者稽核。

正常的大資料開發主要包含離線大資料開發和實時大資料開發,也就是批處理和流處理,這兩塊的處理方式不一樣,一般離線的主要是:Hive、Spark等等,實時的主要是:Flink、Storm、StructedStreaming等等,目前常用的大資料架構分為:離線大資料架構、Lambda 架構和Kappa 架構。

離線大資料架構(最常見的離線數倉模型):

資料源通過離線的方式導入到離線數倉中,下遊應用根據業務需求選擇直接讀取 DM 或加一層資料服務,比如 MySQL 或 Redis,資料存儲引擎是 HDFS/Hive,ETL 工具可以是 MapReduce 、Spark或 HiveSQL。資料倉庫從模型層面分為操作資料層 ODS、資料倉庫明細層 DWD、資料集市層 DM,最後到APP層做資料展現。

離線數倉主要基于sqoop、datax、hive等技術來建構 T+1 的離線資料,通過定時任務每天垃取增量資料導入到hive表中,然後建立各個業務相關的主題,對外提供T+1的資料查詢接口。

問題:

  • 主要是離線的資料開發,對于實時的資料開發不支援;
  • 數倉模型、資料名額體系、如果設計的不合理,會導緻資料備援和重複開發;

Lambda 架構:

為了計算一些實時名額,就在原來離線數倉的基礎上增加了一個實時計算的鍊路,并對資料源做流式改造(即把資料發送到消息隊列),實時計算去訂閱消息隊列,直接完成名額增量的計算,推送到下遊的資料服務中去,由資料服務層完成離線&實時結果的合并。

實時數倉主要是基于資料采集工具,如canal等原始資料寫入到kafka這樣的資料通道中,最後一般都是寫入到類似于HBase這樣的OLAP存儲系統中。對外提供分鐘級别,甚至秒級别的查詢方案。

  • 同樣的需求需要開發兩套一樣的代碼:這是 Lambda 架構最大的問題,兩套代碼不僅僅意味着開發困難(同樣的需求,一個在批處理引擎上實作,一個在流處理引擎上實作,還要分别構造資料測試保證兩者結果一緻),後期維護更加困難,比如需求變更後需要分别更改兩套代碼,獨立測試結果,且兩個作業需要同步上線。
  • 資源占用增多:同樣的邏輯計算兩次,整體資源占用會增多(多出實時計算這部分)
實時數倉模型(持續更新ing)

Kappa 架構:

Lambda 架構雖然滿足了實時的需求,但帶來了更多的開發與運維工作,其架構背景是流處理引擎還不完善,流處理的結果隻作為臨時的、近似的值提供參考。後來随着 Flink 等流處理引擎的出現,流處理技術很成熟了,這時為了解決兩套代碼的問題,LickedIn 的 Jay Kreps 提出了 Kappa 架構。

  • Kappa 架構可以認為是 Lambda 架構的簡化版(隻要移除 lambda 架構中的批處理部分即可)。
  • 在 Kappa 架構中,需求修改或曆史資料重新處理都通過上遊重放完成。
  • Kappa 架構最大的問題是流式重新處理曆史的吞吐能力會低于批處理,但這個可以通過增加計算資源來彌補。
  • Kappa 架構可能也需要利用離線的資料進行校驗。

Lambda 架構與 Kappa 架構的對比

實時數倉模型(持續更新ing)
  1. 在真實的場景中,很多時候并不是完全規範的 Lambda 架構或 Kappa 架構,可以是兩者的混合,比如大部分實時名額使用 Kappa 架構完成計算,少量關鍵名額(比如金額相關)使用 Lambda 架構用批處理重新計算,增加一次校對過程。
  2. Kappa 架構并不是中間結果完全不落地,現在很多大資料系統都需要支援機器學習(離線訓練),是以實時中間結果需要落地對應的存儲引擎供機器學習使用,另外有時候還需要對明細資料查詢,這種場景也需要把實時明細層寫出到對應的引擎中。
注意:實時數倉架構和資料中台一樣,雖然都是屬于目前比較熱門的概念,但是對于實時數倉的狂熱追求大可不必,首先,在技術上幾乎沒有難點,基于強大的開源中間件實作實時資料倉庫的需求已經變得沒有那麼困難。其次,實時數倉的建設一定是伴随着業務的發展而發展,武斷的認為Kappa架構一定是最好的實時數倉架構是不對的。實際情況中随着業務的發展數倉的架構變得沒有那麼非此即彼。

實時數倉模型

實時數倉模型(持續更新ing)

實時數倉需要解決的問題:

1)第一,要支援同時讀寫,就意味着你寫的時候還可以讀,不應該讀到一個錯誤的結果。同時還可以支援多個寫,且能保證資料的一緻性;

2)第二,可以高吞吐地從大表讀取資料。大資料方案不能有諸多限制,比如,我聽說有些方案裡最多隻可以支援幾個并發讀,或者讀的檔案太多了就不讓你送出作業了。如果這樣,對業務方來說,你的整個設計是不滿足他的需求的;

3)第三,錯誤是無可避免,你要可以支援復原,可以重做,或者可以删改這個結果,不能為了支援删改而要求業務方去做業務邏輯的調整;

4)第四,在重新改變業務邏輯的時候要對資料做重新處理,這個時候,業務是不能下線的。在資料被重新處理完成之前,資料湖的資料是要一直可被通路的;

5)第五,因為有諸多原因,資料可能會有晚到的情況,你要能處理遲到資料而不推遲下階段的資料處理。

基于以上五點,我們基于Flink或者Structured Streaming 産生了一個新的批流一體化架構。

實時資料體系大緻分為三類場景:流量類、業務類和特征類,這三種場景各有不同。

  • 在資料模型上,流量類是扁平化的寬表,業務數倉更多是基于範式的模組化,特征資料是 KV 存儲;
  • 從資料來源區分,流量數倉的資料來源一般是日志資料,業務數倉的資料來源是業務 binlog 資料,特征數倉的資料來源則多種多樣;
  • 從資料量而言,流量和特征數倉都是海量資料,每天十億級以上,而業務數倉的資料量一般每天百萬到千萬級;
  • 從資料更新頻率而言,流量資料極少更新,則業務和特征資料更新較多,流量資料一般關注時序和趨勢,業務資料和特征資料關注狀态變更;
  • 在資料準确性上,流量資料要求較低,而業務資料和特征資料要求較高。

實時數倉的實施關鍵點:

  1. 端到端資料延遲、資料流量的監控
  2. 故障的快速恢複能力
  3. 資料的回溯處理,系統支援消費指定時間段内的資料
  4. 實時資料從實時數倉中查詢,T+1資料借助離線通道修正
  5. 資料地圖、資料血緣關系的梳理
  6. 業務資料品質的實時監控,初期可以根據規則的方式來識别品質狀況

其實,你需要的不是實時數倉,需要的是一款合适且強大的OLAP資料庫。

實時數倉模型(持續更新ing)
  • 接入層:該層利用各種資料接入工具收集各個系統的資料,包括 binlog 日志、埋點日志、以及後端服務日志,資料會被收集到 Kafka 中;這些資料不隻是參與實時計算,也會參與離線計算,保證明時和離線的原始資料是統一的;
  • 存儲層:該層對原始資料、清洗關聯後的明細資料進行存儲,基于統一的實時資料模型分層理念,将不同應用場景的資料分别存儲在 Kafka、HDFS、Kudu、 Clickhouse、Hbase、Redis、Mysql 等存儲引擎中,各種存儲引擎存放的具體的資料類型在實時資料模型分層部分會詳細介紹;
  • 計算層:計算層主要使用 Flink、Spark、Presto 以及 ClickHouse 自帶的計算能力等四種計算引擎,Flink 計算引擎主要用于實時資料同步、 流式 ETL、關鍵系統秒級實時名額計算場景,Spark SQL 主要用于複雜多元分析的準實時名額計算需求場景,Presto 和 ClickHouse 主要滿足多元自助分析、對查詢響應時間要求不太高的場景;
  • 平台層:在平台層主要做三個方面的工作,分别是對外提供​​​​統一查詢服務、中繼資料及名額管理、資料品質及血緣;
  • 應用層:以統一查詢服務對各個業務線資料場景進行支援,業務主要包括實時大屏、實時資料産品、實時 OLAP、實時特征等。

是以所謂的實時數倉就是在之前的Lambda的資料結構中,将離線的資料開發和實時的資料開發合并到一起,也就是所謂的流批一體化,這個實作方式主要是:

log/mysql/hive(資料源) -> SparkCore/Sparksql/hive(資料處理) -> hdfs/mysql等(資料存儲)

kafka/binlog(資料源) ->StructuredStreaming/flink(資料處理) -> mysql/redis/es等(資料存儲)

将這兩個處理邏輯合并。

基于StructuredStreaming的批流一體化(離線開發為核心)

StructuredStreaming的批流一體化,雖然能加實時和離線整合起來,但是spark是以離線開發為主。

案例:

實時擷取log日志,做實時的資料大屏展現,并落庫到HDFS作為ODS資料源。

實時數倉模型(持續更新ing)

而且StructuredStreaming可以完成離線的資料存儲(FileSink,寫到檔案體系中),同時完成實時的資料存儲(ForeachSink,自定義存儲方式)。

例如:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
import org.json._
import utils._


/**
  * Created by gaowei
  */
object ZooMessage {
  case class Log(timestamp : String, hostname : String, name : String, version : String, index : String,
                  input_type : String, offset : String, source : String, types : String, uid : String,
                  zid : String, text : String, sessionKey:String,ds:String)

  def logParing(log : String) = {
      val str = Config.decodeUFT8(log)
      try {
        var json = new JSONObject()
        if (str.startsWith("\"")){
          json = new JSONObject(str.replace("\\", "").dropRight(1).drop(1))
        }
        json =  new JSONObject(str)
        val timestamp = json.optString("@timestamp", "")
        val ds = timestamp.split("T")()
        val beat = json.getJSONObject("beat")
        val hostname = beat.optString("hostname", "")
        val name = beat.optString("name", "")
        val version = beat.optString("version", "")
        val index = json.optString("index", "")
        val input_type = json.optString("input_type", "")
        val message = json.optString("message", "")
        var uid = ""
              

繼續閱讀