天天看點

SparkSQL2.x 中的DataFrame和Dataset建立與使用

 SparkSQL 的概念 

Spark SQL 是一個用來處理結構化資料的 spark 元件,也可被視為一個分布式的 SQL 查詢引擎。與基礎

的 Spark RDD API 不同, Spark SQL 提供了查詢結構化資料及計算結果等資訊的接口。在内部, Spark 

SQL 使用這個額外的資訊去執行額外的優化.有幾種方式可以跟 Spark SQL 進行互動, 包括 SQL 和 

Dataset API(DSL)。 

 什麼是 DataFrames(1.3)

dataframe 是從 1.3 版本引入的 

與 RDD 相似,DataFrame 也是一個不可變分布式的資料集合。但與 RDD 不同的是,資料都被組織到有名

字的列中,就像關系型資料庫中的表一樣,除了資料之外,還記錄着資料的結構資訊,即 schema。設計

DataFrame的目的就是要讓對大型資料集的處理變得更簡單,它讓開發者可以為分布式的資料集指定一個

模式,進行更高層次的抽象。還提供了特定領域内專用的API來處理分布式資料。 

從API易用性的角度上 看,DataFrame API提供的是一套高層的關系操作,比函數式的RDD API要更加

友好,門檻更低。由于與R和Pandas的DataFrame類似,Spark DataFrame很好地繼承了傳統單機數

據分析的開發體驗。 

dataframe = rdd + schema 

dataframe就是帶着schema資訊的rdd  

SparkSQL2.x 中的DataFrame和Dataset建立與使用

 建立 DataFrames 

1.在本地建立一個檔案,有三列,分别是 id、name、age,用空格分隔,然後上傳到 hdfs 上 hdfs dfs -put person.txt / 

2.在 spark shell 執行下面指令,讀取資料,将每一行的資料使用列分隔符分割 

val lineRDD = sc.textFile("hdfs://hdp-01:9000/person.txt").map(_.split(" ")) 

3.定義 case class(相當于表的 schema) case class Person(id:Int, name:String, age:Int) 

4.将 RDD 和 case class 關聯 val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt)) 

5.将 RDD 轉換成 DataFrame val personDF = personRDD.toDF 

6.對 DataFrame 進行處理 personDF.show 

使用簡單案例wordcount 為例吧

1. 使用的spark 1.x 的寫法 使用的RDD轉換 DataFrame

DataFrame

1.使用的SQL 文法

package org.yonggan

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * spark 1.x  版本中 DataFrame建立
  */
object WordCountDemo {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("app").setMaster("local")

    val sc = new SparkContext(conf)
    val sqlc = new SQLContext(sc)

    val wcRdd = sc.textFile("wc.txt")

    //單詞切分
    val wordAndOne: RDD[Word] = wcRdd.flatMap(_.split(" ")).map(Word(_))

    import sqlc.implicits._
    // 建立 word DataFrame
    val wordDf: DataFrame = wordAndOne.toDF()

    //注冊 中間 表
    wordDf.registerTempTable("t_word")
//    , count(1)
    val resFrame = sqlc.sql("select word,count(1) from t_word group by word")

    resFrame.show()

    sc.stop()
  }

}

case class Word(word:String)
           

2. 使用的DSL文法

package org.yonggan

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql._

object WordCountDemo02 {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("app").setMaster("local")

    val sc = new SparkContext(conf)
    val sqlc = new SQLContext(sc)

    val wcRdd = sc.textFile("wc.txt")

    //單詞切分
    val rowRdd: RDD[Row] = wcRdd.flatMap(_.split(" ")).map(Row(_))

    // schema
    val schema = StructType(List(StructField("name", StringType, true)))

    //建立DataFrame
    val wdf: DataFrame = sqlc.createDataFrame(rowRdd, schema)

    // 建立查詢條件
    val resDf = wdf.select("name").groupBy("name").count()
    resDf.show()

//    val wcDf: RelationalGroupedDataset = wdf.select("name", "count(1)").groupBy("name")

  }

}
           

使用的Dataset (spark2.x引入)

建立初始化資訊  使用的新的api 接口SparkSession

val session = SparkSession.builder().getOrCreate()
           

1. 使用的SQL

package org.yonggan

import org.apache.spark.sql.{Dataset, SparkSession}

object DatasetWordCount01 {

  def main(args: Array[String]): Unit = {

    /**
      * spark 2.x 版本過後使用 新版本api
      */
    val session = SparkSession.builder()
      .appName("app")
      .master("local")
      .getOrCreate()

    val sqlContext = session.sqlContext
    val wcDs = sqlContext.read.textFile("wc.txt")

    // 導入隐式轉換
    import session.implicits._
    val wordData: Dataset[String] = wcDs.flatMap(_.split(" "))
    wordData.createTempView("t_word")

    //    wordData.printSchema()
    // 查詢結果
    val resDf =  session.sql("select value, count(1) from t_word group by value")
    resDf.show()

  }

}
           

2. 使用的DSL

package org.yonggan

import org.apache.spark.sql.{Dataset, Row, SparkSession}

object DatasetWordCount02 {

  def main(args: Array[String]): Unit = {

    /**
      * spark 2.x 版本過後使用 新版本api
      */
    val session = SparkSession.builder()
      .appName("app")
      .master("local")
      .getOrCreate()

    val sqlContext = session.sqlContext
    val wcDs = sqlContext.read.textFile("wc.txt")

    // 導入隐式轉換
    import session.implicits._
    val wordData: Dataset[String] = wcDs.flatMap(_.split(" "))
    wordData.createTempView("t_word")

    // 查詢結果

    import org.apache.spark.sql.functions._
    val wDs = wordData.groupBy("value").agg(count("value") as "cts")

    //排序
    val resDataset: Dataset[Row] = wDs.orderBy($"cts" desc)
    resDataset.show()

  }

}
           

繼續閱讀