天天看点

SparkSql------两种操作数据的方式(DSL和SQL)测试数据DSL风格语法SQL风格语法

测试数据

{"name":"aaa", "age":20}
{"name":"lbbb", "age":30, "facevalue":80}
{"name":"ccc", "age":28, "facevalue":80}
{"name":"ddd", "age":28, "facevalue":90}
           

DSL风格语法

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}

object DSLStyle {
  def main(args:Array[String]):Unit = {
    //创建SparkConf()并设置App名称
    val conf = new SparkConf().setAppName("SparkSQLDemo").setMaster("local")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    val df: DataFrame = spark.read.json("people.json")
    //DSL风格语法:
    df.show()
//      +---+---------+--------+
//      |age|facevalue|    name|
//      +---+---------+--------+
//      | 20|     null|     aaa|
//      | 30|       80|     bbb|
//      | 28|       80|     ccc|
//      | 28|       90|     ddd|
//      +---+---------+--------+
    import spark.implicits._
    // 打印Schema信息
    df.printSchema()
//    root
//    |-- age: long (nullable = true)
//    |-- facevalue: long (nullable = true)
//    |-- name: string (nullable = true)

    //直接对DataFrame进行操作
    df.select("name").show()
    df.select($"name", $"age" + 1).show()
    df.filter($"age" > 21).show()
    df.groupBy($"age").count().show()
    spark.stop()
  }
           

SQL风格语法

import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
object SqlStyle {
  def main(args:Array[String]):Unit = {
    //创建SparkConf()并设置App名称
    val conf = new SparkConf().setAppName("SparkSQLDemo").setMaster("local")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    val df: DataFrame = spark.read.json("people.json")
   
    //临时表是Session范围内的,Session退出后,表就失效了
    //一个SparkSession结束后,表自动删除
    df.createOrReplaceTempView("people")
    spark.sql("SELECT * FROM people").show()
    spark.sql("SELECT * FROM people where age>20").show()

    //如果想应用范围内有效,可以使用全局表。注意使用全局表时需要全路径访问,如:global_temp.people
    //应用级别内可以访问,一个SparkContext结束后,表自动删除 一个SparkContext可以多次创建SparkSession
    //使用的比较少
    df.createGlobalTempView("people")
    //创建名后需要必须添加global_temp才可以
    spark.sql("SELECT * FROM global_temp.people").show()
    spark.newSession().sql("SELECT * FROM global_temp.people").show()
    spark.stop()
  }
}
           

继续阅读