对应课件:3.3 SparkSQL中的核心数据结构DataFrame第40.pdf
对应视频:40、agg聚合方法及数据缓存方法cache的讲解.mp4
一、spark.read方法读取文件创建dataframe。
dir(spark.read)查看。spark.read支持多种格式的文件的读取,包括:'csv', 'format', 'jdbc', 'json', 'load', 'option', 'options', 'orc', 'parquet', 'schema', 'table', 'text'
hdfs dfs -mkdir /sql
hdfs dfs -put users.parquet /sql
hdfs dfs -ls /sql
将文件users.parquet上传到创建好的目录下,查看一下。
df = spark.read.parquet('/sql/users.parquet') #读取文件
df.show() #展示样本数据
df.printSchema() #查看dataframe的结构
|-- element: integer (containsNull = true)表示favorite_numbers字段里边是数组
通过read上的csv方法读取hdfs上的/customers.csv,返回dataframe,由关键字参数header指定保留文件的头信息,并使用头信息作为DataFrame的columns。调用printSchema方法打印出DataFrame的结构信息。
df = spark.read.csv('/sql/customers.csv',header=True)
df.printSchema()
df.show()
有了dataFrame之后,怎样来聚合呢?
这个DataFrame中第一列为编号,第二列为性别,第三列为年龄,第四列为实际收入,第五列为消费积分。现在就可以用agg聚合方法来找出Age的最大或最小值,实际收入的均值,消费积分的均值等信息了。例如计算年龄最大值、收入均值、消费积分均值.
对多个字段求得聚合放到一个字典中,键为列名称,值为用来计算的函数名称,当然这里的函数可以是自定义的聚合函数udaf。注意:字典里边的键和值要用单引号或者双引号扩起来。
df.agg({"Age": "max","Annual Income (k$)":'mean','Spending Score (1-100)':"mean"}).show()
三、dataframe.alias(‘别名’)。为DataFrame定义一个别名,稍后再函数中就可以利用这个别名来做相关的运算,例如说自关联.
from pyspark.sql.functions import *
df = spark.read.csv('/sql/customers.csv',header=True)
df2 = df.alias('cus2')
df3 = df1.join(df2,col('cus1.CustomerId')==col('cus2.CustomerId'),'inner')
df3.count()
Out[26]:
df3.show(4)
四、cache(),将DataFrame缓存到StorageLevel对应的缓存级别中,默认是MEMORY_AND_DISK
df = spark.read.csv('/sql/customers.csv',header=True)
a = df.cache()
a.show()
将频繁需要查询的数据缓存起来,这样下一次要查询的时候就可以直接在内存中读取数据,
从而提升数据的读取速度提升计算的效率。
五、dataframe.checkpoint(eager=True) 。关键字参数eager,默认为True表示是否立即设置断
点。
对DataFrame设置断点,这个方法是Spark2.1引入的方法,这个方法的调用会斩断在这个
DataFrame上的逻辑执行计划,将前后的依赖关系持久化到checkpoint文件中去。这个方法
对于需要大量需要迭代的算法非常有用,因为算法在迭代的过程中逻辑计划的数据量会呈现
指数级别的上升。要使用这个方法需要使用sparkcontext上面的setCheckpointDir设置检查
点数据在HDFS中的存储目录。
sc=spark.sparkContext #从现有的sparksession对象spark中获取到sparkContext
sc.master
sc.setCheckpointDir('/datas/checkpoint')
#设置检查点数据在HDFS中的存储目录,hdfs上会多一个/checkpoint目录
a.checkpoint()
a.show()
运行完成之后,在hdfs的/datas/checkpoint目录下会看到checkpoint的数据
!hdfs dfs -ls /datas/checkpoint