天天看點

【譯】Apache Spark 2.4 内置資料源 Apache Avro

原文連結: Apache Avro as a Built-in Data Source in Apache Spark 2.4
Apache Avro

是一種流行的資料序列化格式。它廣泛使用于 Apache Spark 和 Apache Hadoop 生态中,尤其适用于基于 Kafka 的資料流場景。從

Apache Spark 2.4

版本開始,Spark 原生支援了 Avro 資料的讀寫。新的内置 spark-avro 子產品最初來自 Databricks 開源項目

Avro Data Source for Apache Spark

(後文簡稱為 spark-avro )。 此外, 它還提供了:

  • 新函數 from_avro() 和 to_avro() 用于在 DataFrame 中讀寫 Avro 資料,而不僅僅是檔案。
  • Avro 邏輯類型支援, 包括 Decimal、Timestamp 和日期類型。
  • 2 倍的讀吞吐量提升和 10% 的寫吞吐量提升。

這篇部落格中, 我們會通過示例逐條的講解上述的每個功能,通過例子你會發現其 API 的易用性,高性能等優點。

Load 和 Save 函數

在 Apache Spark 2.4 中,隻需要在 DataFrameReander 和 DataFrameWriter 中将檔案格式指定為 “avro” 就能夠加載和儲存 Avro 格式資料。出于一緻性考慮,用法和其他内置資料源類似。

val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")           

from_avro() 和 to_avro() 函數

為了進一步簡化資料轉換流程,我們引入了 2 個新的内置函數: from_avro() 和 to_avro()。Avro 常用于序列化/反序列化基于 Apache Kafka 的資料流中的消息/資料。在讀取或寫入 Kafka 時,把 Avro 記錄當做列是非常有用的。每個 Kafka 鍵值對記錄都會相應的新增一些中繼資料, 例如攝取時間戳、偏移量等。

上述函數非常有用的三個場景:

  • 當 Spark 從 Kafka 讀取 Avro 二進制資料時,from_avro() 可以提取資料,清理資料并對其進行轉換。
  • 如果要将結構轉換為 Avro 二進制記錄,然後再将其重新推送到的 Kafka 或将其寫入檔案,使用 to_avro() 。
  • 如果要将多個列重新編碼為單個列,使用 to_avro() 。

上述函數僅支援 Scala 和 Java 中使用。

import org.apache.spark.sql.avro._

// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
val output = df
  .select(from_avro('value, jsonFormatSchema) as 'user)
  .where("user.favorite_color == \"red\"")
  .select(to_avro($"user.name") as 'value)

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()           

檢視更多示例,點選

Read and Write Streaming Avro Data with DataFrames

與 Databricks spark-avro 的相容性

内置的 spark-avro 子產品與 Databricks 的開源庫

spark-avro

相容。

使用内置 Avro 子產品可以加載/寫入先前使用 com.databricks.spark.avro 建立的資料源表,而無需任何代碼更改。實際上,如果更喜歡使用自己建構的 spark-avro jar 檔案,則隻需禁用配置 spark.sql.legacy.replaceDatabricksSparkAvro.enabled ,并在部署應用程式時使用選項 --jars。有關詳細資訊,請閱讀

應用程式送出指南

中的

進階依賴管理

部分。

性能改進

通過

SPARK-24800

的 IO 優化,内置的 Avro 資料源在讀取和寫入 Avro 檔案都實作了性能提升。我們進行了一些基準測試,觀察到讀取性能提高了 2 倍,寫入性能提高了 8% 。

配置和方法

我們在

Databricks 社群版

上的單個節點 Apache Spark 叢集上運作了基準測試。有關基準測試的詳細實施,請檢視

Avro 基準測試手冊
【譯】Apache Spark 2.4 内置資料源 Apache Avro

如圖表所示,讀取性能提升接近 2 倍,寫入性能也提高了 8%。

配置細節:

  • 資料:包含各種資料類型的 1 百萬行資料的DataFrame:Int / Double / String / Map / Array / Struct等。
  • 叢集:6.0 GB 記憶體,0.88 核心,1 DBU。
  • Databricks運作時 版本:5.0(新的内置 spark-avro )和 4.0(外部 Databricks spark-avro 庫)。

結論

新的内置 spark-avro 子產品在

Spark SQL

Structured Streaming

中提供了更好的使用者體驗和 IO 性能。由于Spark本身對 Avro 的内置支援,最初的 spark-avro 将被棄用。

您可以在

Databricks Runtime 5.0

上嘗試

版本。 要了解有關如何在雲中使用 Apache Avro 進行 Structured Streaming 處理的更多資訊,請閱讀 Azure Databricks 或 AWS 上的文檔。

歡迎spark感興趣的同學入群技術交流!

【譯】Apache Spark 2.4 内置資料源 Apache Avro

繼續閱讀