天天看点

Spark-Sql(RDD----DataFrame3种方式)

  1. DataFrame是什么

    在Spark中,DataFrame是一种按列组织的分布式数据集,概念上等价于关系数据库中一个表或者是Python中的

    data frame,但是在底层进行了更丰富的优化。

  2. DataFrame与RDD的对比以及联系

    DataFrame里面存放的结构化数据的描述信息,DataFrame要有表头(表的描述信息),描述了有多少列,每一

    列数叫什么名字、什么类型、能不能为空?

    DataFrame是特殊的RDD(RDD+Schema信息就变成了DataFrame)DataFrame是一种以RDD为基础的分布式数

    据集,类似于传统数据库中的二维表格。

    与RDD的主要区别在于:前者带有Schema元数据,即DataFrame所表示的二维数据集的每一列都有名称和类型。

    由于无法知道RDD数据集内部的结构,Spark执行作业只能在调度阶段进行简单通用的优化,而DataFrame带有数

    据集内部的结构,可以根据这些信息进行针对性的优化,最终实现优化运行效率。

    DataFrame带来的好处:

    精简代码

    提升执行效率

    减少数据读取:忽略无关数据,根据查询条件进行适当裁剪。

    Spark-Sql(RDD----DataFrame3种方式)
  3. RDD----DataFrame3种方式
    1. 直接手动确定

      手动创建其实就在在已有的RDD上给定一个标签的名称,因为RDD本身就带有数据格式,再给定一个标签就满足了DataFrame的格式要求

package sparksql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark
object RDDtoDataFrame1 {
  def main(args: Array[String]): Unit = {
    val ss:SparkSession=SparkSession.builder().appName("RDDtoDataFrame1").master("local[*]").getOrCreate()
    val rdd1:RDD[(String, Int)] =ss.sparkContext.parallelize(List("xiaowanzi"->23,"zhangsan"->25,"lisi"->30))
    import  ss.implicits._
    val df:DataFrame=rdd1.toDF("name","age")
    df.createOrReplaceTempView("person")
    ss.sql("select name,age from person").show()
    ss.stop()
  }
}
           
  1. 利用反射创建

    利用反射创建Dataframe的前提是已经知道了Schema(即表的内部结构),首先要定义一个case class(这个class其实就是Schema),然后要获取一个RDD,这个RDD的类型要为已经定义过的class,最后直接RDD.toDF()

package sparksql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object RDDtoDataFrame2 {
  case class person(name:String,age:Int){}
  def main(args: Array[String]): Unit = {
    val ss:SparkSession=SparkSession.builder().appName("RDDtoDataFrame1").master("local[*]").getOrCreate()
    val rdd2:RDD[person]=ss.sparkContext.parallelize(List("xiaowanzi"->23,"zhangsan"->25,"lisi"->30)).map(x=>{
      val name=x._1
      val age=x._2
      person(name,age)
    })
    import  ss.implicits._
    val df2:DataFrame=rdd2.toDF()
    df2.createOrReplaceTempView("person")
    ss.sql("select name,age from person").show()
    ss.stop()
  }
}
           
  1. 利用接口编程实现

    在事先不知道表的Schema的情况,可以利用接口编程来创建,首先创建一个Row类型的RDD,然后定义一个StructType类型的Schema,即给Row标签和数据类型。最后使用sparksession.createDataFrame(rdd,schema1)即可

package sparksql


import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.ScalaReflection.Schema
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object RDDtoDataFrame {
  def main(args: Array[String]): Unit = {
    val ss:SparkSession=SparkSession.builder().appName("RDDtoDataFrame1").master("local[*]").getOrCreate()
    val rdd:RDD[Row]=ss.sparkContext.parallelize(List("xiaowanzi"->23,"zhangsan"->25,"lisi"->30)).map(x=>{
      val name=x._1
      val age=x._2
      Row(name,age)
    })
    val filed="name,age".split(",").map(x=>{StructField(x,StringType,true)})
    val schema=StructType(filed)
    val schema1:StructType = StructType(List(
      StructField("name",StringType,true),
      StructField("age",IntegerType,true)
    ))
    val df:DataFrame=ss.createDataFrame(rdd,schema1)
    df.createOrReplaceTempView("person")
    ss.sql("select name,age from person").show()
    ss.stop()
  }
}
           

总结:3种方法虽然表面看起来不太一样,但其根本就是拼凑出一个有标签,有类型的Row,只要满足这2个条件,RDD就可以转换为DataFrame。

继续阅读