天天看点

使用pyspark SQL处理MySQL中的数据

目录

    • pyspark连接mysql
    • pyspark SQL常用语法
      • 输出schema
      • 预览表
      • 统计数量
      • 输出列名称和字段类型
      • 选择列
      • 为选择的列赋予新名称
      • 按条件过滤
      • 构造新列
      • 增加行
      • 删除重复记录
      • 删除列
      • 删除缺失值行
      • 填充缺失值
      • 分组统计和计算
      • 函数计算
      • 描述性分析
    • 参考链接

pyspark连接mysql

前提是需要下载jar包。

Mysql-connector-java.jar

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pyspark.sql.functions as F


sc = SparkContext("local", appName="mysqltest")
sqlContext = SQLContext(sc)
df = sqlContext.read.format("jdbc").options(
    url="jdbc:mysql://localhost:3306/mydata?user=root&password=mysql&"
        "useUnicode=true&characterEncoding=utf-8&useJDBCCompliantTimezoneShift=true&"
        "useLegacyDatetimeCode=false&serverTimezone=UTC ", dbtable="detail_data").load()
df.show(n=5)
sc.stop()
           

pyspark SQL常用语法

pyspark SQL的部分语法和pandas的很相似。

输出schema

df.printSchema()
# root
#  |-- id: integer (nullable = true)
#  |-- 省份: string (nullable = true)
           

预览表

df.show(n=5)
# +----+------+------+------+------+
# |  id|  省份|  城市|  区县|  区域|
# +----+------+------+------+------
# |2557|广东省|深圳市|罗湖区|春风路
# ...
           

统计数量

print(df.count())
# 47104
           

输出列名称和字段类型

print(df.columns)
# ['id', '省份', '城市', '区县', '区域', '小区', '源地址',...
print(df.dtypes)
# [('id', 'int'), ('省份', 'string'), ('城市', 'string'),...
           

选择列

df.select('城市', '区县', '区域', '小区').show()
# +------+------+------+--------------+
# |  城市|  区县|  区域|          小区|
# +------+------+------+--------------+
# |深圳市|罗湖区|春风路|      凯悦华庭|
# |深圳市|罗湖区|春风路|      置地逸轩|
# ...
           

为选择的列赋予新名称

可以看到有两种方式来对指定列做操作:

  1. 列名称是英文的话,直接在df后面用点号调用
  2. 列名称非英文,可以在后面用中括号调用
df.select(df.id.alias('id_value'), '小区').show()
# +--------+--------------+
# |id_value|          小区|
# +--------+--------------+
# |    2557|      凯悦华庭|
# |    2558|      置地逸轩|
# ...

df.select(df["城市"].alias('city'), '小区').show()
# +------+--------------+
# |  city|          小区|
# +------+--------------+
# |深圳市|      凯悦华庭|
# |深圳市|      置地逸轩|
# ...
           

按条件过滤

在filter中定义过滤条件,如果要进行多条件过滤,使用以下符号:

&   代表 and 
|   代表or
~   代表not
           
df.select('城市', '区县', '区域', '小区').filter(df["小区"] == '凯悦华庭').show()
# +------+------+------+--------+
# |  城市|  区县|  区域|    小区|
# +------+------+------+--------+
# |深圳市|罗湖区|春风路|凯悦华庭|
# ...

df.select('城市', '区县', '区域', '小区').filter((df["城市"] == '深圳市') & (df["区县"] == '南山区')).show()
# +------+------+------+----------------+
# |  城市|  区县|  区域|            小区|
# +------+------+------+----------------+
# |深圳市|南山区|白石洲|中海深圳湾畔花园|
# |深圳市|南山区|白石洲|    侨城豪苑二期|
# ...

# 可以直接在filter里面写条件表达式
df.select('id', '城市', '区县', '区域', '小区').filter("id = 5000").show()
           

构造新列

使用以下两种方式构造新列,其中使

pyspark.sql.functions

下提供很多sql操作函数。

我们使用

functions

下的

lit

构造新列。

import pyspark.sql.functions as F

df.select(df["城市"] + 1, '城市', '小区').show()
# +----------+------+--------------+
# |(城市 + 1)|  城市|          小区|
# +----------+------+--------------+
# |      null|深圳市|      凯悦华庭|

df.select(F.lit("test").alias('城市plus'), '城市', '小区').show()
# +--------+------+--------------+
# |城市plus|  城市|          小区|
# +--------+------+--------------+
# |    test|深圳市|      凯悦华庭|
           

增加行

# 取出一行
df2 = df.limit(1)  # one row

# 增加行
print(df.count())  # 47104
print(df.unionAll(df2).count())  # 47105
           

删除重复记录

我们上面增加了一个重复行,目前条数是47105,我们接下来看去除重复行之后47104。

另外一个问题,对于原始数据来说,我们的id是不重复的,但是其他字段可能会重复。

因此我们去重的时候应该去掉id的影响,这里可以使用下面第二种指定字段去重。

# 删除重复记录
print(df.drop_duplicates().count())  # 47104

print(df.drop_duplicates(['省份', '城市', '区县', '区域', '小区']).count())
           

删除列

# 删除列
print(df.drop('id').columns)
# ['省份', '城市', '区县', '区域', '小区', '源地址',...
           

删除缺失值行

# 删除存在缺失的记录
print(df.dropna().count())  # 47092
# 删除指定字段中存在缺失的记录
print(df.dropna(subset=['省份', '城市']).count())  # 47104
           

填充缺失值

填充缺失值,指定字段需要填充的值。

分组统计和计算

# 分组统计
df_g1 = df.groupby("区县").count()
df_g1.show()
# +--------+-----+
# |    区县|count|
# +--------+-----+
# |  龙华区| 4217|
# ...

# 分组计算
df.groupby('区县').agg(F.max(df['总价'])).show()
+--------+----------+
# |    区县| max(总价)|
# +--------+----------+
# |  龙华区|5200.00000|
# |  福田区|8300.00000|
# |  罗湖区|7000.00000|
# |  坪山区|1588.00000|
# |  南山区|9800.00000|
# |  龙岗区|4000.00000|
# |  盐田区|5500.00000|
# |  光明区|5200.00000|
# |大鹏新区|3500.00000|
# |  宝安区|8800.00000|
# +--------+----------+
           

函数计算

同样,函数计算由

pyspark.sql.functions

提供。

# 函数计算
df.select(F.max(df["总价"])).show()  # 最大值
# +----------+
# | max(总价)|
# +----------+
# |9800.00000|
# +----------+

df.select(F.min(df["总价"])).show()  # 最小值
# +---------+
# |min(总价)|
# +---------+
# |  1.10000|
# +---------+

df.select(F.avg(df["总价"])).show()  # 平均值
# +-------------+
# |    avg(总价)|
# +-------------+
# |577.736916000|
# +-------------+

df.select(F.countDistinct(df["总价"])).show()  # 去重后再统计
#  |count(DISTINCT 总价)|
# +--------------------+
# |                1219|
# +--------------------+

df.select(F.count(df["总价"])).show()  # 去掉缺失值会再统计
# +-----------+
# |count(总价)|
# +-----------+
# |      47104|
# +-----------+
           

其余的函数:

# 'lit': 'Creates a :class:`Column` of literal value.',
# 'col': 'Returns a :class:`Column` based on the given column name.',
# 'column': 'Returns a :class:`Column` based on the given column name.',
# 'asc': 'Returns a sort expression based on the ascending order of the given column name.',
# 'desc': 'Returns a sort expression based on the descending order of the given column name.',
#
# 'upper': 'Converts a string expression to upper case.',
# 'lower': 'Converts a string expression to upper case.',
# 'sqrt': 'Computes the square root of the specified float value.',
# 'abs': 'Computes the absolutle value.',
#
# 'max': 'Aggregate function: returns the maximum value of the expression in a group.',
# 'min': 'Aggregate function: returns the minimum value of the expression in a group.',
# 'first': 'Aggregate function: returns the first value in a group.',
# 'last': 'Aggregate function: returns the last value in a group.',
# 'count': 'Aggregate function: returns the number of items in a group.',
# 'sum': 'Aggregate function: returns the sum of all values in the expression.',
# 'avg': 'Aggregate function: returns the average of the values in a group.',
# 'mean': 'Aggregate function: returns the average of the values in a group.',
# 'sumDistinct': 'Aggregate function: returns the sum of distinct values in the expression.',
           

描述性分析

df.describe("总价").show()
# +-------+-----------------+
# |summary|             总价|
# +-------+-----------------+
# |  count|            47104|
# |   mean|    577.736916000|
# | stddev|544.7605196104298|
# |    min|          1.10000|
# |    max|       9800.00000|
# +-------+-----------------+
           

最后,记得stop一下。

参考链接

Quick Start

Spark Python API (Sphinx)

PySpark SQL常用语法

spark官方文档 翻译 之pyspark.sql.SQLContext