天天看点

Spark-SqlSpark-sql一:DataFrame二:DataSet三:数据的加载和保存

Spark-sql

一:DataFrame

DataFrame

是什么?

在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有

schema

元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构 信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。同时,与Hive类似,DataFrame 也支持嵌套数据类型(struct、array 和 map)。从API易用性的角度上看,DataFrame API提供的是一套高层的关系操作,比函数式的RDD API更加友好,门槛更低。

Spark-SqlSpark-sql一:DataFrame二:DataSet三:数据的加载和保存

Spark SQL的DataFrame API允许我们使用DataFrame而不用必须去注册临时表或者生成SQL 表达式。DataFrame API既有transformation操作也有action操作。

在Spark SQL中SparkSession是创建DataFrame和执行SQL的入口,创建DataFrame

有三种方式:

  1. 通过

    Spark

    的数据源进行创建。
  2. 从一个存在的

    RDD

    进行转换。
  3. 还可以从

    Table

    进行查询返回。

1.1:从数据源进行创建

spark.read.

后面可以跟接的数据源。

Spark-SqlSpark-sql一:DataFrame二:DataSet三:数据的加载和保存

以读取一个json文件为例,展示数据结构:

创建user.json文件:

代码读取示例:

object DataFrameExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("Spark_DataFrame")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    val dataFrame = spark.read.json("user.json")
  }
}
           

拿到

DataFrame

以后就可以获取所有的结构和数据:

获取结构

schema

:

dataFrame.schema.foreach(item => {
      println("field name is " + item.name)
      println("field type is " + item.dataType.typeName)
      println("field nullable is " + item.nullable)
    })
    
	//  field name is age
	//  field type is long
	//  field nullable is true

	//  field name is username
	//  field type is string
	//  field nullable is true
           

schema的每个字段类型都是

org.apache.spark.sql.types.StructField

case class StructField(
    name: String,
    dataType: DataType,
    nullable: Boolean = true,
    metadata: Metadata = Metadata.empty) {
           

获取数据

rdd

dataFrame.rdd.foreach(item => {
      for (index <- 0 until  item.size) {
        print(item.get(index) + " , ")
      }
    })
    // or
    dataFrame.rdd.foreach(println)
 
    // 20 , zhangsan
    // [20,zhangsan]
           

每个item的类型为

org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

class GenericRowWithSchema(values: Array[Any], 
                           override val schema: StructType)
extends GenericRow(values) 
           

GenericRow

源码如下,所以

item.get()

调用的是

value

数组的值。

class GenericRow(protected[sql] val values: Array[Any]) extends Row {
  protected def this() = this(null)
  
  def this(size: Int) = this(new Array[Any](size))
  
  override def length: Int = values.length

  override def get(i: Int): Any = values(i)

  override def toSeq: Seq[Any] = values.clone()

  override def copy(): GenericRow = this
}
           

对于

dataframe

可以直接使用

api

查询,返回的依然是

dataframe

// 引入隐式转换规则, 此处spark对应前面创建的sparkSession的名称
    import spark.implicits._
    dataFrame.select($"username", $"age" + 1).show()
    // or
    // dataFrame.select('username, 'age + 1).show()
    // or
    // dataframe.select(dataframe("age")).show()

+--------+---------+
|username|(age + 1)|
+--------+---------+
|zhangsan|       21|
+--------+---------+
           

dataFrame的

sql

操作:

使用

DataFrame

创建一个临时表

// 创建一个局部临时表,仅对当前session有效,如果已经存在则报错
    dataFrame.createTempView("user")
    // 创建一个临时表,对所有session有效,如果已经存在则报错
    dataFrame.createGlobalTempView("user")
    // 创建一个局部临时表,如果已经存在则替换
    dataFrame.createOrReplaceTempView("user")
    // 创建一个临时表,如果已经存在则替换
    dataFrame.createOrReplaceGlobalTempView("user")
           

使用全局临时表时需要全路径访问,如:global_temp.user

对表使用sql操作,返回的是一个dataframe类型。

1.2:RDD <=> DataFrame

在IDEA中开发程序时,如果需要RDD与DF或者DS之间互相操作,那么需要引入

import spark.implicits._

这里的spark不是Scala中的包名,而是创建的sparkSession对象的变量名称,所以必须先创建sparkSession对象再导入。这里的spark对象不能使用var声明,因为Scala只支持val修饰的对象的引入。

case class Person(name: String, age: Int)
    import spark.implicits._
    val rdd:RDD[String] = spark.sparkContext.textFile("person")
    val dataFrame = rdd.map(item => {
      val strings = item.split(",")
      if(strings.length > 1) {
        Person(strings(0), strings(1).toInt)
      } else {
        Person(null, 0)
      }
    }).toDF()
    dataFrame.show(false)
           

1.3:DataFrame <=> RDD

DataFrame其实就是对RDD的封装,所以可以直接获取内部的RDD。

val rdd: RDD[Row]= dataFrame.rdd
           

二:DataSet

DataSet是具有强类型的数据集合,需要提供对应的类型信息。

2.1:RDD <=> DataSet

case class Person(name: String, age: Int)
    import spark.implicits._
    val rdd:RDD[String] = spark.sparkContext.textFile("person")
    val dataSet = rdd.map(item => {
      val strings = item.split(",")
      if(strings.length > 1) {
        Person(strings(0), strings(1).toInt)
      } else {
        Person(null, 0)
      }
    }).toDS()
    dataSet.show()
           

2.2:DataSet <=> RDD

dataSet.rdd
           

2.3:DataFrame <=> DataSet

DataFrame其实是 DataSet的特例,所以它们之间是可以互相转换的。

// 必须引入
    import spark.implicits._
    val rdd: RDD[String] = spark.sparkContext.textFile("person")
    val dataframe = rdd.map(item => {
      val strings = item.split(",")
      if (strings.length > 1) {
        Person(strings(0), strings(1).toInt)
      } else {
        Person(null, 0)
      }
    })
	// DataFrame 转 DataSet
    val dataSet: Dataset[Person] = dataframe.as[Person]
    // DataSet 转 DataFrame
    val dataFrame02 = dataSet.toDF()
           

三:数据的加载和保存

SparkSQL 默认读取和保存的文件格式为

parquet

3.1:Read

scala> spark.read.
csv   format   jdbc   json   load   option   options   orc   parquet   schema   table   text   textFile
           

3.1.1:parquet

val dataframe = spark.read.load("output_person")
    dataframe.show(false)
           

3.1.2:orc

3.1.3:json

3.1.4:csv

val dataframe = spark.read.format("csv")
                              .option("sep", ",")
                              .option("inferSchema", "true")
                              .option("header", "true")
                              .load("user.csv")
    dataframe.show(false)
           

3.1.5:mysql

val dataFrame = spark.read.format("jdbc")
                              .option("url", "jdbc:mysql://localhost:3306/mysql")
                              .option("driver", "com.mysql.jdbc.cj.Driver")
                              .option("user", "root")
                              .option("password", "123456")
                              .option("dbtable", "table")
                              .load()
    dataFrame.show(false)
           

3.2:Write

3.2.1:Parquet

3.2.2:json

3.2.3:csv

val dataframe = spark.read.json("json_output")
    dataframe.write
             .mode(SaveMode.Append)
             .option("header", "true")
             .option("sep", ";")
             .option("inferSchema", "true")
             .csv("csv_output")
           

3.2.4:mysql

case class User(name: String, age: Long)
    val conf = new SparkConf().setMaster("local[*]").setAppName("Spark_ReadAndWrite")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._
    val rdd: RDD[User] = spark.sparkContext.makeRDD(List(User("lisi", 20), User("zs", 30)))
    val ds: Dataset[User] = rdd.toDS
    ds.show(false)
    ds.write
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/mysql")
      .option("user", "root")
      .option("password", "123456")
      .option("dbtable", "table")
      .mode(SaveMode.Append)
      .save()
           

3.2.5:orc

继续阅读