1. 介紹
spark生态系統中,Spark Core,包括各種Spark的各種核心元件,它們能夠對記憶體和硬碟進行操作,或者調用CPU進行計算。
spark core定義了RDD、DataFrame和DataSet
spark最初隻有RDD,DataFrame在Spark 1.3中被首次釋出,DataSet在Spark1.6版本中被加入。
2. RDD
RDD:Spark的核心概念是RDD (resilientdistributed dataset),指的是一個隻讀的,可分區的分布式資料集,這個資料集的全部或部分可以緩存在記憶體中,在多次計算間重用。
- 優點:
-
編譯時類型安全
編譯時就能檢查出類型錯誤
面向對象的程式設計風格
直接通過類名點的方式來操作資料
- 缺點:
-
序列化和反序列化的性能開銷
無論是叢集間的通信, 還是IO操作都需要對對象的結構和資料進行序列化和反序列化.
GC的性能開銷
頻繁的建立和銷毀對象, 勢必會增加GC
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
object Run {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("test").setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val sqlContext = new SQLContext(sc)
/**
* id age
* 1 30
* 2 29
* 3 21
*/
case class Person(id: Int, age: Int)
val idAgeRDDPerson = sc.parallelize(Array(Person(1, 30), Person(2, 29), Person(3, 21)))
// 優點1
// idAge.filter(_.age > "") // 編譯時報錯, int不能跟String比
// 優點2
idAgeRDDPerson.filter(_.age > 25) // 直接操作一個個的person對象
}
}
3. DataFrame
在Spark中,DataFrame是一種以RDD為基礎的分布式資料集,類似于傳統資料庫中的二維表格。DataFrame與RDD的主要差別在于,前者帶有schema元資訊,即DataFrame所表示的二維表資料集的每一列都帶有名稱和類型。這使得Spark SQL得以洞察更多的結構資訊,進而對藏于DataFrame背後的資料源以及作用于DataFrame之上的變換進行了針對性的優化,最終達到大幅提升運作時效率的目标。反觀RDD,由于無從得知所存資料元素的具體内部結構,Spark Core隻能在stage層面進行簡單、通用的流水線優化。
DataFrame引入了schema和off-heap
schema : RDD每一行的資料, 結構都是一樣的.
這個結構就存儲在schema中。 Spark通過schame就能夠讀懂資料, 是以在通信和IO時就隻需要序列化和反序列化資料,而結構的部分就可以省略了。 off-heap : 意味着JVM堆以外的記憶體,這些記憶體直接受作業系統管理(而不是JVM)。Spark能夠以二進制的形式序列化資料(不包括結構)到off-heap中,當要操作資料時,就直接操作off-heap記憶體。由于Spark了解schema,是以知道該如何操作。
off-heap就像地盤,schema就像地圖, Spark有地圖又有自己地盤了, 就可以自己說了算了, 不再受JVM的限制,也就不再收GC的困擾了。通過schema和off-heap,DataFrame解決了RDD的缺點,但是卻丢了RDD的優點。 DataFrame不是類型安全的, API也不是面向對象風格的。
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
object Run {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("test").setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val sqlContext = new SQLContext(sc)
/**
* id age
* 1 30
* 2 29
* 3 21
*/
val idAgeRDDRow = sc.parallelize(Array(Row(1, 30), Row(2, 29), Row(4, 21)))
val schema = StructType(Array(StructField("id", DataTypes.IntegerType), StructField("age", DataTypes.IntegerType)))
val idAgeDF = sqlContext.createDataFrame(idAgeRDDRow, schema)
// API不是面向對象的
idAgeDF.filter(idAgeDF.col("age") > 25)
// 不會報錯, DataFrame不是編譯時類型安全的
idAgeDF.filter(idAgeDF.col("age") > "")
}
}
4. DataSet
Dataset是一個強類型的特定領域的對象,這種對象可以函數式或者關系操作并行地轉換。每個Dataset也有一個被稱為一個DataFrame的類型化視圖,這種DataFrame是Row類型的Dataset,即Dataset[Row]
Dataset是“懶惰”的,隻在執行行動操作時觸發計算。本質上,資料集表示一個邏輯計劃,該計劃描述了産生資料所需的計算。當執行行動操作時,Spark的查詢優化程式優化邏輯計劃,并生成一個高效的并行和分布式實體計劃。
DataSet結合了RDD和DataFrame的優點,,并帶來的一個新的概念Encoder 當序列化資料時,Encoder産生位元組碼與off-heap進行互動,能夠達到按需通路資料的效果, 而不用反序列化整個對象。 Spark還沒有提供自定義Encoder的API,但是未來會加入。
下面看DataFrame和DataSet在2.0.0-preview中的實作
下面這段代碼, 在1.6.x中建立的是DataFrame
// 上文DataFrame示例中提取出來的
val idAgeRDDRow = sc.parallelize(Array(Row(1, 30), Row(2, 29), Row(4, 21)))
val schema = StructType(Array(StructField("id", DataTypes.IntegerType), StructField("age", DataTypes.IntegerType)))
val idAgeDF = sqlContext.createDataFrame(idAgeRDDRow, schema)
但是同樣的代碼在2.0.0-preview中, 建立的雖然還叫DataFrame
// sqlContext.createDataFrame(idAgeRDDRow, schema) 方法的實作, 傳回值依然是DataFrame
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = {
sparkSession.createDataFrame(rowRDD, schema)
}
但是其實卻是DataSet, 因為DataFrame被聲明為Dataset[Row]
package object sql {
// ...省略了不相關的代碼
type DataFrame = Dataset[Row]
}
是以當我們從1.6.x遷移到2.0.0的時候, 無需任何修改就直接用上了DataSet.
下面是一段DataSet的示例代碼
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
object Test {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("test").setMaster("local") // 調試的時候一定不要用local[*]
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val idAgeRDDRow = sc.parallelize(Array(Row(1, 30), Row(2, 29), Row(4, 21)))
val schema = StructType(Array(StructField("id", DataTypes.IntegerType), StructField("age", DataTypes.IntegerType)))
// 在2.0.0-preview中這行代碼建立出的DataFrame, 其實是DataSet[Row]
val idAgeDS = sqlContext.createDataFrame(idAgeRDDRow, schema)
// 在2.0.0-preview中, 還不支援自定的Encoder, Row類型不行, 自定義的bean也不行
// 官方文檔也有寫通過bean建立Dataset的例子,但是我運作時并不能成功
// 是以目前需要用建立DataFrame的方法, 來建立DataSet[Row]
// sqlContext.createDataset(idAgeRDDRow)
// 目前支援String, Integer, Long等類型直接建立Dataset
Seq(1, 2, 3).toDS().show()
sqlContext.createDataset(sc.parallelize(Array(1, 2, 3))).show()
}
}
5. RDD和DataFrame比較
DataFrame與RDD相同之處,都是不可變分布式彈性資料集。不同之處在于,DataFrame的資料集都是按指定列存儲,即結構化資料。類似于傳統資料庫中的表。
DataFrame的設計是為了讓大資料處理起來更容易。DataFrame允許開發者把結構化資料集導入DataFrame,并做了higher-level的抽象; DataFrame提供特定領域的語言(DSL)API來操作你的資料集。
上圖直覺地展現了DataFrame和RDD的差別。左側的RDD[Person]雖然以Person為類型參數,但Spark架構本身不了解Person類的内部結構。而右側的DataFrame卻提供了詳細的結構資訊,使得Spark SQL可以清楚地知道該資料集中包含哪些列,每列的名稱和類型各是什麼。DataFrame多了資料的結構資訊,即schema。RDD是分布式的Java對象的集合。DataFrame是分布式的Row對象的集合。DataFrame除了提供了比RDD更豐富的算子以外,更重要的特點是提升執行效率、減少資料讀取以及執行計劃的優化,比如filter下推、裁剪等。
6. RDD和DataSet比較
DataSet以Catalyst邏輯執行計劃表示,并且資料以編碼的二進制形式被存儲,不需要反序列化就可以執行sorting、shuffle等操作。
DataSet創立需要一個顯式的Encoder,把對象序列化為二進制,可以把對象的scheme映射為Spark SQl類型,然而RDD依賴于運作時反射機制。
通過上面兩點,DataSet的性能比RDD的要好很多
7. DataFrame和DataSet比較
- Dataset可以認為是DataFrame的一個特例,主要差別是Dataset每一個record存儲的是一個強類型值而不是一個Row。是以具有如下三個特點:
-
1.DataSet可以在編譯時檢查類型
2.是面向對象的程式設計接口。用wordcount舉例:
3.後面版本DataFrame會繼承DataSet,DataFrame是面向Spark SQL的接口。
DataFrame和DataSet可以互相轉化,df.as[ElementType]這樣可以把DataFrame轉化為DataSet,ds.toDF()這樣可以把DataSet轉化為DataFrame。
//DataFrame
// Load a text file and interpret each line as a java.lang.String
val ds = sqlContext.read.text("/home/spark/1.6/lines").as[String]
val result = ds
.flatMap(_.split(" ")) // Split on whitespace
.filter(_ != "") // Filter empty words
.toDF() // Convert to DataFrame to perform aggregation / sorting
.groupBy($"value") // Count number of occurences of each word
.agg(count("*") as "numOccurances")
.orderBy($"numOccurances" desc) // Show most common words first
//DataSet,完全使用scala程式設計,不要切換到DataFrame
val wordCount =
ds.flatMap(_.split(" "))
.filter(_ != "")
.groupBy(_.toLowerCase()) // Instead of grouping on a column expression (i.e. $"value") we pass a lambda function
.count()
8. 應用場景
- 什麼時候用RDD?使用RDD的一般場景:
-
你需要使用low-level的transformation和action來控制你的資料集;
你得資料集非結構化,比如,流媒體或者文本流;
你想使用函數式程式設計來操作你得資料,而不是用特定領域語言(DSL)表達;
你不在乎schema,比如,當通過名字或者列處理(或通路)資料屬性不在意列式存儲格式;
你放棄使用DataFrame和Dataset來優化結構化和半結構化資料集
RDD在Apache Spark 2.0中慘遭抛棄?
答案當然是 NO !
通過後面的描述你會得知:Spark使用者可以在RDD,DataFrame和Dataset三種資料集之間無縫轉換,而是隻需使用超級簡單的API方法。
- 什麼時候使用DataFrame或者Dataset?
-
你想使用豐富的語義,high-level抽象,和特定領域語言API,那你可DataFrame或者Dataset;
你處理的半結構化資料集需要high-level表達, filter,map,aggregation,average,sum ,SQL 查詢,列式通路和使用lambda函數,那你可DataFrame或者Dataset;
你想利用編譯時高度的type-safety,Catalyst優化和Tungsten的code生成,那你可DataFrame或者Dataset;
你想統一和簡化API使用跨Spark的Library,那你可DataFrame或者Dataset;
如果你是一個R使用者,那你可DataFrame或者Dataset;
如果你是一個Python使用者,那你可DataFrame或者Dataset;
你可以無縫的把DataFrame或者Dataset轉化成一個RDD,隻需簡單的調用 .rdd:
// select specific fields from the Dataset, apply a predicate
// using the where() method, convert to an RDD, and show first 10
// RDD rows
val deviceEventsDS = ds.select($"device_name", $"cca3", $"c02_level").where($"c02_level" > 1300)
// convert to RDDs and take the first 10 rows
val eventsRDD = deviceEventsDS.rdd.take(10)