测试数据
{"name":"aaa", "age":20}
{"name":"bbb", "age":30, "facevalue":80}
{"name":"ccc", "age":28, "facevalue":80}
{"name":"ddd", "age":28, "facevalue":90}
DSL风格语法
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
/** Demonstrates Spark SQL's DSL-style (DataFrame API) syntax on the JSON test data. */
object DSLStyle {
  def main(args: Array[String]): Unit = {
    // Create a SparkConf with the application name; "local" runs Spark in-process.
    val conf = new SparkConf().setAppName("SparkSQLDemo").setMaster("local")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    // Read the JSON test data; the schema (age, facevalue, name) is inferred.
    val df: DataFrame = spark.read.json("people.json")
    // DSL-style syntax:
    df.show()
    // +---+---------+--------+
    // |age|facevalue|    name|
    // +---+---------+--------+
    // | 20|     null|     aaa|
    // | 30|       80|     bbb|
    // | 28|       80|     ccc|
    // | 28|       90|     ddd|
    // +---+---------+--------+
    // Required for the $"column" syntax used below.
    import spark.implicits._
    // Print the inferred schema.
    df.printSchema()
    // root
    // |-- age: long (nullable = true)
    // |-- facevalue: long (nullable = true)
    // |-- name: string (nullable = true)
    // Operate on the DataFrame directly with the DSL.
    df.select("name").show()
    df.select($"name", $"age" + 1).show()
    df.filter($"age" > 21).show()
    df.groupBy($"age").count().show()
    spark.stop()
  }
} // FIX: this closing brace for `object DSLStyle` was missing in the original
SQL风格语法
import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
/** Demonstrates Spark SQL's SQL-style syntax: temp views and global temp views. */
object SqlStyle {
  def main(args: Array[String]): Unit = {
    // Configure and start a local Spark session.
    val sparkConf = new SparkConf().setAppName("SparkSQLDemo").setMaster("local")
    val session = SparkSession.builder().config(sparkConf).getOrCreate()
    // Load the JSON test data into a DataFrame.
    val people: DataFrame = session.read.json("people.json")

    // A temp view is scoped to this SparkSession: once the session ends,
    // the view is dropped automatically.
    people.createOrReplaceTempView("people")
    session.sql("SELECT * FROM people").show()
    session.sql("SELECT * FROM people where age>20").show()

    // A global temp view is scoped to the whole application (the SparkContext):
    // it survives across sessions and is dropped when the SparkContext stops.
    // It must always be referenced through the `global_temp` database,
    // e.g. global_temp.people. Rarely used in practice.
    people.createGlobalTempView("people")
    session.sql("SELECT * FROM global_temp.people").show()
    // Visible from a brand-new session of the same application as well.
    session.newSession().sql("SELECT * FROM global_temp.people").show()
    session.stop()
  }
}