Spark-sql
一:DataFrame
DataFrame
是什么?
在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有
schema
元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构 信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。同时,与Hive类似,DataFrame 也支持嵌套数据类型(struct、array 和 map)。从API易用性的角度上看,DataFrame API提供的是一套高层的关系操作,比函数式的RDD API更加友好,门槛更低。
Spark SQL的DataFrame API允许我们使用DataFrame而不用必须去注册临时表或者生成SQL 表达式。DataFrame API既有transformation操作也有action操作。
在Spark SQL中SparkSession是创建DataFrame和执行SQL的入口,创建DataFrame
有三种方式:
- 通过
的数据源进行创建。Spark
- 从一个存在的
进行转换。RDD
- 还可以从
进行查询返回。Table
1.1:从数据源进行创建
spark.read.
后面可以跟接的数据源。
以读取一个json文件为例,展示数据结构:
创建user.json文件:
代码读取示例:
object DataFrameExample {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("Spark_DataFrame")
val spark = SparkSession.builder().config(conf).getOrCreate()
val dataFrame = spark.read.json("user.json")
}
}
拿到
DataFrame
以后就可以获取所有的结构和数据:
获取结构
schema
:
dataFrame.schema.foreach(item => {
println("field name is " + item.name)
println("field type is " + item.dataType.typeName)
println("field nullable is " + item.nullable)
})
// field name is age
// field type is long
// field nullable is true
// field name is username
// field type is string
// field nullable is true
schema的每个字段类型都是
org.apache.spark.sql.types.StructField
case class StructField(
name: String,
dataType: DataType,
nullable: Boolean = true,
metadata: Metadata = Metadata.empty) {
获取数据
rdd
:
dataFrame.rdd.foreach(item => {
for (index <- 0 until item.size) {
print(item.get(index) + " , ")
}
})
// or
dataFrame.rdd.foreach(println)
// 20 , zhangsan
// [20,zhangsan]
每个item的类型为
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
class GenericRowWithSchema(values: Array[Any],
override val schema: StructType)
extends GenericRow(values)
GenericRow
源码如下,所以
item.get()
调用的是
value
数组的值。
class GenericRow(protected[sql] val values: Array[Any]) extends Row {
protected def this() = this(null)
def this(size: Int) = this(new Array[Any](size))
override def length: Int = values.length
override def get(i: Int): Any = values(i)
override def toSeq: Seq[Any] = values.clone()
override def copy(): GenericRow = this
}
对于
dataframe
可以直接使用
api
查询,返回的依然是
dataframe
// 引入隐式转换规则, 此处spark对应前面创建的sparkSession的名称
import spark.implicits._
dataFrame.select($"username", $"age" + 1).show()
// or
// dataFrame.select('username, 'age + 1).show()
// or
// dataframe.select(dataframe("age")).show()
+--------+---------+
|username|(age + 1)|
+--------+---------+
|zhangsan| 21|
+--------+---------+
dataFrame的
sql
操作:
使用
DataFrame
创建一个临时表
// 创建一个局部临时表,仅对当前session有效,如果已经存在则报错
dataFrame.createTempView("user")
// 创建一个临时表,对所有session有效,如果已经存在则报错
dataFrame.createGlobalTempView("user")
// 创建一个局部临时表,如果已经存在则替换
dataFrame.createOrReplaceTempView("user")
// 创建一个临时表,如果已经存在则替换
dataFrame.createOrReplaceGlobalTempView("user")
使用全局临时表时需要全路径访问,如:global_temp.user
对表使用sql操作,返回的是一个dataframe类型。
1.2:RDD <=> DataFrame
在IDEA中开发程序时,如果需要RDD与DF或者DS之间互相操作,那么需要引入
import spark.implicits._
这里的spark不是Scala中的包名,而是创建的sparkSession对象的变量名称,所以必须先创建sparkSession对象再导入。这里的spark对象不能使用var声明,因为Scala只支持val修饰的对象的引入。
case class Person(name: String, age: Int)
import spark.implicits._
val rdd:RDD[String] = spark.sparkContext.textFile("person")
val dataFrame = rdd.map(item => {
val strings = item.split(",")
if(strings.length > 1) {
Person(strings(0), strings(1).toInt)
} else {
Person(null, 0)
}
}).toDF()
dataFrame.show(false)
1.3:DataFrame <=> RDD
DataFrame其实就是对RDD的封装,所以可以直接获取内部的RDD。
val rdd: RDD[Row]= dataFrame.rdd
二:DataSet
DataSet是具有强类型的数据集合,需要提供对应的类型信息。
2.1:RDD <=> DataSet
case class Person(name: String, age: Int)
import spark.implicits._
val rdd:RDD[String] = spark.sparkContext.textFile("person")
val dataSet = rdd.map(item => {
val strings = item.split(",")
if(strings.length > 1) {
Person(strings(0), strings(1).toInt)
} else {
Person(null, 0)
}
}).toDS()
dataSet.show()
2.2:DataSet <=> RDD
dataSet.rdd
2.3:DataFrame <=> DataSet
DataFrame其实是 DataSet的特例,所以它们之间是可以互相转换的。
// 必须引入
import spark.implicits._
val rdd: RDD[String] = spark.sparkContext.textFile("person")
val dataframe = rdd.map(item => {
val strings = item.split(",")
if (strings.length > 1) {
Person(strings(0), strings(1).toInt)
} else {
Person(null, 0)
}
})
// DataFrame 转 DataSet
val dataSet: Dataset[Person] = dataframe.as[Person]
// DataSet 转 DataFrame
val dataFrame02 = dataSet.toDF()
三:数据的加载和保存
SparkSQL 默认读取和保存的文件格式为
parquet
。
3.1:Read
scala> spark.read.
csv format jdbc json load option options orc parquet schema table text textFile
3.1.1:parquet
val dataframe = spark.read.load("output_person")
dataframe.show(false)
3.1.2:orc
3.1.3:json
3.1.4:csv
val dataframe = spark.read.format("csv")
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.load("user.csv")
dataframe.show(false)
3.1.5:mysql
val dataFrame = spark.read.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/mysql")
.option("driver", "com.mysql.jdbc.cj.Driver")
.option("user", "root")
.option("password", "123456")
.option("dbtable", "table")
.load()
dataFrame.show(false)
3.2:Write
3.2.1:Parquet
3.2.2:json
3.2.3:csv
val dataframe = spark.read.json("json_output")
dataframe.write
.mode(SaveMode.Append)
.option("header", "true")
.option("sep", ";")
.option("inferSchema", "true")
.csv("csv_output")
3.2.4:mysql
case class User(name: String, age: Long)
val conf = new SparkConf().setMaster("local[*]").setAppName("Spark_ReadAndWrite")
val spark = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
val rdd: RDD[User] = spark.sparkContext.makeRDD(List(User("lisi", 20), User("zs", 30)))
val ds: Dataset[User] = rdd.toDS
ds.show(false)
ds.write
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/mysql")
.option("user", "root")
.option("password", "123456")
.option("dbtable", "table")
.mode(SaveMode.Append)
.save()