編譯:誠曆,阿裡巴巴計算平台事業部 EMR 技術專家,Apache Sentry PMC,Apache Commons Committer,目前從事開源大資料存儲和優化方面的工作。
原文連結 :
http://blog.madhukaraphatak.com/data-modeling-spark-part-1/資料模組化是資料分析重要的組成之一,正确的建立模型有助于使用者更好地解答業務相關的問題。在過去幾十年中,資料模組化技術也一直是SQL資料倉庫的基礎。
Apache Spark作為新一代的數倉技術的代表,我們能夠在 Spark 中使用早期的資料模組化技術。這使得Spark data pineline 更加有效。
在本系列文章中,我将讨論spark中不同的資料模組化。本系列的第一篇文章中将讨論如何使用日期次元。
資料分析中資料和時間的重要性
我們分析的大多數資料通常都包含日期或時間戳。例如,它可能是
- 股票的交易日期
- POS系統的交易時間
我們所做的很多分析通常都是關于日期或時間的。我們通常希望使用相同的方法對資料進行切分。
使用内置的Spark進行資料分析
本節讨論如何使用内置的spark日期函數進行資料分析。
蘋果股票資料
在本例中,我們将使用蘋果股票資料。以下是樣本資料
Date | Open | High | Low | Close | Volume | AdjClose |
---|---|---|---|---|---|---|
2013-12-31 00:00:00 | 554.170013 | 561.279976 | 554.000023 | 561.019997 | 55771100 | 76.297771 |
2013-12-30 00:00:00 | 557.460022 | 560.089989 | 552.319984 | 554.519981 | 63407400 | 75.41378 |
加載到Spark Dataframe
下面的代碼将資料加載到spark dataframe中。
val appleStockDf = sparkSession.read.format("csv").
option("header","true")
.option("inferSchema","true")
.load("src/main/resources/applestock_2013.csv")
分析日期
在本節中,讓我們看看如何回答與日期相關的問題。
- 有屬于周末的記錄嗎?
這種分析通常是為了確定資料的品質。周末應該不會有任何資料,因為周末不會有交易。
assert(sparkSession.sql
("select * from stocks where dayofweek(Date)==1 or
dayofweek(Date)==7").count() == 0)
在上述代碼中,1表示星期天,7表示星期六。我們可以看到,代碼是不可讀的,除非我們知道如何解碼這些神奇的數字。
- 顯示季度最高價格
這個分析找到了給定季度的最大值。
appleStockDf.groupBy(year($"Date"),quarter($"Date")).
avg("Close").
sort("year(Date)","quarter(Date)")
.show()
使用Spark日期函數進行資料分析的挑戰
盡管我們可以使用spark builtin資料函數來完成上面的分析,但是編寫它們是很困難的。此外,從外部BI解決方案很難表達這些需求,通常業務分析師使用者是最終使用者。是以,我們需要一種更簡單、更好的方法來實作上述目标。
日期次元
日期維是一個靜态資料集,它在列中列出給定日期的所有不同屬性。這個示例資料集模式如下所示
t
|-- date_key: integer (nullable = true)
|-- full_date: string (nullable = true)
|-- day_of_week: integer (nullable = true)
|-- day_num_in_month: integer (nullable = true)
|-- day_num_overall: integer (nullable = true)
|-- day_name: string (nullable = true)
|-- day_abbrev: string (nullable = true)
|-- weekday_flag: string (nullable = true)
|-- week_num_in_year: integer (nullable = true)
|-- week_num_overall: integer (nullable = true)
|-- week_begin_date: string (nullable = true)
|-- week_begin_date_key: integer (nullable = true)
|-- month: integer (nullable = true)
|-- month_num_overall: integer (nullable = true)
|-- month_name: string (nullable = true)
|-- month_abbrev: string (nullable = true)
|-- quarter: integer (nullable = true)
|-- year: integer (nullable = true)
|-- yearmo: integer (nullable = true)
|-- fiscal_month: integer (nullable = true)
|-- fiscal_quarter: integer (nullable = true)
|-- fiscal_year: integer (nullable = true)
|-- last_day_in_month_flag: string (nullable = true)
|-- same_day_year_ago: string (nullable = true)
在上面的格式中,一些重要的列是
* full_date - Timestamp for given day
* year - year in the date
* quarter - quarter the given date belongs
etc.
這個靜态資料集可以生成多年并保持可用。我們在示例中使用的示例可以從下面的連結下載下傳。
https://www.kimballgroup.com/data-warehouse-business-intelligence-resources/books/microsoft-data-warehouse-dw-toolkit/.使用日期次元進行資料分析
本節讨論如何使用日期次元進行上述分析。
加載資料以觸發Dataframe
我們可以為我們的資料集建立一個dataframe,如下所示
val originalDf = sparkSession.read.format("csv").
option("header","true")
.option("inferSchema","true")
.load(dataPath)
//replace space in the column names
val new_columns = originalDf.schema.fields
.map(value => value.copy(name = value.name.replaceAll("\\s+","_")))
val newSchema = StructType(new_columns)
val newNameDf = sparkSession.createDataFrame(originalDf.rdd, newSchema)
import org.apache.spark.sql.functions._
val dateDf = newNameDf.withColumn("full_date_formatted",
to_date(newNameDf.col("full_date"),"dd/MM/yy"))
在上面的代碼中,進行了預處理,将字元串轉換為spark date資料類型。
與股票資料連接配接
我們可以使用spark連接配接将股票資料與日期結合起來
val joinedDF = appleStockDf.join(dateDf, appleStockDf.col("Date") ===
dateDf.col("full_date_formatted"))
This join doesn’t increase size of the data as it’s an inner join.
這個連接配接并不會增加資料的大小,因為它是一個内部連接配接。
分析
本節介紹如何在不使用複雜的spark函數的情況下進行分析
assert(joinedDF.filter("weekday_flag != 'y'").count()==0)
joinedDF.groupBy("year","quarter").
avg("Close").
sort("year","quarter")
.show()
日期次元的優點
本節讨論日期次元的優點。
跨不同分析的重用
相同的資料集可以用于不同的資料分析。與在查詢中編寫特殊函數或在資料集本身上添加這些列不同,擁有标準的日期次元有助于使所有的資料分析标準化。
Scalable
使用者可以在日期次元上添加更多的屬性,如區域假日等。這将豐富每個人的分析。這裡不需要額外的查詢。
使用者友好
使用日期次元生成的查詢更容易了解。
引用
https://www.kimballgroup.com/data-warehouse-business-intelligence-resources/kimball-techniques/dimensional-modeling-techniques/calendar-date-dimension/.代碼
文本示例代碼可以在 github 上檢視
https://github.com/phatak-dev/spark2.0-examples/blob/2.4/src/main/scala/com/madhukaraphatak/examples/sparktwo/datamodeling/DateHandlingExample.scala阿裡巴巴開源大資料技術團隊成立Apache Spark中國技術社群,定期推送精彩案例,技術專家直播,問答區近萬人Spark技術同學線上提問答疑,隻為營造純粹的Spark氛圍,歡迎釘釘掃碼加入!

對開源大資料和感興趣的同學可以加小編微信(下圖二維碼,備注“進群”)進入技術交流微信群。