天天看點

Spark-SqlSpark-sql一:DataFrame二:DataSet三:資料的加載和儲存

Spark-sql

一:DataFrame

DataFrame

是什麼?

在Spark中,DataFrame是一種以RDD為基礎的分布式資料集,類似于傳統資料庫中的二維表格。DataFrame與RDD的主要差別在于,前者帶有

schema

元資訊,即DataFrame所表示的二維表資料集的每一列都帶有名稱和類型。這使得Spark SQL得以洞察更多的結構 資訊,進而對藏于DataFrame背後的資料源以及作用于DataFrame之上的變換進行了針對性的優化,最終達到大幅提升運作時效率的目标。反觀RDD,由于無從得知所存資料元素的具體内部結構,Spark Core隻能在stage層面進行簡單、通用的流水線優化。同時,與Hive類似,DataFrame 也支援嵌套資料類型(struct、array 和 map)。從API易用性的角度上看,DataFrame API提供的是一套高層的關系操作,比函數式的RDD API更加友好,門檻更低。

Spark-SqlSpark-sql一:DataFrame二:DataSet三:資料的加載和儲存

Spark SQL的DataFrame API允許我們使用DataFrame而不用必須去注冊臨時表或者生成SQL 表達式。DataFrame API既有transformation操作也有action操作。

在Spark SQL中SparkSession是建立DataFrame和執行SQL的入口,建立DataFrame

有三種方式:

  1. 通過

    Spark

    的資料源進行建立。
  2. 從一個存在的

    RDD

    進行轉換。
  3. 還可以從

    Table

    進行查詢傳回。

1.1:從資料源進行建立

spark.read.

後面可以跟接的資料源。

Spark-SqlSpark-sql一:DataFrame二:DataSet三:資料的加載和儲存

以讀取一個json檔案為例,展示資料結構:

建立user.json檔案:

代碼讀取示例:

object DataFrameExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("Spark_DataFrame")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    val dataFrame = spark.read.json("user.json")
  }
}
           

拿到

DataFrame

以後就可以擷取所有的結構和資料:

擷取結構

schema

:

dataFrame.schema.foreach(item => {
      println("field name is " + item.name)
      println("field type is " + item.dataType.typeName)
      println("field nullable is " + item.nullable)
    })
    
	//  field name is age
	//  field type is long
	//  field nullable is true

	//  field name is username
	//  field type is string
	//  field nullable is true
           

schema的每個字段類型都是

org.apache.spark.sql.types.StructField

case class StructField(
    name: String,
    dataType: DataType,
    nullable: Boolean = true,
    metadata: Metadata = Metadata.empty) {
           

擷取資料

rdd

dataFrame.rdd.foreach(item => {
      for (index <- 0 until  item.size) {
        print(item.get(index) + " , ")
      }
    })
    // or
    dataFrame.rdd.foreach(println)
 
    // 20 , zhangsan
    // [20,zhangsan]
           

每個item的類型為

org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

class GenericRowWithSchema(values: Array[Any], 
                           override val schema: StructType)
extends GenericRow(values) 
           

GenericRow

源碼如下,是以

item.get()

調用的是

value

數組的值。

class GenericRow(protected[sql] val values: Array[Any]) extends Row {
  protected def this() = this(null)
  
  def this(size: Int) = this(new Array[Any](size))
  
  override def length: Int = values.length

  override def get(i: Int): Any = values(i)

  override def toSeq: Seq[Any] = values.clone()

  override def copy(): GenericRow = this
}
           

對于

dataframe

可以直接使用

api

查詢,傳回的依然是

dataframe

// 引入隐式轉換規則, 此處spark對應前面建立的sparkSession的名稱
    import spark.implicits._
    dataFrame.select($"username", $"age" + 1).show()
    // or
    // dataFrame.select('username, 'age + 1).show()
    // or
    // dataframe.select(dataframe("age")).show()

+--------+---------+
|username|(age + 1)|
+--------+---------+
|zhangsan|       21|
+--------+---------+
           

dataFrame的

sql

操作:

使用

DataFrame

建立一個臨時表

// 建立一個局部臨時表,僅對目前session有效,如果已經存在則報錯
    dataFrame.createTempView("user")
    // 建立一個臨時表,對所有session有效,如果已經存在則報錯
    dataFrame.createGlobalTempView("user")
    // 建立一個局部臨時表,如果已經存在則替換
    dataFrame.createOrReplaceTempView("user")
    // 建立一個臨時表,如果已經存在則替換
    dataFrame.createOrReplaceGlobalTempView("user")
           

使用全局臨時表時需要全路徑通路,如:global_temp.user

對表使用sql操作,傳回的是一個dataframe類型。

1.2:RDD <=> DataFrame

在IDEA中開發程式時,如果需要RDD與DF或者DS之間互相操作,那麼需要引入

import spark.implicits._

這裡的spark不是Scala中的包名,而是建立的sparkSession對象的變量名稱,是以必須先建立sparkSession對象再導入。這裡的spark對象不能使用var聲明,因為Scala隻支援val修飾的對象的引入。

case class Person(name: String, age: Int)
    import spark.implicits._
    val rdd:RDD[String] = spark.sparkContext.textFile("person")
    val dataFrame = rdd.map(item => {
      val strings = item.split(",")
      if(strings.length > 1) {
        Person(strings(0), strings(1).toInt)
      } else {
        Person(null, 0)
      }
    }).toDF()
    dataFrame.show(false)
           

1.3:DataFrame <=> RDD

DataFrame其實就是對RDD的封裝,是以可以直接擷取内部的RDD。

val rdd: RDD[Row]= dataFrame.rdd
           

二:DataSet

DataSet是具有強類型的資料集合,需要提供對應的類型資訊。

2.1:RDD <=> DataSet

case class Person(name: String, age: Int)
    import spark.implicits._
    val rdd:RDD[String] = spark.sparkContext.textFile("person")
    val dataSet = rdd.map(item => {
      val strings = item.split(",")
      if(strings.length > 1) {
        Person(strings(0), strings(1).toInt)
      } else {
        Person(null, 0)
      }
    }).toDS()
    dataSet.show()
           

2.2:DataSet <=> RDD

dataSet.rdd
           

2.3:DataFrame <=> DataSet

DataFrame其實是 DataSet的特例,是以它們之間是可以互相轉換的。

// 必須引入
    import spark.implicits._
    val rdd: RDD[String] = spark.sparkContext.textFile("person")
    val dataframe = rdd.map(item => {
      val strings = item.split(",")
      if (strings.length > 1) {
        Person(strings(0), strings(1).toInt)
      } else {
        Person(null, 0)
      }
    })
	// DataFrame 轉 DataSet
    val dataSet: Dataset[Person] = dataframe.as[Person]
    // DataSet 轉 DataFrame
    val dataFrame02 = dataSet.toDF()
           

三:資料的加載和儲存

SparkSQL 預設讀取和儲存的檔案格式為

parquet

3.1:Read

scala> spark.read.
csv   format   jdbc   json   load   option   options   orc   parquet   schema   table   text   textFile
           

3.1.1:parquet

val dataframe = spark.read.load("output_person")
    dataframe.show(false)
           

3.1.2:orc

3.1.3:json

3.1.4:csv

val dataframe = spark.read.format("csv")
                              .option("sep", ",")
                              .option("inferSchema", "true")
                              .option("header", "true")
                              .load("user.csv")
    dataframe.show(false)
           

3.1.5:mysql

val dataFrame = spark.read.format("jdbc")
                              .option("url", "jdbc:mysql://localhost:3306/mysql")
                              .option("driver", "com.mysql.jdbc.cj.Driver")
                              .option("user", "root")
                              .option("password", "123456")
                              .option("dbtable", "table")
                              .load()
    dataFrame.show(false)
           

3.2:Write

3.2.1:Parquet

3.2.2:json

3.2.3:csv

val dataframe = spark.read.json("json_output")
    dataframe.write
             .mode(SaveMode.Append)
             .option("header", "true")
             .option("sep", ";")
             .option("inferSchema", "true")
             .csv("csv_output")
           

3.2.4:mysql

case class User(name: String, age: Long)
    val conf = new SparkConf().setMaster("local[*]").setAppName("Spark_ReadAndWrite")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._
    val rdd: RDD[User] = spark.sparkContext.makeRDD(List(User("lisi", 20), User("zs", 30)))
    val ds: Dataset[User] = rdd.toDS
    ds.show(false)
    ds.write
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/mysql")
      .option("user", "root")
      .option("password", "123456")
      .option("dbtable", "table")
      .mode(SaveMode.Append)
      .save()
           

3.2.5:orc

繼續閱讀