SparkSQL 的概念
Spark SQL 是一個用來處理結構化資料的 spark 元件,也可被視為一個分布式的 SQL 查詢引擎。與基礎
的 Spark RDD API 不同, Spark SQL 提供了查詢結構化資料及計算結果等資訊的接口。在内部, Spark
SQL 使用這個額外的資訊去執行額外的優化.有幾種方式可以跟 Spark SQL 進行互動, 包括 SQL 和
Dataset API(DSL)。
什麼是 DataFrames(1.3)
dataframe 是從 1.3 版本引入的
與 RDD 相似,DataFrame 也是一個不可變分布式的資料集合。但與 RDD 不同的是,資料都被組織到有名
字的列中,就像關系型資料庫中的表一樣,除了資料之外,還記錄着資料的結構資訊,即 schema。設計
DataFrame的目的就是要讓對大型資料集的處理變得更簡單,它讓開發者可以為分布式的資料集指定一個
模式,進行更高層次的抽象。還提供了特定領域内專用的API來處理分布式資料。
從API易用性的角度上 看,DataFrame API提供的是一套高層的關系操作,比函數式的RDD API要更加
友好,門檻更低。由于與R和Pandas的DataFrame類似,Spark DataFrame很好地繼承了傳統單機數
據分析的開發體驗。
dataframe = rdd + schema
dataframe就是帶着schema資訊的rdd
建立 DataFrames
1.在本地建立一個檔案,有三列,分别是 id、name、age,用空格分隔,然後上傳到 hdfs 上 hdfs dfs -put person.txt /
2.在 spark shell 執行下面指令,讀取資料,将每一行的資料使用列分隔符分割
val lineRDD = sc.textFile("hdfs://hdp-01:9000/person.txt").map(_.split(" "))
3.定義 case class(相當于表的 schema) case class Person(id:Int, name:String, age:Int)
4.将 RDD 和 case class 關聯 val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
5.将 RDD 轉換成 DataFrame val personDF = personRDD.toDF
6.對 DataFrame 進行處理 personDF.show
使用簡單案例wordcount 為例吧
1. 使用的spark 1.x 的寫法 使用的RDD轉換 DataFrame
DataFrame
1.使用的SQL 文法
package org.yonggan
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
* spark 1.x 版本中 DataFrame建立
*/
object WordCountDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("app").setMaster("local")
val sc = new SparkContext(conf)
val sqlc = new SQLContext(sc)
val wcRdd = sc.textFile("wc.txt")
//單詞切分
val wordAndOne: RDD[Word] = wcRdd.flatMap(_.split(" ")).map(Word(_))
import sqlc.implicits._
// 建立 word DataFrame
val wordDf: DataFrame = wordAndOne.toDF()
//注冊 中間 表
wordDf.registerTempTable("t_word")
// , count(1)
val resFrame = sqlc.sql("select word,count(1) from t_word group by word")
resFrame.show()
sc.stop()
}
}
case class Word(word:String)
2. 使用的DSL文法
package org.yonggan
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql._
object WordCountDemo02 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("app").setMaster("local")
val sc = new SparkContext(conf)
val sqlc = new SQLContext(sc)
val wcRdd = sc.textFile("wc.txt")
//單詞切分
val rowRdd: RDD[Row] = wcRdd.flatMap(_.split(" ")).map(Row(_))
// schema
val schema = StructType(List(StructField("name", StringType, true)))
//建立DataFrame
val wdf: DataFrame = sqlc.createDataFrame(rowRdd, schema)
// 建立查詢條件
val resDf = wdf.select("name").groupBy("name").count()
resDf.show()
// val wcDf: RelationalGroupedDataset = wdf.select("name", "count(1)").groupBy("name")
}
}
使用的Dataset (spark2.x引入)
建立初始化資訊 使用的新的api 接口SparkSession
val session = SparkSession.builder().getOrCreate()
1. 使用的SQL
package org.yonggan
import org.apache.spark.sql.{Dataset, SparkSession}
object DatasetWordCount01 {
def main(args: Array[String]): Unit = {
/**
* spark 2.x 版本過後使用 新版本api
*/
val session = SparkSession.builder()
.appName("app")
.master("local")
.getOrCreate()
val sqlContext = session.sqlContext
val wcDs = sqlContext.read.textFile("wc.txt")
// 導入隐式轉換
import session.implicits._
val wordData: Dataset[String] = wcDs.flatMap(_.split(" "))
wordData.createTempView("t_word")
// wordData.printSchema()
// 查詢結果
val resDf = session.sql("select value, count(1) from t_word group by value")
resDf.show()
}
}
2. 使用的DSL
package org.yonggan
import org.apache.spark.sql.{Dataset, Row, SparkSession}
object DatasetWordCount02 {
def main(args: Array[String]): Unit = {
/**
* spark 2.x 版本過後使用 新版本api
*/
val session = SparkSession.builder()
.appName("app")
.master("local")
.getOrCreate()
val sqlContext = session.sqlContext
val wcDs = sqlContext.read.textFile("wc.txt")
// 導入隐式轉換
import session.implicits._
val wordData: Dataset[String] = wcDs.flatMap(_.split(" "))
wordData.createTempView("t_word")
// 查詢結果
import org.apache.spark.sql.functions._
val wDs = wordData.groupBy("value").agg(count("value") as "cts")
//排序
val resDataset: Dataset[Row] = wDs.orderBy($"cts" desc)
resDataset.show()
}
}