天天看點

【譯】Apache Spark 資料模組化之時間次元(一)

編譯:誠曆,阿裡巴巴計算平台事業部 EMR 技術專家,Apache Sentry PMC,Apache Commons Committer,目前從事開源大資料存儲和優化方面的工作。

原文連結 :

http://blog.madhukaraphatak.com/data-modeling-spark-part-1/

資料模組化是資料分析重要的組成之一,正确的建立模型有助于使用者更好地解答業務相關的問題。在過去幾十年中,資料模組化技術也一直是SQL資料倉庫的基礎。

Apache Spark作為新一代的數倉技術的代表,我們能夠在 Spark 中使用早期的資料模組化技術。這使得Spark data pineline 更加有效。

在本系列文章中,我将讨論spark中不同的資料模組化。本系列的第一篇文章中将讨論如何使用日期次元。

資料分析中資料和時間的重要性

我們分析的大多數資料通常都包含日期或時間戳。例如,它可能是

  • 股票的交易日期
  • POS系統的交易時間

我們所做的很多分析通常都是關于日期或時間的。我們通常希望使用相同的方法對資料進行切分。

使用内置的Spark進行資料分析

本節讨論如何使用内置的spark日期函數進行資料分析。

蘋果股票資料

在本例中,我們将使用蘋果股票資料。以下是樣本資料

Date Open High Low Close Volume AdjClose
2013-12-31 00:00:00 554.170013 561.279976 554.000023 561.019997 55771100 76.297771
2013-12-30 00:00:00 557.460022 560.089989 552.319984 554.519981 63407400 75.41378

加載到Spark Dataframe

下面的代碼将資料加載到spark dataframe中。

val appleStockDf = sparkSession.read.format("csv").
      option("header","true")
      .option("inferSchema","true")
      .load("src/main/resources/applestock_2013.csv")           

分析日期

在本節中,讓我們看看如何回答與日期相關的問題。

  • 有屬于周末的記錄嗎?

這種分析通常是為了確定資料的品質。周末應該不會有任何資料,因為周末不會有交易。

assert(sparkSession.sql
       ("select * from stocks where dayofweek(Date)==1 or 
       dayofweek(Date)==7").count() == 0)           

在上述代碼中,1表示星期天,7表示星期六。我們可以看到,代碼是不可讀的,除非我們知道如何解碼這些神奇的數字。

  • 顯示季度最高價格

這個分析找到了給定季度的最大值。

appleStockDf.groupBy(year($"Date"),quarter($"Date")).
      avg("Close").
      sort("year(Date)","quarter(Date)")
      .show()           

使用Spark日期函數進行資料分析的挑戰

盡管我們可以使用spark builtin資料函數來完成上面的分析,但是編寫它們是很困難的。此外,從外部BI解決方案很難表達這些需求,通常業務分析師使用者是最終使用者。是以,我們需要一種更簡單、更好的方法來實作上述目标。

日期次元

日期維是一個靜态資料集,它在列中列出給定日期的所有不同屬性。這個示例資料集模式如下所示

t
 |-- date_key: integer (nullable = true)
 |-- full_date: string (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- day_num_in_month: integer (nullable = true)
 |-- day_num_overall: integer (nullable = true)
 |-- day_name: string (nullable = true)
 |-- day_abbrev: string (nullable = true)
 |-- weekday_flag: string (nullable = true)
 |-- week_num_in_year: integer (nullable = true)
 |-- week_num_overall: integer (nullable = true)
 |-- week_begin_date: string (nullable = true)
 |-- week_begin_date_key: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- month_num_overall: integer (nullable = true)
 |-- month_name: string (nullable = true)
 |-- month_abbrev: string (nullable = true)
 |-- quarter: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- yearmo: integer (nullable = true)
 |-- fiscal_month: integer (nullable = true)
 |-- fiscal_quarter: integer (nullable = true)
 |-- fiscal_year: integer (nullable = true)
 |-- last_day_in_month_flag: string (nullable = true)
 |-- same_day_year_ago: string (nullable = true)           

在上面的格式中,一些重要的列是

* full_date - Timestamp for given day
* year - year in the date
* quarter - quarter the given date belongs
etc.           

這個靜态資料集可以生成多年并保持可用。我們在示例中使用的示例可以從下面的連結下載下傳。

https://www.kimballgroup.com/data-warehouse-business-intelligence-resources/books/microsoft-data-warehouse-dw-toolkit/.

使用日期次元進行資料分析

本節讨論如何使用日期次元進行上述分析。

加載資料以觸發Dataframe

我們可以為我們的資料集建立一個dataframe,如下所示

val originalDf = sparkSession.read.format("csv").
      option("header","true")
      .option("inferSchema","true")
      .load(dataPath)

    //replace space in the column names
    val new_columns = originalDf.schema.fields
      .map(value => value.copy(name = value.name.replaceAll("\\s+","_")))

    val newSchema = StructType(new_columns)
    val newNameDf = sparkSession.createDataFrame(originalDf.rdd, newSchema)

    import org.apache.spark.sql.functions._
    val dateDf = newNameDf.withColumn("full_date_formatted",
      to_date(newNameDf.col("full_date"),"dd/MM/yy"))           

在上面的代碼中,進行了預處理,将字元串轉換為spark date資料類型。

與股票資料連接配接

我們可以使用spark連接配接将股票資料與日期結合起來

val joinedDF = appleStockDf.join(dateDf, appleStockDf.col("Date") ===
      dateDf.col("full_date_formatted"))
This join doesn’t increase size of the data as it’s an inner join.           

這個連接配接并不會增加資料的大小,因為它是一個内部連接配接。

分析

本節介紹如何在不使用複雜的spark函數的情況下進行分析

assert(joinedDF.filter("weekday_flag != 'y'").count()==0)           
joinedDF.groupBy("year","quarter").
      avg("Close").
      sort("year","quarter")
      .show()           

日期次元的優點

本節讨論日期次元的優點。

跨不同分析的重用

相同的資料集可以用于不同的資料分析。與在查詢中編寫特殊函數或在資料集本身上添加這些列不同,擁有标準的日期次元有助于使所有的資料分析标準化。

Scalable

使用者可以在日期次元上添加更多的屬性,如區域假日等。這将豐富每個人的分析。這裡不需要額外的查詢。

使用者友好

使用日期次元生成的查詢更容易了解。

引用

https://www.kimballgroup.com/data-warehouse-business-intelligence-resources/kimball-techniques/dimensional-modeling-techniques/calendar-date-dimension/.

代碼

文本示例代碼可以在 github 上檢視

https://github.com/phatak-dev/spark2.0-examples/blob/2.4/src/main/scala/com/madhukaraphatak/examples/sparktwo/datamodeling/DateHandlingExample.scala

阿裡巴巴開源大資料技術團隊成立Apache Spark中國技術社群,定期推送精彩案例,技術專家直播,問答區近萬人Spark技術同學線上提問答疑,隻為營造純粹的Spark氛圍,歡迎釘釘掃碼加入!

【譯】Apache Spark 資料模組化之時間次元(一)

對開源大資料和感興趣的同學可以加小編微信(下圖二維碼,備注“進群”)進入技術交流微信群。

【譯】Apache Spark 資料模組化之時間次元(一)