Apache Avro 是一種流行的資料序列化格式。它廣泛用于 Apache Spark 和 Apache Hadoop 生态系統,尤其适用于基于 Kafka 的資料管道。從 Apache Spark 2.4 版本開始(參見 Apache Spark 2.4 正式釋出,重要功能詳細介紹),Spark 為讀取和寫入 Avro 資料提供内置支援。新的内置 spark-avro 子產品最初來自 Databricks 的開源項目Avro Data Source for Apache Spark。除此之外,它還提供以下功能:
- 新函數 from_avro() 和 to_avro() 用于在 DataFrame 中讀取和寫入 Avro 資料,而不僅僅是檔案。
- 支援 Avro 邏輯類型(logical types),包括 Decimal,Timestamp 和 Date類型。
- 2倍讀取吞吐量提高和10%寫入吞吐量提升。
本文将通過示例介紹上面的每個功能。
加載和儲存函數
在 Apache Spark 2.4 中,為了讀寫 Avro 格式的資料,你隻需在 DataFrameReader 和 DataFrameWriter 中将檔案格式指定為“avro”即可。其用法和其他資料源用法很類似。如下所示:
val iteblogDF = spark.read.format("avro").load("examples/src/main/resources/iteblog.avro")
iteblogDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
from_avro() 和 to_avro() 的使用
為了進一步簡化資料轉換流程(transformation pipeline),社群引入了兩個新的内置函數:from_avro() 和 to_avro()。Avro 通常用于序列化/反序列化基于 Apache Kafka 的資料管道中的消息或資料,在讀取或寫入 Kafka 時,将 Avro records 作為列将非常有用。每個 Kafka 鍵值記錄都會增加一些中繼資料,例如 Kafka 的攝取時間戳,Kafka 的偏移量等。
在以下三種場景,from_avro() 和 to_avro() 函數将非常有用:
- 當使用 Spark 從 Kafka 中讀取 Avro 格式的資料,可以使用 from_avro() 函數來抽取你要的資料,清理資料并對其進行轉換。
- 當你想要将 structs 格式的資料轉換為 Avro 二進制記錄,然後将它們發送到 Kafka 或寫入到檔案,你可以使用 to_avro()。
- 如果你需要将多個列重新編碼為單個列,請使用to_avro().
目前這兩個函數僅在 Scala 和 Java 語言中可用。from_avro 和 to_avro 函數的使用除了需要人為指定 Avro schema,其他的和使用 from_json 和 to_json 函數一樣,下面是這兩個函數的使用示例。
在代碼裡面指定 Avro 模式
import org.apache.spark.sql.avro._
import org.apache.avro.SchemaBuilder
val servers = "www.iteblog.com:9092"
// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
from_avro($"value", SchemaBuilder.builder().intType()).as("value"))
// Convert structured data to binary from string (key column) and
// int (value column) and save them to a Kafka topic.
iteblogDF
.select(
to_avro($"key").as("key"),
to_avro($"value").as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
通過 Schema Registry 服務提供 Avro 模式
如果我們有 Schema Registry 服務,那麼我們就不需要在代碼裡面指定 Avro 模式了,如下:
import org.apache.spark.sql.avro._
// Read a Kafka topic "t", assuming the key and value are already
// registered in Schema Registry as subjects "t-key" and "t-value" of type
// string and int. The binary key and value columns are turned into string
// and int type with Avro and Schema Registry. The schema of the resulting DataFrame
// is: <key: string, value: int>.
val schemaRegistryAddr = "https://www.iteblog.com"
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr).as("value"))
// Given that key and value columns are registered in Schema Registry, convert
// structured data of key and value columns to Avro binary data by reading the schema
// info from the Schema Registry. The converted data is saved to Kafka as a Kafka topic "t".
iteblogDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
通過檔案設定 Avro 模式
我們還可以将 Avro 模式寫入到檔案裡面,然後在代碼裡面讀取模式檔案:
import org.apache.spark.sql.avro._
// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "iteblog1")
.load()
// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
val output = df
.select(from_avro('value, jsonFormatSchema) as 'user)
.where("user.favorite_color == \"red\"")
.select(to_avro($"user.name") as 'value)
val query = output
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "iteblog2")
.start()
與 Databricks spark-avro的相容性
因為 Spark 内置對讀寫 Avro 資料的支援是從
Spark 2.4才引入的,是以在這些版本之前,可能有使用者已經使用了 Databricks 開源的
spark-avro。但是不用急,内置的 spark-avro 子產品和這個是完全相容的。我們僅僅需要将之前引入的 com.databricks.spark.avro 修改成 org.apache.spark.sql.avro._ 即可。
性能測試
基于
SPARK-24800的優化,内置 Avro 資料源讀寫 Avro 檔案的性能得到很大提升。社群在這方面進行了相關的基準測試,結果表明,在1百萬行的資料(包含 Int/Double/String/Map/Array/Struct 等各種資料格式)測試中,讀取的性能提升了2倍,寫的性能提升了8%。基準測試的代碼可參見
這裡,測試比較如下:

結論
内置的 spark-avro 子產品為 Spark SQL 和 Structured Streaming 提供了更好的使用者體驗以及 IO 性能。
本文轉載自:
https://blog.csdn.net/bingdianone/article/details/84965861#_Databricks_sparkavro_124英文原文位址:
https://databricks.com/blog/2018/11/30/apache-avro-as-a-built-in-data-source-in-apache-spark-2-4.html