天天看點

Spark Core元件:RDD、DataFrame和DataSet

1. 介紹

spark生态系統中,Spark Core,包括各種Spark的各種核心元件,它們能夠對記憶體和硬碟進行操作,或者調用CPU進行計算。

spark core定義了RDD、DataFrame和DataSet

spark最初隻有RDD,DataFrame在Spark 1.3中被首次釋出,DataSet在Spark1.6版本中被加入。

2. RDD

RDD:Spark的核心概念是RDD (resilientdistributed dataset),指的是一個隻讀的,可分區的分布式資料集,這個資料集的全部或部分可以緩存在記憶體中,在多次計算間重用。
優點:

編譯時類型安全

編譯時就能檢查出類型錯誤

面向對象的程式設計風格

直接通過類名點的方式來操作資料

缺點:

序列化和反序列化的性能開銷

無論是叢集間的通信, 還是IO操作都需要對對象的結構和資料進行序列化和反序列化.

GC的性能開銷

頻繁的建立和銷毀對象, 勢必會增加GC

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object Run {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("test").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val sqlContext = new SQLContext(sc)

    /**
      * id      age
      * 1       30
      * 2       29
      * 3       21
      */
    case class Person(id: Int, age: Int)
    val idAgeRDDPerson = sc.parallelize(Array(Person(1, 30), Person(2, 29), Person(3, 21)))

    // 優點1
    // idAge.filter(_.age > "") // 編譯時報錯, int不能跟String比

    // 優點2
    idAgeRDDPerson.filter(_.age > 25) // 直接操作一個個的person對象
  }
}           

3. DataFrame

在Spark中,DataFrame是一種以RDD為基礎的分布式資料集,類似于傳統資料庫中的二維表格。DataFrame與RDD的主要差別在于,前者帶有schema元資訊,即DataFrame所表示的二維表資料集的每一列都帶有名稱和類型。這使得Spark SQL得以洞察更多的結構資訊,進而對藏于DataFrame背後的資料源以及作用于DataFrame之上的變換進行了針對性的優化,最終達到大幅提升運作時效率的目标。反觀RDD,由于無從得知所存資料元素的具體内部結構,Spark Core隻能在stage層面進行簡單、通用的流水線優化。

DataFrame引入了schema和off-heap

schema : RDD每一行的資料, 結構都是一樣的.

這個結構就存儲在schema中。 Spark通過schame就能夠讀懂資料, 是以在通信和IO時就隻需要序列化和反序列化資料,而結構的部分就可以省略了。 off-heap : 意味着JVM堆以外的記憶體,這些記憶體直接受作業系統管理(而不是JVM)。Spark能夠以二進制的形式序列化資料(不包括結構)到off-heap中,當要操作資料時,就直接操作off-heap記憶體。由于Spark了解schema,是以知道該如何操作。

off-heap就像地盤,schema就像地圖, Spark有地圖又有自己地盤了, 就可以自己說了算了, 不再受JVM的限制,也就不再收GC的困擾了。通過schema和off-heap,DataFrame解決了RDD的缺點,但是卻丢了RDD的優點。 DataFrame不是類型安全的, API也不是面向對象風格的。

import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object Run {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("test").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val sqlContext = new SQLContext(sc)
    /**
      * id      age
      * 1       30
      * 2       29
      * 3       21
      */
    val idAgeRDDRow = sc.parallelize(Array(Row(1, 30), Row(2, 29), Row(4, 21)))

    val schema = StructType(Array(StructField("id", DataTypes.IntegerType), StructField("age", DataTypes.IntegerType)))

    val idAgeDF = sqlContext.createDataFrame(idAgeRDDRow, schema)
    // API不是面向對象的
    idAgeDF.filter(idAgeDF.col("age") > 25) 
    // 不會報錯, DataFrame不是編譯時類型安全的
    idAgeDF.filter(idAgeDF.col("age") > "") 
  }
}           

4. DataSet

Dataset是一個強類型的特定領域的對象,這種對象可以函數式或者關系操作并行地轉換。每個Dataset也有一個被稱為一個DataFrame的類型化視圖,這種DataFrame是Row類型的Dataset,即Dataset[Row]

Dataset是“懶惰”的,隻在執行行動操作時觸發計算。本質上,資料集表示一個邏輯計劃,該計劃描述了産生資料所需的計算。當執行行動操作時,Spark的查詢優化程式優化邏輯計劃,并生成一個高效的并行和分布式實體計劃。

DataSet結合了RDD和DataFrame的優點,,并帶來的一個新的概念Encoder 當序列化資料時,Encoder産生位元組碼與off-heap進行互動,能夠達到按需通路資料的效果, 而不用反序列化整個對象。 Spark還沒有提供自定義Encoder的API,但是未來會加入。

下面看DataFrame和DataSet在2.0.0-preview中的實作

下面這段代碼, 在1.6.x中建立的是DataFrame
// 上文DataFrame示例中提取出來的
val idAgeRDDRow = sc.parallelize(Array(Row(1, 30), Row(2, 29), Row(4, 21)))

val schema = StructType(Array(StructField("id", DataTypes.IntegerType), StructField("age", DataTypes.IntegerType)))

val idAgeDF = sqlContext.createDataFrame(idAgeRDDRow, schema)           
但是同樣的代碼在2.0.0-preview中, 建立的雖然還叫DataFrame

// sqlContext.createDataFrame(idAgeRDDRow, schema) 方法的實作, 傳回值依然是DataFrame
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = {
sparkSession.createDataFrame(rowRDD, schema)
}           
但是其實卻是DataSet, 因為DataFrame被聲明為Dataset[Row]

package object sql {
  // ...省略了不相關的代碼

  type DataFrame = Dataset[Row]
}           
是以當我們從1.6.x遷移到2.0.0的時候, 無需任何修改就直接用上了DataSet.

下面是一段DataSet的示例代碼

import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object Test {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("test").setMaster("local") // 調試的時候一定不要用local[*]
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val idAgeRDDRow = sc.parallelize(Array(Row(1, 30), Row(2, 29), Row(4, 21)))

    val schema = StructType(Array(StructField("id", DataTypes.IntegerType), StructField("age", DataTypes.IntegerType)))

    // 在2.0.0-preview中這行代碼建立出的DataFrame, 其實是DataSet[Row]
    val idAgeDS = sqlContext.createDataFrame(idAgeRDDRow, schema)

    // 在2.0.0-preview中, 還不支援自定的Encoder, Row類型不行, 自定義的bean也不行
    // 官方文檔也有寫通過bean建立Dataset的例子,但是我運作時并不能成功
    // 是以目前需要用建立DataFrame的方法, 來建立DataSet[Row]
    // sqlContext.createDataset(idAgeRDDRow)

    // 目前支援String, Integer, Long等類型直接建立Dataset
    Seq(1, 2, 3).toDS().show()
    sqlContext.createDataset(sc.parallelize(Array(1, 2, 3))).show()
  }
}           

5. RDD和DataFrame比較

DataFrame與RDD相同之處,都是不可變分布式彈性資料集。不同之處在于,DataFrame的資料集都是按指定列存儲,即結構化資料。類似于傳統資料庫中的表。

DataFrame的設計是為了讓大資料處理起來更容易。DataFrame允許開發者把結構化資料集導入DataFrame,并做了higher-level的抽象; DataFrame提供特定領域的語言(DSL)API來操作你的資料集。

上圖直覺地展現了DataFrame和RDD的差別。左側的RDD[Person]雖然以Person為類型參數,但Spark架構本身不了解Person類的内部結構。而右側的DataFrame卻提供了詳細的結構資訊,使得Spark SQL可以清楚地知道該資料集中包含哪些列,每列的名稱和類型各是什麼。DataFrame多了資料的結構資訊,即schema。RDD是分布式的Java對象的集合。DataFrame是分布式的Row對象的集合。DataFrame除了提供了比RDD更豐富的算子以外,更重要的特點是提升執行效率、減少資料讀取以及執行計劃的優化,比如filter下推、裁剪等。

6. RDD和DataSet比較

DataSet以Catalyst邏輯執行計劃表示,并且資料以編碼的二進制形式被存儲,不需要反序列化就可以執行sorting、shuffle等操作。

DataSet創立需要一個顯式的Encoder,把對象序列化為二進制,可以把對象的scheme映射為Spark SQl類型,然而RDD依賴于運作時反射機制。

通過上面兩點,DataSet的性能比RDD的要好很多

7. DataFrame和DataSet比較

Dataset可以認為是DataFrame的一個特例,主要差別是Dataset每一個record存儲的是一個強類型值而不是一個Row。是以具有如下三個特點:

1.DataSet可以在編譯時檢查類型

2.是面向對象的程式設計接口。用wordcount舉例:

3.後面版本DataFrame會繼承DataSet,DataFrame是面向Spark SQL的接口。

DataFrame和DataSet可以互相轉化,df.as[ElementType]這樣可以把DataFrame轉化為DataSet,ds.toDF()這樣可以把DataSet轉化為DataFrame。

//DataFrame

// Load a text file and interpret each line as a java.lang.String
val ds = sqlContext.read.text("/home/spark/1.6/lines").as[String]
val result = ds
  .flatMap(_.split(" "))               // Split on whitespace
  .filter(_ != "")                     // Filter empty words
  .toDF()                              // Convert to DataFrame to perform aggregation / sorting
  .groupBy($"value")                   // Count number of occurences of each word
  .agg(count("*") as "numOccurances")
  .orderBy($"numOccurances" desc)      // Show most common words first

//DataSet,完全使用scala程式設計,不要切換到DataFrame

val wordCount =
  ds.flatMap(_.split(" "))
    .filter(_ != "")
    .groupBy(_.toLowerCase()) // Instead of grouping on a column expression (i.e. $"value") we pass a lambda function
    .count()
           

8. 應用場景

什麼時候用RDD?使用RDD的一般場景:

你需要使用low-level的transformation和action來控制你的資料集;

你得資料集非結構化,比如,流媒體或者文本流;

你想使用函數式程式設計來操作你得資料,而不是用特定領域語言(DSL)表達;

你不在乎schema,比如,當通過名字或者列處理(或通路)資料屬性不在意列式存儲格式;

你放棄使用DataFrame和Dataset來優化結構化和半結構化資料集

RDD在Apache Spark 2.0中慘遭抛棄?

答案當然是 NO !

通過後面的描述你會得知:Spark使用者可以在RDD,DataFrame和Dataset三種資料集之間無縫轉換,而是隻需使用超級簡單的API方法。

什麼時候使用DataFrame或者Dataset?

你想使用豐富的語義,high-level抽象,和特定領域語言API,那你可DataFrame或者Dataset;

你處理的半結構化資料集需要high-level表達, filter,map,aggregation,average,sum ,SQL 查詢,列式通路和使用lambda函數,那你可DataFrame或者Dataset;

你想利用編譯時高度的type-safety,Catalyst優化和Tungsten的code生成,那你可DataFrame或者Dataset;

你想統一和簡化API使用跨Spark的Library,那你可DataFrame或者Dataset;

如果你是一個R使用者,那你可DataFrame或者Dataset;

如果你是一個Python使用者,那你可DataFrame或者Dataset;

你可以無縫的把DataFrame或者Dataset轉化成一個RDD,隻需簡單的調用 .rdd:

// select specific fields from the Dataset, apply a predicate
// using the where() method, convert to an RDD, and show first 10
// RDD rows

val deviceEventsDS = ds.select($"device_name", $"cca3", $"c02_level").where($"c02_level" > 1300)
// convert to RDDs and take the first 10 rows

val eventsRDD = deviceEventsDS.rdd.take(10)           

繼續閱讀