原文連結: Apache Avro as a Built-in Data Source in Apache Spark 2.4Apache Avro
是一種流行的資料序列化格式。它廣泛使用于 Apache Spark 和 Apache Hadoop 生态中,尤其适用于基于 Kafka 的資料流場景。從
Apache Spark 2.4版本開始,Spark 原生支援了 Avro 資料的讀寫。新的内置 spark-avro 子產品最初來自 Databricks 開源項目
Avro Data Source for Apache Spark(後文簡稱為 spark-avro )。 此外, 它還提供了:
- 新函數 from_avro() 和 to_avro() 用于在 DataFrame 中讀寫 Avro 資料,而不僅僅是檔案。
- Avro 邏輯類型支援, 包括 Decimal、Timestamp 和日期類型。
- 2 倍的讀吞吐量提升和 10% 的寫吞吐量提升。
這篇部落格中, 我們會通過示例逐條的講解上述的每個功能,通過例子你會發現其 API 的易用性,高性能等優點。
Load 和 Save 函數
在 Apache Spark 2.4 中,隻需要在 DataFrameReander 和 DataFrameWriter 中将檔案格式指定為 “avro” 就能夠加載和儲存 Avro 格式資料。出于一緻性考慮,用法和其他内置資料源類似。
val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
from_avro() 和 to_avro() 函數
為了進一步簡化資料轉換流程,我們引入了 2 個新的内置函數: from_avro() 和 to_avro()。Avro 常用于序列化/反序列化基于 Apache Kafka 的資料流中的消息/資料。在讀取或寫入 Kafka 時,把 Avro 記錄當做列是非常有用的。每個 Kafka 鍵值對記錄都會相應的新增一些中繼資料, 例如攝取時間戳、偏移量等。
上述函數非常有用的三個場景:
- 當 Spark 從 Kafka 讀取 Avro 二進制資料時,from_avro() 可以提取資料,清理資料并對其進行轉換。
- 如果要将結構轉換為 Avro 二進制記錄,然後再将其重新推送到的 Kafka 或将其寫入檔案,使用 to_avro() 。
- 如果要将多個列重新編碼為單個列,使用 to_avro() 。
上述函數僅支援 Scala 和 Java 中使用。
import org.apache.spark.sql.avro._
// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
val output = df
.select(from_avro('value, jsonFormatSchema) as 'user)
.where("user.favorite_color == \"red\"")
.select(to_avro($"user.name") as 'value)
val query = output
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start()
檢視更多示例,點選
Read and Write Streaming Avro Data with DataFrames。
與 Databricks spark-avro 的相容性
内置的 spark-avro 子產品與 Databricks 的開源庫
spark-avro相容。
使用内置 Avro 子產品可以加載/寫入先前使用 com.databricks.spark.avro 建立的資料源表,而無需任何代碼更改。實際上,如果更喜歡使用自己建構的 spark-avro jar 檔案,則隻需禁用配置 spark.sql.legacy.replaceDatabricksSparkAvro.enabled ,并在部署應用程式時使用選項 --jars。有關詳細資訊,請閱讀
應用程式送出指南中的
進階依賴管理部分。
性能改進
通過
SPARK-24800的 IO 優化,内置的 Avro 資料源在讀取和寫入 Avro 檔案都實作了性能提升。我們進行了一些基準測試,觀察到讀取性能提高了 2 倍,寫入性能提高了 8% 。
配置和方法
我們在
Databricks 社群版上的單個節點 Apache Spark 叢集上運作了基準測試。有關基準測試的詳細實施,請檢視
Avro 基準測試手冊![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZuBnL0UDOzUTZmJjNmNzNklDM4IzNmhTY2QGM4QmZlJWO0gTNjVzNjVmMh9CXt92Yu4GZjlGbh5SZslmZxl3Lc9CX6MHc0RHaiojIsJye.png)
如圖表所示,讀取性能提升接近 2 倍,寫入性能也提高了 8%。
配置細節:
- 資料:包含各種資料類型的 1 百萬行資料的DataFrame:Int / Double / String / Map / Array / Struct等。
- 叢集:6.0 GB 記憶體,0.88 核心,1 DBU。
- Databricks運作時 版本:5.0(新的内置 spark-avro )和 4.0(外部 Databricks spark-avro 庫)。
結論
新的内置 spark-avro 子產品在
Spark SQL和
Structured Streaming中提供了更好的使用者體驗和 IO 性能。由于Spark本身對 Avro 的内置支援,最初的 spark-avro 将被棄用。
您可以在
Databricks Runtime 5.0上嘗試
版本。 要了解有關如何在雲中使用 Apache Avro 進行 Structured Streaming 處理的更多資訊,請閱讀 Azure Databricks 或 AWS 上的文檔。