天天看点

用写sql的思路写 pyspark

文章目录

    • 初始化:
    • 1. 查询筛选以及UDF
      • 1.1 新增字段
      • 1.2 字段为list,包含子项
      • 1.3 pyspark的`case when`使用
    • 2. group by 聚合
      • 2.1 想自定义聚合函数,
    • 3. 对数据结果排序
    • 4. join关联
    • 5. 窗口函数
    • 6. 取topN
    • 7. 展开一个字段到多行
      • 参考链接

初始化:

from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.utils import AnalysisException
from pyspark.sql import SparkSession,DataFrame
from pyspark.sql.functions import udf, pandas_udf
from pyspark.sql.types import StructType, StructField, StringType, LongType, ArrayType, Row
import pyspark.sql.functions as F

spark = SparkContext()
hiveContext = HiveContext(spark)
a_table = spark.table('a_table')
a_table.printSchema()
>
root
 |-- name: string (nullable = true)
 |-- class: string (nullable = true)
 |-- project: string (nullable = false)
 |-- score: long (nullable = false)
           

1. 查询筛选以及UDF

查询的筛选条件可以跟sql一样写到where。就不多说了

select name,score
from a_table
where class='一班' and project='语文'
           
# 这里where有不同的写法,都列出来,如下:
spark.table('a_table').select('name','score')\
	.where("class='一班' and project='语文'")\
	.where((F.col("class")='一班') & (F.col('project')='语文'))
           

1.1 新增字段

select name,score,score*2 as double_score
from a_table
where class='一班' and project='语文'
           
spark.table('a_table')
	.select('name','score')\
	.withColumn('double_score','score'*2)\
	.where("class='一班' and project='语文'")

# 2. 假设有复杂逻辑的udf如下
def fun_a(score):
	if score:
		return score*2
	else:
		return None
# 注册udf时,需要给出输出类型,如果是多种输出类型的怎么处理?目前我还没有解决。
# 占时是注册多个udf的笨法子
udf_a = F.udf(fun_a, LongType())
spark.table('a_table')
	.select('name','score')\
	.withColumn('double_score',udf_a('score'))\
	.where("class='一班' and project='语文'")

# 3. 这个条目是啥来着忘了。
# TODO

# 4. 使用的udf需要参数
# 找到一个写法,但是我没有试成功,可能是我的函数太复杂了,找个简单的再试试 TODO
# https://blog.csdn.net/u011582757/article/details/102991347
# select udf_4(col,0.5) as get_percent_5 from ..
def udf_d(argv):
    def fun_d(value, argv):
        if not argv or argv == 0:
            return 0
        else:
            return value**argv
    return F.udf(lambda x: fun_d(x, argv),  LongType())
spark.table('a_table')
	.select('name','score')\
	.withColumn('triple_score',udf_d(3)('score')) \
	.where("class='一班' and project='语文'")
           

1.2 字段为list,包含子项

1.3 pyspark的

case when

使用

from pyspark.sql import functions as F
spark.table('a_table')\
	.withColumn("new_col", F.when((df["col-1"] > 0.0) & (df["col-2"] > 0.0), 1).otherwise(0))
# 这里的条件要加括号,否则报错
# py4j.Py4JException: Method and([class java.lang.Double]) does not exist
           

2. group by 聚合

select class,name,sum(score),count(score)
from a_table
group by class,name
           
# 这时候发现python的换行真的变态,很不好用啊,
# 想要完整的逻辑非要加\才能换行,你说这不是脑残吗?
# 要是按照他们提供的PEP8换行出来,真的是没眼看。
spark.table('a_table')\
	.groupby('class','name')\
	.agg(F.sum('score').alias('sum_score'),
	     F.count('score').alias('project_count'),
	     F.countDistinct('score').alias('project_count'),
		)
           

2.1 想自定义聚合函数,

此处有大坑,待填坑

3. 对数据结果排序

4. join关联

table_1 = spark.table('table_name_1')
table_2 = spark.table('table_name_2')
# 这里的how有多种模式可选,选用需注意
table_3 = table_1.join(table_2, on=['user_id'], how='outer')
# 这里on条件的另一种写法,更熟悉了
table_3 = table_1.join(table_2, table_1.user_id==table_2.user_id, how='outer')
           

5. 窗口函数

加rank和排序,pyspark提供有窗口函数,跟sql使用的方法基本一样

6. 取topN

a_table.sort_values(by=['socre'], ascending=False).head(n)
a_table.sort_values(by=['socre'], ascending=False).tail(n)
           

7. 展开一个字段到多行

参考链接

  1. https://www.pythonheidong.com/blog/article/76628/