目录
-
- pyspark连接mysql
- pyspark SQL常用语法
-
- 输出schema
- 预览表
- 统计数量
- 输出列名称和字段类型
- 选择列
- 为选择的列赋予新名称
- 按条件过滤
- 构造新列
- 增加行
- 删除重复记录
- 删除列
- 删除缺失值行
- 填充缺失值
- 分组统计和计算
- 函数计算
- 描述性分析
- 参考链接
pyspark连接mysql
前提是需要下载jar包。
Mysql-connector-java.jar
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pyspark.sql.functions as F
sc = SparkContext("local", appName="mysqltest")
sqlContext = SQLContext(sc)
df = sqlContext.read.format("jdbc").options(
url="jdbc:mysql://localhost:3306/mydata?user=root&password=mysql&"
"useUnicode=true&characterEncoding=utf-8&useJDBCCompliantTimezoneShift=true&"
"useLegacyDatetimeCode=false&serverTimezone=UTC ", dbtable="detail_data").load()
df.show(n=5)
sc.stop()
pyspark SQL常用语法
pyspark SQL的部分语法和pandas的很相似。
输出schema
df.printSchema()
# root
# |-- id: integer (nullable = true)
# |-- 省份: string (nullable = true)
预览表
df.show(n=5)
# +----+------+------+------+------+
# | id| 省份| 城市| 区县| 区域|
# +----+------+------+------+------
# |2557|广东省|深圳市|罗湖区|春风路
# ...
统计数量
print(df.count())
# 47104
输出列名称和字段类型
print(df.columns)
# ['id', '省份', '城市', '区县', '区域', '小区', '源地址',...
print(df.dtypes)
# [('id', 'int'), ('省份', 'string'), ('城市', 'string'),...
选择列
df.select('城市', '区县', '区域', '小区').show()
# +------+------+------+--------------+
# | 城市| 区县| 区域| 小区|
# +------+------+------+--------------+
# |深圳市|罗湖区|春风路| 凯悦华庭|
# |深圳市|罗湖区|春风路| 置地逸轩|
# ...
为选择的列赋予新名称
可以看到有两种方式来对指定列做操作:
- 列名称是英文的话,直接在df后面用点号调用
- 列名称非英文,可以在后面用中括号调用
df.select(df.id.alias('id_value'), '小区').show()
# +--------+--------------+
# |id_value| 小区|
# +--------+--------------+
# | 2557| 凯悦华庭|
# | 2558| 置地逸轩|
# ...
df.select(df["城市"].alias('city'), '小区').show()
# +------+--------------+
# | city| 小区|
# +------+--------------+
# |深圳市| 凯悦华庭|
# |深圳市| 置地逸轩|
# ...
按条件过滤
在filter中定义过滤条件,如果要进行多条件过滤,使用以下符号:
& 代表 and
| 代表or
~ 代表not
df.select('城市', '区县', '区域', '小区').filter(df["小区"] == '凯悦华庭').show()
# +------+------+------+--------+
# | 城市| 区县| 区域| 小区|
# +------+------+------+--------+
# |深圳市|罗湖区|春风路|凯悦华庭|
# ...
df.select('城市', '区县', '区域', '小区').filter((df["城市"] == '深圳市') & (df["区县"] == '南山区')).show()
# +------+------+------+----------------+
# | 城市| 区县| 区域| 小区|
# +------+------+------+----------------+
# |深圳市|南山区|白石洲|中海深圳湾畔花园|
# |深圳市|南山区|白石洲| 侨城豪苑二期|
# ...
# 可以直接在filter里面写条件表达式
df.select('id', '城市', '区县', '区域', '小区').filter("id = 5000").show()
构造新列
使用以下两种方式构造新列,其中使
pyspark.sql.functions
下提供很多sql操作函数。
我们使用
functions
下的
lit
构造新列。
import pyspark.sql.functions as F
df.select(df["城市"] + 1, '城市', '小区').show()
# +----------+------+--------------+
# |(城市 + 1)| 城市| 小区|
# +----------+------+--------------+
# | null|深圳市| 凯悦华庭|
df.select(F.lit("test").alias('城市plus'), '城市', '小区').show()
# +--------+------+--------------+
# |城市plus| 城市| 小区|
# +--------+------+--------------+
# | test|深圳市| 凯悦华庭|
增加行
# 取出一行
df2 = df.limit(1) # one row
# 增加行
print(df.count()) # 47104
print(df.unionAll(df2).count()) # 47105
删除重复记录
我们上面增加了一个重复行,目前条数是47105,我们接下来看去除重复行之后47104。
另外一个问题,对于原始数据来说,我们的id是不重复的,但是其他字段可能会重复。
因此我们去重的时候应该去掉id的影响,这里可以使用下面第二种指定字段去重。
# 删除重复记录
print(df.drop_duplicates().count()) # 47104
print(df.drop_duplicates(['省份', '城市', '区县', '区域', '小区']).count())
删除列
# 删除列
print(df.drop('id').columns)
# ['省份', '城市', '区县', '区域', '小区', '源地址',...
删除缺失值行
# 删除存在缺失的记录
print(df.dropna().count()) # 47092
# 删除指定字段中存在缺失的记录
print(df.dropna(subset=['省份', '城市']).count()) # 47104
填充缺失值
填充缺失值,指定字段需要填充的值。
分组统计和计算
# 分组统计
df_g1 = df.groupby("区县").count()
df_g1.show()
# +--------+-----+
# | 区县|count|
# +--------+-----+
# | 龙华区| 4217|
# ...
# 分组计算
df.groupby('区县').agg(F.max(df['总价'])).show()
+--------+----------+
# | 区县| max(总价)|
# +--------+----------+
# | 龙华区|5200.00000|
# | 福田区|8300.00000|
# | 罗湖区|7000.00000|
# | 坪山区|1588.00000|
# | 南山区|9800.00000|
# | 龙岗区|4000.00000|
# | 盐田区|5500.00000|
# | 光明区|5200.00000|
# |大鹏新区|3500.00000|
# | 宝安区|8800.00000|
# +--------+----------+
函数计算
同样,函数计算由
pyspark.sql.functions
提供。
# 函数计算
df.select(F.max(df["总价"])).show() # 最大值
# +----------+
# | max(总价)|
# +----------+
# |9800.00000|
# +----------+
df.select(F.min(df["总价"])).show() # 最小值
# +---------+
# |min(总价)|
# +---------+
# | 1.10000|
# +---------+
df.select(F.avg(df["总价"])).show() # 平均值
# +-------------+
# | avg(总价)|
# +-------------+
# |577.736916000|
# +-------------+
df.select(F.countDistinct(df["总价"])).show() # 去重后再统计
# |count(DISTINCT 总价)|
# +--------------------+
# | 1219|
# +--------------------+
df.select(F.count(df["总价"])).show() # 去掉缺失值会再统计
# +-----------+
# |count(总价)|
# +-----------+
# | 47104|
# +-----------+
其余的函数:
# 'lit': 'Creates a :class:`Column` of literal value.',
# 'col': 'Returns a :class:`Column` based on the given column name.',
# 'column': 'Returns a :class:`Column` based on the given column name.',
# 'asc': 'Returns a sort expression based on the ascending order of the given column name.',
# 'desc': 'Returns a sort expression based on the descending order of the given column name.',
#
# 'upper': 'Converts a string expression to upper case.',
# 'lower': 'Converts a string expression to upper case.',
# 'sqrt': 'Computes the square root of the specified float value.',
# 'abs': 'Computes the absolutle value.',
#
# 'max': 'Aggregate function: returns the maximum value of the expression in a group.',
# 'min': 'Aggregate function: returns the minimum value of the expression in a group.',
# 'first': 'Aggregate function: returns the first value in a group.',
# 'last': 'Aggregate function: returns the last value in a group.',
# 'count': 'Aggregate function: returns the number of items in a group.',
# 'sum': 'Aggregate function: returns the sum of all values in the expression.',
# 'avg': 'Aggregate function: returns the average of the values in a group.',
# 'mean': 'Aggregate function: returns the average of the values in a group.',
# 'sumDistinct': 'Aggregate function: returns the sum of distinct values in the expression.',
描述性分析
df.describe("总价").show()
# +-------+-----------------+
# |summary| 总价|
# +-------+-----------------+
# | count| 47104|
# | mean| 577.736916000|
# | stddev|544.7605196104298|
# | min| 1.10000|
# | max| 9800.00000|
# +-------+-----------------+
最后,记得stop一下。
参考链接
Quick Start
Spark Python API (Sphinx)
PySpark SQL常用语法
spark官方文档 翻译 之pyspark.sql.SQLContext