天天看點

【實踐案例】Databricks 資料洞察在美的暖通與樓宇的應用實踐

作者

美的暖通與樓宇事業部 先行研究中心智能技術部

美的暖通 IoT 資料平台建設背景

美的暖通與樓宇事業部(以下簡稱美的暖通)是美的集團旗下五大闆塊之一,産品覆寫多聯機組、大型冷水機組、單元機、機房空調、扶梯、直梯、貨梯以及樓宇自控軟體和建築弱電內建解決方案,遠銷海内外200多個國家。目前事業部裝置資料上雲僅停留在資料存儲層面,缺乏挖掘資料價值的平台,造成大量資料荒廢,并且不斷消耗存儲資源,增加存儲費用和維護成本。另一方面,現有資料驅動應用缺乏部署平台,難以産生實際價值。是以,急需統一通用的 IoT 資料平台,以支援裝置運作資料的快速分析和模組化。

我們的 IoT 資料平台建設基于阿裡雲 Databricks 資料洞察全托管 Spark 産品,以下是整體業務架構圖。在本文後面的章節,我們将就IoT資料平台建設技術選型上的一些思考,以及 Spark 技術棧尤其是 Delta Lake 場景的應用實踐做一下分享。

【實踐案例】Databricks 資料洞察在美的暖通與樓宇的應用實踐

選擇 Spark & Delta Lake

在資料平台計算引擎層技術選型上,由于我們資料團隊剛剛成立,前期的架構選型我們做了很多的調研,綜合各個方面考慮,希望選擇一個成熟且統一的平台:既能夠支援資料處理、資料分析場景,也能夠很好地支撐資料科學場景。加上團隊成員對 Python 及 Spark 的經驗豐富,是以,從一開始就将目标鎖定到了 Spark 技術棧。

選擇 Databricks 資料洞察 Delta Lake

通過與阿裡雲計算平台團隊進行多方面的技術交流以及實際的概念驗證,我們最終選擇了阿裡雲 Databricks 資料洞察産品。作為 Spark 引擎的母公司,其商業版 Spark 引擎,全托管 Spark 技術棧,統一的資料工程和資料科學等,都是我們決定選擇 Databricks 資料洞察的重要原因。

具體來看,Databricks 資料洞察提供的核心優勢如下:

  • Saas 全托管 Spark:免運維,無需關注底層資源情況,降低運維成本,聚焦分析業務
  • 完整 Spark 技術棧內建:一站式內建 Spark 引擎和 Delta Lake 資料湖,100%相容開源 Spark 社群版;Databricks 做商業支援,最快體驗 Spark 最新版本特性
  • 總成本降低:商業版本 Spark 及 Delta Lake 性能優勢顯著;同時基于計算存儲分離架構,存儲依托阿裡雲 OSS 對象存儲,借助阿裡雲 JindoFS 緩存層加速;能夠有效降低叢集總體使用成本
  • 高品質支援以及SLA保障:阿裡雲和 Databricks 提供覆寫 Spark 全棧的技術支援;提供商業化 SLA 保障與7*24小時 Databricks 專家支援服務

IoT 資料平台整體架構

【實踐案例】Databricks 資料洞察在美的暖通與樓宇的應用實踐

整體的架構如上圖所示。

我們接入的 IoT 資料分為兩部分,曆史存量資料和實時資料。目前,曆史存量資料是通過 Spark SQL 以天為機關從不同客戶關系資料庫批量導入 Delta Lake 表中;實時資料通過 IoT 平台采集到雲 Kafka ,經由 Spark Structured Streaming 消費後實時寫入到 Delta Lake 表中。在這個過程中,我們将實時資料和曆史資料都 sink 到同一張 Delta 表裡,這種批流一體操作可大大簡化我們的 ETL 流程(參考後面的案例部分)。資料管道下遊,我們對接資料分析及資料科學工作流。

IoT 資料采集:從 Little Data 到 Big Data

作為 IoT 場景的典型應用,美的暖通最核心的資料均來自 IoT 終端裝置。在整個 IoT 環境下,分布着無數個終端傳感器。從小的次元看,傳感器産生的資料本身屬于 Small Data(或者稱為 Little Data)。當把所有傳感器連接配接成一個大的 IoT 網絡,産生自不同傳感器的資料經由 Gateway 與雲端相連接配接,并最終在雲端形成 Big Data 。

在我們的場景下,IoT 平台本身會對不同協定的資料進行初步解析,通過定制的硬體網絡裝置将解析後的半結構化 JSON 資料經由網絡發送到雲 Kafka。雲 Kafka 扮演了整個資料管道的入口。

資料入湖:Delta Lake

IoT 場景下的資料有如下幾個特點:

  • 時序資料:傳感器産生的資料記錄中包含時間相關的資訊,資料本身具有時間屬性,是以不同的資料之間可能存在一定的相關性。利用 as-of-join 将不同時間序列資料 join 到一起是下遊資料預測分析的基礎
  • 資料的實時性:傳感器實時生成資料并以最低延遲的方式傳輸到資料管道,觸發規則引擎,生成告警和事件,通知相關從業人員。
  • 資料體量巨大:IoT 網絡環境下遍布各地的成千上萬台裝置及其傳感器再通過接入服務将海量的資料歸集到平台
  • 資料協定多樣:通常在 IoT 平台接入的不同種類裝置中,上傳資料協定種類多樣,資料編碼格式不統一

IoT 資料上述特點給資料處理、資料分析及資料科學等帶來了諸多挑戰,慶幸的是,這些挑戰借助 Spark 和 Delta Lake 都可以很好地應對。Delta Lake 提供了 ACID 事務保證,支援增量更新資料表以及流批同時寫資料。借助 Spark Structed Streaming 可以實作 IoT 時序資料實時入湖。

以下是 Delta Lake 經典的三級資料表架構。具體到美的暖通 IoT 資料場景,我們針對每一層級的資料表分别做了如下定義:

【實踐案例】Databricks 資料洞察在美的暖通與樓宇的應用實踐
  • Bronze 表:存儲原生資料(Raw Data),資料經由 Spark Structed Streaming 從 Kafka 消費下來後 upsert 進 Delta Lake 表,該表作為唯一的真實資料表  (Single Source of Truth)
  • Silver表:該表是在對 Bronze 表的資料進行加工處理的基礎上生成的中間表,在美的暖通的場景下,資料加工處理的步驟涉及到一些複雜的時序資料計算邏輯,這些邏輯都包裝在了 Pandas UDF 裡提供給 Spark 計算使用
  • Gold 表:Silver 表的資料施加 Schema 限制并做進一步清洗後的資料彙入 Gold 表,該表提供給下遊的 Ad Hoc 查詢分析及資料科學使用

資料分析:Ad-Hoc 查詢

我們内部在開源 Superset 基礎上定制了内部版本的 SQL 查詢與資料可視化平台,通過 PyHive 連接配接到 Databricks 資料洞察 Spark Thrift Server 服務,可以将 SQL 送出到叢集上。商業版本的 thrift server 在可用性及性能方面都做了增強,Databricks 資料洞察針對 JDBC 連接配接安全認證提供了基于 LDAP 的使用者認證實作。借助 Superset ,資料分析師及資料科學家可以快速高效的對 Delta Lake 表進行資料探索。

資料科學:Workspace

樓宇能耗預測與裝置故障診斷預測是美的暖通 IoT 大資料平台建設的兩個主要業務目标。在 IoT 資料管道下遊,需要對接機器學習平台。現階段為了更快速友善地支撐起資料科學場景,我們将 Databricks 資料洞察叢集與阿裡雲資料開發平台 DDC 打通。DDC 內建了在資料科學場景下更友好的 Jupyter Notebook ,通過在 Jupyter 上使用 PySpark ,可以将作業跑到 Databricks 資料洞察叢集上;同時,也可以借助 Apache Airflow 對作業進行排程。同時,考慮到機器學習模型建構、疊代訓練、名額檢測、部署等基本環節,我們也在探索 MLOps ,目前這部分工作還在籌備中。

典型應用場景介紹

Delta Lake 資料入湖(批流一體)

使用 UDF 函數定義流資料寫入 Delta Lake 的 Merge 規則

%spark
import org.apache.spark.sql._
import io.delta.tables._
 
// Function to upsert `microBatchOutputDF` into Delta table using MERGE
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  // Set the dataframe to view name
 microBatchOutputDF.createOrReplaceTempView("updates")
  // Use the view name to apply MERGE
  // NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
 microBatchOutputDF.sparkSession.sql(s"""
   MERGE INTO delta_{table_name} t
   USING updates s
   ON s.uuid = t.uuid
   WHEN MATCHED THEN UPDATE SET 
   t.device_id = s.device_id,
   t.indoor_temperature =
s.indoor_temperature,
   t.ouoor_temperature = s.ouoor_temperature,
   t.chiller_temperature =
s.chiller_temperature,
   t.electricity = s.electricity,
   t.protocal_version = s.protocal_version,
   t.dt=s.dt,
   t.update_time=current_timestamp()
   WHEN NOT MATCHED THEN INSERT 
   (t.uuid,t.device_id,t.indoor_temperature,t.ouoor_temperature ,t.chiller_temperature
,t.electricity,t.protocal_version,t.dt,t.create_time,t.update_time)
  values
(s.uuid,s.device_id,s.indoor_temperature,s.ouoor_temperature,s.chiller_temperature,s.electricity,s.protocal_version
,s.dt,current_timestamp(),current_timestamp())
   """)
}      

使用 Spark Structured Streaming 實時流寫入 Delta Lake

%spark
 
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
 
def getquery(checkpoint_dir:String,tableName:String,servers:String,topic:String ) {
   var streamingInputDF =  
 spark.readStream
   .format("kafka")
   .option("kafka.bootstrap.servers", servers)
   .option("subscribe", topic)     
   .option("startingOffsets", "latest")  
   .option("minPartitions", "10")  
   .option("failOnDataLoss", "true")
   .load()
val resDF=streamingInputDF
   .select(col("value").cast("string"))
   .withColumn("newMessage",split(col("value"), " "))
   .filter(col("newMessage").getItem(7).isNotNull)
   .select(
       col("newMessage").getItem(0).as("uuid"),
       col("newMessage").getItem(1).as("device_id"),
       col("newMessage").getItem(2).as("indoor_temperature"),
       col("newMessage").getItem(3).as("ouoor_temperature"),
       col("newMessage").getItem(4).as("chiller_temperature"),
       col("newMessage").getItem(5).as("electricity"),
       col("newMessage").getItem(6).as("protocal_version")
   )
   .withColumn("dt",date_format(current_date(),"yyyyMMdd"))  
val query = resDF
     .writeStream
     .format("delta")
     .option("checkpointLocation", checkpoint_dir)
     .trigger(Trigger.ProcessingTime("60 seconds")) // 執行流處理時間間隔
     .foreachBatch(upsertToDelta _) //引用upsertToDelta函數
     .outputMode("update")
   query.start()
}      

資料災備:Deep Clone

由于 Delta Lake 的資料僅接入實時資料,對于存量曆史資料我們是通過 SparkSQL 一次性 Sink Delta Lake 的表中,這樣我們流和批處理時隻維護一張 Delta 表,是以我們隻在最初對這兩部分資料做一次 Merge 操作。同時為了保證資料的高安全,我們使用 Databricks Deep Clone 來做資料災備,每天會定時更新來維護一張從表以備用。對于每日新增的資料,使用 Deep Clone 同樣隻會對新資料 Insert 對需要更新的資料 Update 操作,這樣可以大大提高執行效率。

CREATE OR REPLACE TABLE delta.delta_{table_name}_clone
 
DEEP CLONE delta.delta_{table_name};      

性能優化:OPTIMIZE & Z-Ordering

在流處理場景下會産生大量的小檔案,大量小檔案的存在會嚴重影響資料系統的讀性能。Delta Lake 提供了 OPTIMIZE 指令,可以将小檔案進行合并壓縮,另外,針對 Ad-Hoc 查詢場景,由于涉及對單表多個次元資料的查詢,我們借助 Delta Lake 提供的 Z-Ordering 機制,可以有效提升查詢的性能。進而極大提升讀取表的性能。DeltaLake 本身提供了 Auto Optimize 選項,但是會犧牲少量寫性能,增加資料寫入 delta 表的延遲。相反,執行 OPTIMIZE 指令并不會影響寫的性能,因為 Delta Lake 本身支援 MVCC,支援 OPTIMIZE 的同時并發執行寫操作。是以,我們采用定期觸發執行 OPTIMIZE 的方案,每小時通過 OPTIMIZE 做一次合并小檔案操作,同時執行 VACCUM 來清理過期資料檔案:

OPTIMIZE delta.delta_{table_name} ZORDER by device_id, indoor_temperature;
set spark.databricks.delta.retentionDurationCheck.enabled = false;
VACUUM delta.delta_{table_name} RETAIN 1 HOURS;      

另外,針對 Ad-Hoc 查詢場景,由于涉及對單表多個次元資料的查詢,我們借助 Delta Lake 提供的 Z-Ordering 機制,可以有效提升查詢的性能。

總結與展望

我們基于阿裡雲 Databricks 資料洞察産品提供的商業版 Spark 及 Delta Lake 技術棧快速建構了 IoT 資料處理平台,Databricks 資料洞察全托管免運維、商業版本引擎性能上的優勢以及計算/存儲分離的架構,為我們節省了總體成本。同時,Databricks 資料洞察産品自身提供的豐富特性,也極大提升了我們資料團隊的生産力,為資料分析業務的快速開展傳遞奠定了基礎。未來,美的暖通希望與阿裡雲 Databricks 資料洞察團隊針對 IoT 場景輸出更多行業先進解決方案。

擷取更詳細的 Databricks 資料洞察相關資訊,可至産品詳情頁檢視:

https://www.aliyun.com/product/bigdata/spark

阿裡巴巴開源大資料技術團隊成立 Apache Spark 中國技術社群,定期推送精彩案例,技術專家直播,隻為營造純粹的 Spark 氛圍,歡迎關注公衆号!

掃描下方二維碼入 Databricks 資料洞察産品交流釘釘群一起參與交流讨論!

【實踐案例】Databricks 資料洞察在美的暖通與樓宇的應用實踐

繼續閱讀