spark-sql的操作對象由dataframe變為了dataset
spark-sql是spark的一個元件,可以在spark程式中進行SQL查詢。是一個分布式的sql查詢引擎。
spark-sql在1.3的版本中開始使用dataframe對象,來對資料進行操作。
dataframe=rdd+schema文法風格分為兩種:SQL文法風格:dataframe對象注冊虛拟表後查詢,DSL文法風格dataframe對象直接使用方法。
1.spark-sql 的1.x版本的wc
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
*
* 使用spark-SQL的1.x版本實作wordcount
*/
object WCpre {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local")
.setAppName(WCpre.getClass.getName)
//設定rdd 持久化到記憶體和磁盤的序列化方式|||worker之間類的傳遞的序列化方式
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//可以放在任何位置,值的比較可以使用 = = = 3個等号篩選
dataDF.select("*").groupBy("word").count().withColumnRenamed("count","cns").where($"cns" > 1).orderBy($"cns" desc).show()
}
}
//樣例類用來提供schema資訊。
case class WC(word:String)
1.建立SparkContext
2.建立SQLContext
3.建立RDD
4.建立一個樣例類,并定義類的成員變量
5.整理資料并關聯class
6.将RDD轉換成DataFrame(導入隐式轉換)
7.将DataFrame注冊成臨時表
8.書寫SQL(Transformation)
9.執行Action
2.spark-sql 2.x 版本的wc
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SQLContext, SparkSession}
/**
*
* spark2.x 統一了程式入口api
*/
object WCNow {
def main(args: Array[String]): Unit = {
val session: SparkSession = SparkSession.builder()
.master("local")
.appName(WCNow.getClass.getName)
//如果該對象存在直接調用,不存在就建立
.getOrCreate()
import session.implicits._
//可以得到之前的版本定義的sc,sqlContext對象,特點...
val sqlContext: SQLContext = session.sqlContext
val sc: SparkContext = session.sparkContext
//讀取檔案,dataset預設存在schema資訊為value
val file: Dataset[String] = session.read.textFile("wc.txt")
val words: Dataset[String] = file.flatMap(_.split(" "))
//設定自定義schema
val wDF: DataFrame = words.toDF("word")
wDF.createTempView("t_wc")
session.sql("select word,count(*) cns from t_wc group by word order by cns desc").show()
wDF.select("word","cns").groupBy("word").count().withColumnRenamed("count","cns").where("cns > 1").orderBy($"cns" desc).show()
}
}
3.DataSet結合了RDD和DataFrame的優點(類型安全、面向對象、不受JVM限制、沒有GC開銷)