天天看点

spark-sql 1.x版本与2.x版本的wordcount实现

spark-sql的操作对象由dataframe变为了dataset

spark-sql是spark的一个组件,可以在spark程序中进行SQL查询。是一个分布式的sql查询引擎。

spark-sql在1.3的版本中开始使用dataframe对象,来对数据进行操作。

dataframe=rdd+schema语法风格分为两种:SQL语法风格:dataframe对象注册虚拟表后查询,DSL语法风格dataframe对象直接使用方法。

1.spark-sql 的1.x版本的wc

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 
  * 使用spark-SQL的1.x版本实现wordcount
  */
object WCpre {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
    .setMaster("local")
    .setAppName(WCpre.getClass.getName)
           
//设置rdd 持久化到内存和磁盘的序列化方式|||worker之间类的传递的序列化方式
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
           
//可以放在任何位置,值的比较可以使用 = = = 3个等号筛选
           
dataDF.select("*").groupBy("word").count().withColumnRenamed("count","cns").where($"cns" > 1).orderBy($"cns" desc).show()



  }
}
//样例类用来提供schema信息。
case class WC(word:String)
           

1.创建SparkContext

2.创建SQLContext

 3.创建RDD

 4.创建一个样例类,并定义类的成员变量

 5.整理数据并关联class

 6.将RDD转换成DataFrame(导入隐式转换)

 7.将DataFrame注册成临时表

 8.书写SQL(Transformation)

 9.执行Action

2.spark-sql 2.x 版本的wc

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SQLContext, SparkSession}


/**
  * 
  * spark2.x  统一了程序入口api
  */
object WCNow {

  def main(args: Array[String]): Unit = {

    val session: SparkSession = SparkSession.builder()
      .master("local")
      .appName(WCNow.getClass.getName)
      //如果该对象存在直接调用,不存在就创建
      .getOrCreate()

    import session.implicits._

    //可以得到之前的版本定义的sc,sqlContext对象,特点...
    val sqlContext: SQLContext = session.sqlContext
    val sc: SparkContext = session.sparkContext

    //读取文件,dataset默认存在schema信息为value
    val file: Dataset[String] = session.read.textFile("wc.txt")
    val words: Dataset[String] = file.flatMap(_.split(" "))
    //设置自定义schema
    val wDF: DataFrame = words.toDF("word")
    wDF.createTempView("t_wc")
    session.sql("select word,count(*) cns from t_wc group by word order by cns desc").show()


    wDF.select("word","cns").groupBy("word").count().withColumnRenamed("count","cns").where("cns > 1").orderBy($"cns" desc).show()

  }
}
           

3.DataSet结合了RDD和DataFrame的优点(类型安全、面向对象、不受JVM限制、没有GC开销)

继续阅读