天天看點

Spark Packages尋寶(一):簡單易用的資料準備工具Optimus

作者:李呈祥,花名司麟 ,阿裡雲智能EMR團隊進階技術專家,Apache Hive Committer, Apache Flink Committer,目前主要專注于EMR産品中開源計算引擎的優化工作。

Spark社群在

Spark Packages

網站中索引了許多第三方庫,這些第三方庫由不同的開發者貢獻,作為Spark生态圈的一部分,擴充了Spark的使用範圍和使用場景,其中很多對于我們日常的使用可能有幫助,我們準備開啟一個系列文章介紹Spark Packages中一些有意思的第三方庫,作為系列的第一篇,本文主要介紹Optimus,一個基于PySpark的簡單易用的資料準備工具。

本文的部分内容源自Optimus官網和相關介紹文章,原文連結參考文末引用部分。

在Spark(Pyspark)的支援下,Optimus允許使用者使用自己的或一組預先建立的資料轉換功能來清理資料,對其進行概要分析并應用與資料分析和機器學習等場景,可以輕松地利用python語言進行所有這些操作。Optimus主要關注與以下幾個方面:

  • 建立一個可靠的API來通路和處理資料。
  • 讓使用者輕松地從Pandas遷移。
  • 使資料探索更加容易。

建立一個可靠的API來通路和處理資料

首先,我們來看看Optimus的基本使用方式

from pyspark.sql import SparkSession
from optimus import Optimus

// 建立context
spark = SparkSession.builder.appName('optimus').getOrCreate()
op= Optimus(spark)

// 加載資料
df = op.load.csv("../examples/data/foo.csv")

// 執行清理
new_df = df\
    .rows.sort("rank","desc")\
    .withColumn('new_age', df.age)\
    .cols.lower(["names","function"])\
    .cols.date_transform("date arrival", "yyyy/MM/dd", "dd-MM-YYYY")\
    .cols.years_between("date arrival", "dd-MM-YYYY", output_cols = "from arrival")\
    .cols.remove_accents("names")\
    .cols.remove_special_chars("names")\
    .rows.drop(df["rank"]>8)\
    .cols.rename(str.lower)\
    .cols.trim("*")\
    .cols.unnest("japanese name", output_cols="other names")\
    .cols.unnest("last position seen",separator=",", output_cols="pos")\
    .cols.drop(["last position seen", "japanese name","date arrival", "cybertronian", "nulltype"])

//儲存結果
new_df.save.csv("data/foo.csv")
           

Optimus基本PySpark架構,重新組織了對資料進行清理/準備的相關API,把資料處理整理為針對dataframe.rows和dataframe.cols兩類操作,基于rows和cols實作了非常豐富的針對資料清理和準備相關的接口,使用者可以使用這些接口非常友善高效地完成相關工作。在Optimus中,核心的資料操作可以歸納為如下幾類:

  • 建立一個DataFrame
  • 用append()追加行或列
  • 使用select()選擇行或列
  • 使用apply()更新或轉換列資料
  • 使用drop()删除行或列
  • 使用read()加載資料
  • 使用write()儲存資料

針對列的操作

對于資料集的操作主要是針對列進行的,是以這裡主要介紹一些典型的針對列的操作類型:

Aggregation

Optimus擴充了PySpark的操作,建立了一種更簡單的方式來對資料集進行統計。

print(df.cols.min(“ height”))
print(df.cols.percentile(['height','rank'],[0.05,0.25,0.5,0.75,0.95]))
print(df.cols.max(“ height”))
print(df.cols.median([“ height”,“ rank”]))
print(df.cols.range([“ height”,“ rank”])))
print(df.cols.std([“ height”,“ rank”]))17.5    
{'height':{0.05:17.5,0.25:17.5,0.5:26.0,0.75:28.0,0.95:28.0},'rank':{0.05:7.0,0.25:7.0,0.5:7.0,0.75:10.0,0.95 :10.0}} 
28.0 
{'height':26.0,'rank':7.0} 
{'height':{'min':17.5,'max':28.0},'rank':{'min':7,'max ':10}} 
{'height':{'stddev':5.575242894559244},'rank':{'stddev':1.7320508075688772}}           
Transformation and Chaining

類似PySpark DataFrame的操作,Optimus的資料轉換操作也可以連結起來,甚至還可以和PySpark的DataFrame操作連結起來。利用Spark的延遲計算的特性(在示例中show()才會觸發計算),使用Catalyst優化執行計劃。

df = df\
    .rows.sort(["rank","height"])\
    .cols.lower(["names","function"])\
    .cols.remove_accents("names")\
    .cols.remove_special_chars("names")\
    .cols.trim("names")\
    .show()
+---------+------+---------+----+
|    names|height| function|rank|
+---------+------+---------+----+
|  optimus|  28.0|   leader|  10|
| ironhide|  26.0| security|   7|
|bumblebee|  17.5|espionage|   7|
+---------+------+---------+----+           
Nest/Unnest

使用nest和unnest操作可以将多列合并成一個新列或者将一列拆分為多列,如下所示:

df.cols.nest(["names", "function"], output_col = "new_col", shape ="string").show()
+---------+------+---------+----+-------------------+
|    names|height| function|rank|            new_col|
+---------+------+---------+----+-------------------+
|  optimus|  28.0|   leader|  10|     optimus leader|
| ironhide|  26.0| security|   7|  ironhide security|
|bumblebee|  17.5|espionage|   7|bumblebee espionage|
+---------+------+---------+----+-------------------+

df.cols.unnest("new_col", " ").cols.drop("new_col")
+---------+------+---------+----+---------+---------+
|    names|height| function|rank|new_col_0|new_col_1|
+---------+------+---------+----+---------+---------+
|  optimus|  28.0|   leader|  10|  optimus|   leader|
| ironhide|  26.0| security|   7| ironhide| security|
|bumblebee|  17.5|espionage|   7|bumblebee|espionage|
+---------+------+---------+----+---------+---------+           
自定義轉換

Optimus具有兩個函數apply()和apply_expr(),使用者可以在其中實作函數(UDF或Pandas UDF)或清單達式。

from pyspark.sql import functions as F
def func(value, args):
    return value + 1
  
df.cols.apply("height", func, "int")\
  .cols.apply_expr("rank", F.col("rank")+1)\
  .show()
    
+---------+------+---------+----+
|    names|height| function|rank|
+---------+------+---------+----+
|  optimus|    29|   leader|  11|
| ironhide|    27| security|   8|
|bumblebee|    18|espionage|   8|
+---------+------+---------+----+           

讓使用者輕松地從Pandas遷移

在資料分析領域,Python是通用語言,而Pandas是最常用的庫,是以Optimus在設計時盡量和Pandas的接口保持一緻,以下是Optimus和Pandas以及PySpark的接口對比:

Description Pandas Spark Optimus
Read csv file pd.read_csv() spark.read.csv() op.read.csv()
Create Dataframe pd.Dataframe df.createdataframe() op.create.df()
Append Row df.append df.union() df.row().append()
Column Mean df.mean df1.agg({"x": "max"}) df.cols().mean()
Show Rows from Dataframe df.head() df.show()
Drop Columns df.drop() df.cols().drop()
Sum all values in a Column df.sum() df1.agg({"x": "sum"}) function df.cols().sum()
Save Dataframe to csv df.to_csv() df.write.csv() df.save().csv()
Get a value by index df.get() NA
Get the mode of a column df.mode() NI df.cols().mode()
Cast a Column df.astype() df.column.cast() df.cols().cast(), astype() as alias
Substract 2 dataframes df.sub()
Merge to dataframes pd.concat() optimus.concat()
Apply a user defined fucntion to a column df.apply(func) fn = F.udf(labmbda x:x+1, DoubleType()) df.withColumn('disp1', n(df.disp)) df.cols().apply(func)
Group rows df.groupby()
Joint operation between to dataframes df.join()
Fill Null values with x df.fillna()
Get the max number of a Column df.max() df.cols().max()
Reset index reset_index()

NI= Not implemented

NA= Not

除了在Spark無法實作的功能(如reset_index),Optimus實作了幾乎所有Pandas可應用于Spark的功能,而且兩個接口基本一緻,大大友善了Pandas使用者的遷移。

使資料探索更輕松

Optimus具有功能強大的内置資料探查器,除所有基本操作外,它還提供了獨特的資料探查功能。使用者可以檢視特定列中存在多少種資料類型。例如,有一百萬行的顔色值為白色,黑色,紅色以及數百種顔色,如何确定一百萬行中沒有“ 1”數字?資料探查使得使用者可以了解資料集的品質,是否有髒資料,為資料清理和準備提供前提資訊和驗證方式,使資料集以合适的狀态用于後續的資料分析和ML/DL處理。

df = op.load.csv("https://raw.githubusercontent.com/ironmussa/Optimus/master/examples/data/Meteorite_Landings.csv").h_repartition()
// 對name列進行profile
op.profiler.run(df, "name", infer=False)           
Spark Packages尋寶(一):簡單易用的資料準備工具Optimus

可以看到profile提供了詳細的資料統計資訊,包括name列的資料類型,null數量,distinct count數量,topN出現次數,histogram分布等等,基于這些資訊,使用者可以準确了解資料的品質。

Profile由于計算量比較大,可能會比較耗時,特别是對于distinct count這種操作,使用relative_error 和 approx_count參數可以以降低精度為代價加速profile速度。

op.profiler.run(df, "name", infer=False, relative_error =1, approx_count=True)           

總結

本文主要介紹了Optimus項目,作為一個Spark的第三方庫,Optimus基于PySpark,為使用者提供了一套完整的資料品質探查和資料清理工具集,接口參考Pandas設計,易用且強大,非常适合大規模資料的清理準備工作。限于篇幅,還有很多Optimus的清理接口和Profile功能沒有介紹,感興趣的同學可以通路

Optimus官網

探索更多功能和用法。

引用

  1. https://hi-optimus.com/
  2. https://github.com/ironmussa/Optimus
  3. http://docs.hioptimus.com/en/latest/sections/overview.html
  4. https://towardsdatascience.com/announcing-optimus-v2-agile-data-science-workflows-made-easy-c127a12d9e13

阿裡巴巴開源大資料技術團隊成立Apache Spark中國技術社群,定期推送精彩案例,技術專家直播,問答區近萬人Spark技術同學線上提問答疑,隻為營造純粹的Spark氛圍,歡迎釘釘掃碼加入!

Spark Packages尋寶(一):簡單易用的資料準備工具Optimus

對開源大資料和感興趣的同學可以加小編微信(下圖二維碼,備注“進群”)進入技術交流微信群。

Spark Packages尋寶(一):簡單易用的資料準備工具Optimus

Apache Spark技術交流社群公衆号,微信掃一掃關注

Spark Packages尋寶(一):簡單易用的資料準備工具Optimus