天天看點

【譯】Databricks使用Spark Streaming和Delta Lake對流式資料進行資料品質監控介紹

原文連結: https://databricks.com/blog/2020/03/04/how-to-monitor-data-stream-quality-using-spark-streaming-and-delta-lake.html 在這個一切都需要進行加速的時代,流資料的使用變得越來越普遍。我們經常不再聽到客戶問:“我可以流式傳輸這些資料嗎?”,更多的是問:“我們能以多快的速度流式傳輸這些資料?”,而諸如Kafka和Delta Lake之類技術的普及更突顯了這一勢頭。我們認為傳統流式資料傳輸的一種形式是以非常快的速度到達的半結構化或非結構化(例如JSON)資料,通常情況下一批資料的量也比較小。這種形式的工作場景橫跨各行各業,舉一個這樣的客戶案例,某個證券交易所和資料提供商,他們負責每分鐘流式傳輸數十萬個資料項目,包括股票行情、新聞、報價及其他财務資料。該顧客使用Databricks、Delta Lake以及 Structured Streaming

,實時高可用地處理和分析這些流式資料。但是,随着使用流式資料普遍性的提升,我們見到了另一種類型的客戶,他們使用流式技術進行低頻次、類批處理的資料處理方式。在這種架構下,流式資料處理扮演的角色通常為監控特定的目錄、S3存儲桶或其他存放資料的區域,并且會在資料到達之後立即自動處理資料,這種架構消除了傳統排程的許多負擔,特别是在任務失敗或隻需要處理部分資料的情況下。所有這些應用場景都表明,流式技術已經不再隻是用于實時或類實時的資料計算。

盡管流式技術的出現有許多積極的方面,但這種體系結構也帶來了一些麻煩。特别是,曆史上一直存在着一個權衡:我們是要高品質的資料還是高速資料?實際上,這不是一個有意義的問題,對于所有實際操作來說,品質都必須與速度相關聯,為了實作高速度,我們需要高品質的資料。畢竟,低品質、高速度的資料通常都需要分批進行進一步的處理;另一方面,高品質、低速度的資料不能滿足許多現代場景的需要。随着越來越多的公司将流式傳輸資料作為其資料處理體系結構的關鍵,速度和品質都必須同時提高。

在本博文中,我們将深入探讨一種資料管理架構,該架構可以在資料到達時,通過主動監控和分析來檢測流式資料中損壞或不良的資料,并且不會造成瓶頸。

建構流式資料分析和監控流程

在Databricks,我們看到客戶中不斷湧現出許多資料處理模式,這些新模式的産生推動了可能的極限,在速度和品質問題上也不例外。為了幫助解決這一沖突,我們開始考慮使用正确的工具,不僅可以支援所需的資料速度,還可以提供可接受的資料品質水準。Structured Streaming和Delta Lake非常适合用于資料擷取和存儲層,因為他們能夠配合創造一個具有擴充性、容錯性和類實時的系統,并且具有exactly-once處理

保證

為企業資料品質分析找到可接受的工具要困難一些,特别是這個工具需要具有對資料品質名額的狀态彙總的能力。另外,還需要能夠對整個資料集進行檢查(例如檢測出多少比例的記錄為空值),這些都會随着所提取的資料量的增加而增加計算成本。這對所有流式系統而言都是需要的,這一要求就排除了很多可用的工具。

在我們最初的解決方案中,我們選擇了Amazon的資料品質檢測工具

Deequ

,因為它能提供簡單而強大的API,有對資料品質名額進行狀态聚合的能力,以及對Scala的支援。将來,其他Spark原生的工具将提供額外的選擇。

【譯】Databricks使用Spark Streaming和Delta Lake對流式資料進行資料品質監控介紹

流式資料品質監控的實作

我們通過在EC2執行個體上運作一個小型的Kafka producer來模拟資料流,該執行個體将模拟的股票交易資訊寫入Kafka topic,并使用原生的Databricks連接配接器将這些資料導入到Delta Lake表當中。為了展示Spark Streaming中資料品質檢查的功能,我們選擇在整個流程中實作Deequ的不同功能:

  • 根據曆史資料生成限制條件;
  • 使用 foreachBatch 算子對到達的資料進行增量品質分析;
  • 使用foreachBatch算子對到達的資料執行(較小的)單元測試,并将品質不佳的batch隔離到品質不佳記錄表中;
  • 對于每個到達的batch,将最新的狀态名額寫入到Delta表當中;
  • 對整個資料集定期執行(較大的)單元測試,并在MLFlow中跟蹤結果;
  • 根據驗證結果發送通知(如通過電子郵件或Slack);
  • 捕獲MLFlow中的名額以進行可視化和記錄。

我們結合了

MLFlow

來跟蹤一段時間内資料性能名額的品質、Delta表的版本疊代以及結合了一個用于通知和告警的Slack連接配接器。整個流程可以用如下的圖檔進行表示:

【譯】Databricks使用Spark Streaming和Delta Lake對流式資料進行資料品質監控介紹

由于Spark中具有統一的批處理/流式處理接口,是以我們能夠在這個流程的任何位置提取報告、告警和名額,作為實時更新或批處理快照。這對于設定觸發器或限制特别有用,是以,如果某個名額超過了門檻值,則可以執行資料品質改善措施。還要注意的是,我們并沒有對初始到達的原始資料造成影響,這些資料将立即送出到我們的Delta表,這意味着我們不會限制資料輸入的速率。下遊系統可以直接從該表中讀取資料,如果超過了上述任何觸發條件或品質門檻值,則可能會中斷。此外,我們可以輕松地建立一個排除品質不佳記錄的view以提供一個幹淨的表。

在一個較高的層次,執行我們的資料品質跟蹤和驗證的代碼如下所示:

spark.readStream
.table("trades_delta")
.writeStream
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>

    // reassign our current state to the previous next state
    val stateStoreCurr = stateStoreNext

    // run analysis on the current batch, aggregate with saved state
    val metricsResult = AnalysisRunner.run(data=batchDF, ...)
    
    // verify the validity of our current microbatch
    val verificationResult = VerificationSuite()
        .onData(batchDF)
        .addCheck(...).run()

    // if verification fails, write batch to bad records table
    if (verificationResult.status != CheckStatus.Success) {...}

    // write the current results into the metrics table
    Metric_results.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("deequ_metrics")
}
.start()           

使用資料品質工具Deequ

在Databricks中使用Deequ是相對比較容易的事情,你需要首先定義一個analyzer,然後在dataframe上運作該analyzer。例如,我們可以跟蹤Deequ本地提供的幾個相關名額檢查,包括檢查數量和價格是否為非負數、原始IP位址是否不為空以及符号字段在所有事務中的唯一性。Deequ的StateProvider對象在流式資料配置中特别有用,它能允許使用者将我們名額的狀态儲存在記憶體或磁盤中,并在以後彙總這些名額。這意味着每個處理的批次僅分析該批次中的資料記錄,而不會分析整個表。即使随着資料大小的增長,這也可以使性能保持相對穩定,這在長時間運作的生産環境中很重要,因為生産環境需要在任意數量的資料上保持一緻。

MLFlow還可以很好地跟蹤名額随時間的演變,在我們的

notebook

中,我們跟蹤在foreachBatch代碼中分析的所有Deequ限制作為名額,并使用Delta的versionID和時間戳作為參數。在Databricks的notebook中,內建的MLFlow服務對于名額跟蹤特别友善。

通過使用Structured Streaming、Delta Lake和Deequ,我們能夠消除傳統情況下資料品質和速度之間的權衡,而專注于實作兩者的可接受水準。這裡特别重要的是靈活性——不僅在如何處理不良記錄(隔離、報錯、告警等),而且在體系結構上(例如何時以及在何處執行檢查?)和生态上(如何使用我們的資料?)。開源技術(如Delta Lake、Structured Streaming和Deequ)是這種靈活性的關鍵。随着技術的發展,能夠使用最新最、最強大的解決方案是提升其競争優勢的驅動力。最重要的是,你的資料的速度和品質一定不能對立,而要保持一緻,尤其是在流式資料處理越來越靠近核心業務營運時。很快,這将不會是一種選擇,而是一種期望和要求,我們正朝着這個未來方向一次一小步地不斷前進。

繼續閱讀