-
DataFrame是什么
在Spark中,DataFrame是一种按列组织的分布式数据集,概念上等价于关系数据库中一个表或者是Python中的
data frame,但是在底层进行了更丰富的优化。
-
DataFrame与RDD的对比以及联系
DataFrame里面存放的结构化数据的描述信息,DataFrame要有表头(表的描述信息),描述了有多少列,每一
列数叫什么名字、什么类型、能不能为空?
DataFrame是特殊的RDD(RDD+Schema信息就变成了DataFrame)DataFrame是一种以RDD为基础的分布式数
据集,类似于传统数据库中的二维表格。
与RDD的主要区别在于:前者带有Schema元数据,即DataFrame所表示的二维数据集的每一列都有名称和类型。
由于无法知道RDD数据集内部的结构,Spark执行作业只能在调度阶段进行简单通用的优化,而DataFrame带有数
据集内部的结构,可以根据这些信息进行针对性的优化,最终实现优化运行效率。
DataFrame带来的好处:
精简代码
提升执行效率
减少数据读取:忽略无关数据,根据查询条件进行适当裁剪。
- RDD----DataFrame3种方式
-
直接手动确定
手动创建其实就在在已有的RDD上给定一个标签的名称,因为RDD本身就带有数据格式,再给定一个标签就满足了DataFrame的格式要求
-
package sparksql
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark
object RDDtoDataFrame1 {
def main(args: Array[String]): Unit = {
val ss:SparkSession=SparkSession.builder().appName("RDDtoDataFrame1").master("local[*]").getOrCreate()
val rdd1:RDD[(String, Int)] =ss.sparkContext.parallelize(List("xiaowanzi"->23,"zhangsan"->25,"lisi"->30))
import ss.implicits._
val df:DataFrame=rdd1.toDF("name","age")
df.createOrReplaceTempView("person")
ss.sql("select name,age from person").show()
ss.stop()
}
}
-
利用反射创建
利用反射创建Dataframe的前提是已经知道了Schema(即表的内部结构),首先要定义一个case class(这个class其实就是Schema),然后要获取一个RDD,这个RDD的类型要为已经定义过的class,最后直接RDD.toDF()
package sparksql
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
object RDDtoDataFrame2 {
case class person(name:String,age:Int){}
def main(args: Array[String]): Unit = {
val ss:SparkSession=SparkSession.builder().appName("RDDtoDataFrame1").master("local[*]").getOrCreate()
val rdd2:RDD[person]=ss.sparkContext.parallelize(List("xiaowanzi"->23,"zhangsan"->25,"lisi"->30)).map(x=>{
val name=x._1
val age=x._2
person(name,age)
})
import ss.implicits._
val df2:DataFrame=rdd2.toDF()
df2.createOrReplaceTempView("person")
ss.sql("select name,age from person").show()
ss.stop()
}
}
-
利用接口编程实现
在事先不知道表的Schema的情况,可以利用接口编程来创建,首先创建一个Row类型的RDD,然后定义一个StructType类型的Schema,即给Row标签和数据类型。最后使用sparksession.createDataFrame(rdd,schema1)即可
package sparksql
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.ScalaReflection.Schema
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
object RDDtoDataFrame {
def main(args: Array[String]): Unit = {
val ss:SparkSession=SparkSession.builder().appName("RDDtoDataFrame1").master("local[*]").getOrCreate()
val rdd:RDD[Row]=ss.sparkContext.parallelize(List("xiaowanzi"->23,"zhangsan"->25,"lisi"->30)).map(x=>{
val name=x._1
val age=x._2
Row(name,age)
})
val filed="name,age".split(",").map(x=>{StructField(x,StringType,true)})
val schema=StructType(filed)
val schema1:StructType = StructType(List(
StructField("name",StringType,true),
StructField("age",IntegerType,true)
))
val df:DataFrame=ss.createDataFrame(rdd,schema1)
df.createOrReplaceTempView("person")
ss.sql("select name,age from person").show()
ss.stop()
}
}
总结:3种方法虽然表面看起来不太一样,但其根本就是拼凑出一个有标签,有类型的Row,只要满足这2个条件,RDD就可以转换为DataFrame。