天天看点

pyspark笔记40-dataframe的read、agg集合函数、别名和数据缓存、设置断点

对应课件:3.3 SparkSQL中的核心数据结构DataFrame第40.pdf

对应视频:40、agg聚合方法及数据缓存方法cache的讲解.mp4

一、spark.read方法读取文件创建dataframe。

dir(spark.read)查看。spark.read支持多种格式的文件的读取,包括:'csv', 'format', 'jdbc', 'json', 'load', 'option', 'options', 'orc', 'parquet', 'schema', 'table', 'text'

pyspark笔记40-dataframe的read、agg集合函数、别名和数据缓存、设置断点

hdfs dfs -mkdir /sql

hdfs dfs -put users.parquet   /sql

hdfs dfs -ls /sql

将文件users.parquet上传到创建好的目录下,查看一下。

df = spark.read.parquet('/sql/users.parquet') #读取文件

df.show() #展示样本数据

pyspark笔记40-dataframe的read、agg集合函数、别名和数据缓存、设置断点

 df.printSchema()  #查看dataframe的结构

pyspark笔记40-dataframe的read、agg集合函数、别名和数据缓存、设置断点
|-- element: integer (containsNull = true)表示favorite_numbers字段里边是数组      

     通过read上的csv方法读取hdfs上的/customers.csv,返回dataframe,由关键字参数header指定保留文件的头信息,并使用头信息作为DataFrame的columns。调用printSchema方法打印出DataFrame的结构信息。

df = spark.read.csv('/sql/customers.csv',header=True)

df.printSchema()

pyspark笔记40-dataframe的read、agg集合函数、别名和数据缓存、设置断点

df.show()

pyspark笔记40-dataframe的read、agg集合函数、别名和数据缓存、设置断点

   有了dataFrame之后,怎样来聚合呢?

这个DataFrame中第一列为编号,第二列为性别,第三列为年龄,第四列为实际收入,第五列为消费积分。现在就可以用agg聚合方法来找出Age的最大或最小值,实际收入的均值,消费积分的均值等信息了。例如计算年龄最大值、收入均值、消费积分均值.

    对多个字段求得聚合放到一个字典中,键为列名称,值为用来计算的函数名称,当然这里的函数可以是自定义的聚合函数udaf。注意:字典里边的键和值要用单引号或者双引号扩起来。

df.agg({"Age": "max","Annual Income (k$)":'mean','Spending Score (1-100)':"mean"}).show()
           
pyspark笔记40-dataframe的read、agg集合函数、别名和数据缓存、设置断点

 三、dataframe.alias(‘别名’)。为DataFrame定义一个别名,稍后再函数中就可以利用这个别名来做相关的运算,例如说自关联.

from pyspark.sql.functions import *

df = spark.read.csv('/sql/customers.csv',header=True)

pyspark笔记40-dataframe的read、agg集合函数、别名和数据缓存、设置断点

 df2 = df.alias('cus2')

df3 = df1.join(df2,col('cus1.CustomerId')==col('cus2.CustomerId'),'inner')

df3.count()

Out[26]:

df3.show(4)

pyspark笔记40-dataframe的read、agg集合函数、别名和数据缓存、设置断点

 四、cache(),将DataFrame缓存到StorageLevel对应的缓存级别中,默认是MEMORY_AND_DISK 

df = spark.read.csv('/sql/customers.csv',header=True)

a = df.cache()

a.show()

pyspark笔记40-dataframe的read、agg集合函数、别名和数据缓存、设置断点

    将频繁需要查询的数据缓存起来,这样下一次要查询的时候就可以直接在内存中读取数据,

从而提升数据的读取速度提升计算的效率。

五、dataframe.checkpoint(eager=True) 。关键字参数eager,默认为True表示是否立即设置断

点。

    对DataFrame设置断点,这个方法是Spark2.1引入的方法,这个方法的调用会斩断在这个

DataFrame上的逻辑执行计划,将前后的依赖关系持久化到checkpoint文件中去。这个方法

对于需要大量需要迭代的算法非常有用,因为算法在迭代的过程中逻辑计划的数据量会呈现

指数级别的上升。要使用这个方法需要使用sparkcontext上面的setCheckpointDir设置检查

点数据在HDFS中的存储目录。

sc=spark.sparkContext   #从现有的sparksession对象spark中获取到sparkContext

sc.master

pyspark笔记40-dataframe的read、agg集合函数、别名和数据缓存、设置断点
pyspark笔记40-dataframe的read、agg集合函数、别名和数据缓存、设置断点

sc.setCheckpointDir('/datas/checkpoint')

#设置检查点数据在HDFS中的存储目录,hdfs上会多一个/checkpoint目录

a.checkpoint()

a.show()

运行完成之后,在hdfs的/datas/checkpoint目录下会看到checkpoint的数据

!hdfs dfs -ls /datas/checkpoint

pyspark笔记40-dataframe的read、agg集合函数、别名和数据缓存、设置断点

继续阅读