編譯:江宇,阿裡雲EMR技術專家。從事Hadoop核心開發,目前專注于機器學習、深度學習大資料平台的建設。
簡介:
在機器學習領域,Apache Spark 由于其支援 SQL 類型的操作以及高效的資料處理,被廣泛的用于資料預處理流程,同時 TensorFlow 作為廣受歡迎的深度學習架構被廣泛的用于模型訓練。盡管兩個架構有一些共同支援的資料格式,但是,作為 TFRecord—TensorFlow 的原生格式,并沒有被 Spark 完全支援。盡管之前有過一些嘗試,試圖解決兩個系統之間的差異(比如
Spark-TensorFlow-Connector),但是現有的實作都缺少很多 Spark 支援的重要特性。
本文中,我們将介紹 Spark 的一個新的資料源,
Spark-TFRecord。Spark-TFRecord 的目的是提供在Spark中對原生的 TensorFlow 格式進行完全支援。本項目的目的是将TFRecord 作為Spark資料源社群中的第一類公民,類似于 Avro,JSON,Parquet等。Spark-TFRecord 不僅僅提供簡單的功能支援,比如 Data Frame的讀取、寫入,還支援一些高階功能,比如ParititonBy。使用 Spark-TFRecord 将會使資料處理流程與訓練工程完美結合。
LinkedIn 内部 Spark 和 TensorFlow 都被廣泛的使用。Spark 被用于資料處理、訓練資料預處理流程中。Spark 同時也是資料分析的領先工具。随着原來越多的商業部門使用深度學習模型,TensorFlow 成為了模型訓練和模型服務的主流工具。開源的TensorFlow 模型使用 TFRecord 作為資料格式,而LinkedIn 内部大部分使用 Avro 格式。為了模型訓練,我們或者修改代碼使模型訓練能夠讀取avro格式,或者将avro格式的datasets轉化為TFRecord。Spark-TFRecod主要是解決後者,即将不同格式轉化為TFRecord。
現有的項目和之前的嘗試
在 Spark-TFRecord 項目之前,社群提供 Spark-TensorFlow-Connector , 在 Spark 中讀寫 TFRecord 。Spark-TensorFlow-Connector 是 TensorFlow 生态圈的一部分,并且是由 DataBricks,spark 的創始公司提供。盡管 Spark-TensorFlow-Connector 提供基本的讀寫功能,但是我們在LinkedIn的使用中發現了兩個問題。首先,它基于 RelationProvider 接口。這個接口主要用于Spark 與資料庫連接配接,磁盤讀寫操作都是由資料庫來支援。然而 Spark-TensorFlow-Connector 的使用場景是磁盤IO,而不是連接配接資料庫,這塊接口需要開發者自己實作 RelationProvider 來支援IO操作。這就是為什麼Spark-TensorFlow-Connector 大量代碼是用于不同的磁盤讀寫場景。
此外,Spark-TensorFlow-Connector 缺少一些 Spark支援的重要功能,比如 PartitionBy 用于将dataset 根據不同列進行分片。我們發現這個功能在LinkedIn 中對于模型訓練非常重要,提供訓練過程中根據實體IDs進行切分進行分布式訓練。這個功能在TensorFlow 社群中也是高需求。
Spark-TFRrecord 為了解決上述問題,實作了FileFormat 接口,其他的原生格式比如 Avro,Parquet 等也實作了該接口。使用該接口後,TFRecord 就擷取了所有的 DataFrame 和 DataSet 的I/O API,包括之前說的 PartitionBy 功能。此外,之後的 Spark I/O 接口的功能增強也能夠自動擷取到。
設計
我們起初考慮對 Spark-TensorFlow-Connector 打更新檔的方式去擷取 PartitionBy 功能。檢查過源碼後,我們發現 Spark-TensorFlow-Connector 使用的RelationProvider接口,是用于連接配接 Spark 與 SQL 資料庫的,不适用于 TensorFlow 場景。然後并沒有一個簡單解決方式去解決 RelationProvider 并不提供磁盤I/O操作這一問題。于是,我們決定采取了不同的方式,我們實作了FileFormat,FileFormat是用來實作底層的基于檔案的I/O操作。實作這一功能對LinkedIn的場景是非常有用的,我們的datasets基本上都是直接讀寫磁盤。
下圖展示了各個子產品

每個子產品作用如下:
Schema Inferencer: 用于将Spark的資料類型推測為TFRecord的資料類型,我們複用了很多Spark-Tensorflow-Connector功能。
TFRecord Reader: 讀取磁盤中TFRecord檔案并使用反序列化器将TFRecord轉換為Spark的InternalRow資料結構。
TFRecord Writer:将Spark的InternalRow資料結構通過序列化器轉化為TFRecord格式并儲存至磁盤。我們使用
TensorFlow Hadoop庫的寫入器。
TFRecord Deserializer: 反序列化器,将TFRecord轉化為Spark InternalRow。
TFRecord Serializer: 序列化器,将Spark InternalRow轉化為TFRecord。
如何使用Spark-TFRecord
Spark-TFRecord與Spark-TensorFlow-Connector完全後向相容。遷移十分友善,隻需要加入spark-tfrecord jar包并且指定資料格式為“tfrecord”。下面的例子顯示了如何使用Spark-TFRecord去讀取傾斜和partition TFRecord檔案。更多的例子可以參照
github倉庫。
// launch spark-shell with the following command:
// SPARK_HOME/bin/spark-shell --jar target/spark-tfrecord_2.11-0.1.jar
import org.apache.spark.sql.SaveMode
val df = Seq((8, "bat"),(8, "abc"), (1, "xyz"), (2, "aaa")).toDF("number", "word")
df.show
// scala> df.show
// +------+----+
// |number|word|
// +------+----+
// | 8| bat|
// | 8| abc|
// | 1| xyz|
// | 2| aaa|
// +------+----+
val tf_output_dir = "/tmp/tfrecord-test"
// dump the tfrecords to files.
df.repartition(3, col("number")).write.mode(SaveMode.Overwrite).partitionBy("number").format("tfrecord").option("recordType", "Example").save(tf_output_dir)
// ls /tmp/tfrecord-test
// _SUCCESS number=1 number=2 number=8
// read back the tfrecords from files.
val new_df = spark.read.format("tfrecord").option("recordType", "Example").load(tf_output_dir)
new_df.show
// scala> new_df.show
// +----+------+
// |word|number|
// +----+------+
// | bat| 8|
// | abc| 8|
// | xyz| 1|
// | aaa| 2|
總結
Spark-TFRecord使得Record可以作為Spark 資料格式的一等公民與其他資料格式一起使用。包含了所有dataframe API的功能,比如讀、寫、分區等。目前我們僅限于schemas符合Spark-Tensorflow-Connector要求。未來的工作将會提供更複雜的schemas支援。
原文連結:
https://engineering.linkedin.com/blog/2020/spark-tfrecord阿裡巴巴開源大資料技術團隊成立Apache Spark中國技術社群,定期推送精彩案例,技術專家直播,問答區近萬人Spark技術同學線上提問答疑,隻為營造純粹的Spark氛圍,歡迎釘釘掃碼加入!
對開源大資料和感興趣的同學可以加小編微信(下圖二維碼,備注“進群”)進入技術交流微信群。
Apache Spark技術交流社群公衆号,微信掃一掃關注