天天看點

spark-sql 1.x版本與2.x版本的wordcount實作

spark-sql的操作對象由dataframe變為了dataset

spark-sql是spark的一個元件,可以在spark程式中進行SQL查詢。是一個分布式的sql查詢引擎。

spark-sql在1.3的版本中開始使用dataframe對象,來對資料進行操作。

dataframe=rdd+schema文法風格分為兩種:SQL文法風格:dataframe對象注冊虛拟表後查詢,DSL文法風格dataframe對象直接使用方法。

1.spark-sql 的1.x版本的wc

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 
  * 使用spark-SQL的1.x版本實作wordcount
  */
object WCpre {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
    .setMaster("local")
    .setAppName(WCpre.getClass.getName)
           
//設定rdd 持久化到記憶體和磁盤的序列化方式|||worker之間類的傳遞的序列化方式
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
           
//可以放在任何位置,值的比較可以使用 = = = 3個等号篩選
           
dataDF.select("*").groupBy("word").count().withColumnRenamed("count","cns").where($"cns" > 1).orderBy($"cns" desc).show()



  }
}
//樣例類用來提供schema資訊。
case class WC(word:String)
           

1.建立SparkContext

2.建立SQLContext

 3.建立RDD

 4.建立一個樣例類,并定義類的成員變量

 5.整理資料并關聯class

 6.将RDD轉換成DataFrame(導入隐式轉換)

 7.将DataFrame注冊成臨時表

 8.書寫SQL(Transformation)

 9.執行Action

2.spark-sql 2.x 版本的wc

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SQLContext, SparkSession}


/**
  * 
  * spark2.x  統一了程式入口api
  */
object WCNow {

  def main(args: Array[String]): Unit = {

    val session: SparkSession = SparkSession.builder()
      .master("local")
      .appName(WCNow.getClass.getName)
      //如果該對象存在直接調用,不存在就建立
      .getOrCreate()

    import session.implicits._

    //可以得到之前的版本定義的sc,sqlContext對象,特點...
    val sqlContext: SQLContext = session.sqlContext
    val sc: SparkContext = session.sparkContext

    //讀取檔案,dataset預設存在schema資訊為value
    val file: Dataset[String] = session.read.textFile("wc.txt")
    val words: Dataset[String] = file.flatMap(_.split(" "))
    //設定自定義schema
    val wDF: DataFrame = words.toDF("word")
    wDF.createTempView("t_wc")
    session.sql("select word,count(*) cns from t_wc group by word order by cns desc").show()


    wDF.select("word","cns").groupBy("word").count().withColumnRenamed("count","cns").where("cns > 1").orderBy($"cns" desc).show()

  }
}
           

3.DataSet結合了RDD和DataFrame的優點(類型安全、面向對象、不受JVM限制、沒有GC開銷)

繼續閱讀