Spark-sql
一:DataFrame
DataFrame
是什麼?
在Spark中,DataFrame是一種以RDD為基礎的分布式資料集,類似于傳統資料庫中的二維表格。DataFrame與RDD的主要差別在于,前者帶有
schema
元資訊,即DataFrame所表示的二維表資料集的每一列都帶有名稱和類型。這使得Spark SQL得以洞察更多的結構 資訊,進而對藏于DataFrame背後的資料源以及作用于DataFrame之上的變換進行了針對性的優化,最終達到大幅提升運作時效率的目标。反觀RDD,由于無從得知所存資料元素的具體内部結構,Spark Core隻能在stage層面進行簡單、通用的流水線優化。同時,與Hive類似,DataFrame 也支援嵌套資料類型(struct、array 和 map)。從API易用性的角度上看,DataFrame API提供的是一套高層的關系操作,比函數式的RDD API更加友好,門檻更低。
Spark SQL的DataFrame API允許我們使用DataFrame而不用必須去注冊臨時表或者生成SQL 表達式。DataFrame API既有transformation操作也有action操作。
在Spark SQL中SparkSession是建立DataFrame和執行SQL的入口,建立DataFrame
有三種方式:
- 通過
的資料源進行建立。Spark
- 從一個存在的
進行轉換。RDD
- 還可以從
進行查詢傳回。Table
1.1:從資料源進行建立
spark.read.
後面可以跟接的資料源。
以讀取一個json檔案為例,展示資料結構:
建立user.json檔案:
代碼讀取示例:
object DataFrameExample {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("Spark_DataFrame")
val spark = SparkSession.builder().config(conf).getOrCreate()
val dataFrame = spark.read.json("user.json")
}
}
拿到
DataFrame
以後就可以擷取所有的結構和資料:
擷取結構
schema
:
dataFrame.schema.foreach(item => {
println("field name is " + item.name)
println("field type is " + item.dataType.typeName)
println("field nullable is " + item.nullable)
})
// field name is age
// field type is long
// field nullable is true
// field name is username
// field type is string
// field nullable is true
schema的每個字段類型都是
org.apache.spark.sql.types.StructField
case class StructField(
name: String,
dataType: DataType,
nullable: Boolean = true,
metadata: Metadata = Metadata.empty) {
擷取資料
rdd
:
dataFrame.rdd.foreach(item => {
for (index <- 0 until item.size) {
print(item.get(index) + " , ")
}
})
// or
dataFrame.rdd.foreach(println)
// 20 , zhangsan
// [20,zhangsan]
每個item的類型為
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
class GenericRowWithSchema(values: Array[Any],
override val schema: StructType)
extends GenericRow(values)
GenericRow
源碼如下,是以
item.get()
調用的是
value
數組的值。
class GenericRow(protected[sql] val values: Array[Any]) extends Row {
protected def this() = this(null)
def this(size: Int) = this(new Array[Any](size))
override def length: Int = values.length
override def get(i: Int): Any = values(i)
override def toSeq: Seq[Any] = values.clone()
override def copy(): GenericRow = this
}
對于
dataframe
可以直接使用
api
查詢,傳回的依然是
dataframe
// 引入隐式轉換規則, 此處spark對應前面建立的sparkSession的名稱
import spark.implicits._
dataFrame.select($"username", $"age" + 1).show()
// or
// dataFrame.select('username, 'age + 1).show()
// or
// dataframe.select(dataframe("age")).show()
+--------+---------+
|username|(age + 1)|
+--------+---------+
|zhangsan| 21|
+--------+---------+
dataFrame的
sql
操作:
使用
DataFrame
建立一個臨時表
// 建立一個局部臨時表,僅對目前session有效,如果已經存在則報錯
dataFrame.createTempView("user")
// 建立一個臨時表,對所有session有效,如果已經存在則報錯
dataFrame.createGlobalTempView("user")
// 建立一個局部臨時表,如果已經存在則替換
dataFrame.createOrReplaceTempView("user")
// 建立一個臨時表,如果已經存在則替換
dataFrame.createOrReplaceGlobalTempView("user")
使用全局臨時表時需要全路徑通路,如:global_temp.user
對表使用sql操作,傳回的是一個dataframe類型。
1.2:RDD <=> DataFrame
在IDEA中開發程式時,如果需要RDD與DF或者DS之間互相操作,那麼需要引入
import spark.implicits._
這裡的spark不是Scala中的包名,而是建立的sparkSession對象的變量名稱,是以必須先建立sparkSession對象再導入。這裡的spark對象不能使用var聲明,因為Scala隻支援val修飾的對象的引入。
case class Person(name: String, age: Int)
import spark.implicits._
val rdd:RDD[String] = spark.sparkContext.textFile("person")
val dataFrame = rdd.map(item => {
val strings = item.split(",")
if(strings.length > 1) {
Person(strings(0), strings(1).toInt)
} else {
Person(null, 0)
}
}).toDF()
dataFrame.show(false)
1.3:DataFrame <=> RDD
DataFrame其實就是對RDD的封裝,是以可以直接擷取内部的RDD。
val rdd: RDD[Row]= dataFrame.rdd
二:DataSet
DataSet是具有強類型的資料集合,需要提供對應的類型資訊。
2.1:RDD <=> DataSet
case class Person(name: String, age: Int)
import spark.implicits._
val rdd:RDD[String] = spark.sparkContext.textFile("person")
val dataSet = rdd.map(item => {
val strings = item.split(",")
if(strings.length > 1) {
Person(strings(0), strings(1).toInt)
} else {
Person(null, 0)
}
}).toDS()
dataSet.show()
2.2:DataSet <=> RDD
dataSet.rdd
2.3:DataFrame <=> DataSet
DataFrame其實是 DataSet的特例,是以它們之間是可以互相轉換的。
// 必須引入
import spark.implicits._
val rdd: RDD[String] = spark.sparkContext.textFile("person")
val dataframe = rdd.map(item => {
val strings = item.split(",")
if (strings.length > 1) {
Person(strings(0), strings(1).toInt)
} else {
Person(null, 0)
}
})
// DataFrame 轉 DataSet
val dataSet: Dataset[Person] = dataframe.as[Person]
// DataSet 轉 DataFrame
val dataFrame02 = dataSet.toDF()
三:資料的加載和儲存
SparkSQL 預設讀取和儲存的檔案格式為
parquet
。
3.1:Read
scala> spark.read.
csv format jdbc json load option options orc parquet schema table text textFile
3.1.1:parquet
val dataframe = spark.read.load("output_person")
dataframe.show(false)
3.1.2:orc
3.1.3:json
3.1.4:csv
val dataframe = spark.read.format("csv")
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.load("user.csv")
dataframe.show(false)
3.1.5:mysql
val dataFrame = spark.read.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/mysql")
.option("driver", "com.mysql.jdbc.cj.Driver")
.option("user", "root")
.option("password", "123456")
.option("dbtable", "table")
.load()
dataFrame.show(false)
3.2:Write
3.2.1:Parquet
3.2.2:json
3.2.3:csv
val dataframe = spark.read.json("json_output")
dataframe.write
.mode(SaveMode.Append)
.option("header", "true")
.option("sep", ";")
.option("inferSchema", "true")
.csv("csv_output")
3.2.4:mysql
case class User(name: String, age: Long)
val conf = new SparkConf().setMaster("local[*]").setAppName("Spark_ReadAndWrite")
val spark = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
val rdd: RDD[User] = spark.sparkContext.makeRDD(List(User("lisi", 20), User("zs", 30)))
val ds: Dataset[User] = rdd.toDS
ds.show(false)
ds.write
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/mysql")
.option("user", "root")
.option("password", "123456")
.option("dbtable", "table")
.mode(SaveMode.Append)
.save()