天天看點

用寫sql的思路寫 pyspark

文章目錄

    • 初始化:
    • 1. 查詢篩選以及UDF
      • 1.1 新增字段
      • 1.2 字段為list,包含子項
      • 1.3 pyspark的`case when`使用
    • 2. group by 聚合
      • 2.1 想自定義聚合函數,
    • 3. 對資料結果排序
    • 4. join關聯
    • 5. 視窗函數
    • 6. 取topN
    • 7. 展開一個字段到多行
      • 參考連結

初始化:

from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.utils import AnalysisException
from pyspark.sql import SparkSession,DataFrame
from pyspark.sql.functions import udf, pandas_udf
from pyspark.sql.types import StructType, StructField, StringType, LongType, ArrayType, Row
import pyspark.sql.functions as F

spark = SparkContext()
hiveContext = HiveContext(spark)
a_table = spark.table('a_table')
a_table.printSchema()
>
root
 |-- name: string (nullable = true)
 |-- class: string (nullable = true)
 |-- project: string (nullable = false)
 |-- score: long (nullable = false)
           

1. 查詢篩選以及UDF

查詢的篩選條件可以跟sql一樣寫到where。就不多說了

select name,score
from a_table
where class='一班' and project='國文'
           
# 這裡where有不同的寫法,都列出來,如下:
spark.table('a_table').select('name','score')\
	.where("class='一班' and project='國文'")\
	.where((F.col("class")='一班') & (F.col('project')='國文'))
           

1.1 新增字段

select name,score,score*2 as double_score
from a_table
where class='一班' and project='國文'
           
spark.table('a_table')
	.select('name','score')\
	.withColumn('double_score','score'*2)\
	.where("class='一班' and project='國文'")

# 2. 假設有複雜邏輯的udf如下
def fun_a(score):
	if score:
		return score*2
	else:
		return None
# 注冊udf時,需要給出輸出類型,如果是多種輸出類型的怎麼處理?目前我還沒有解決。
# 占時是注冊多個udf的笨法子
udf_a = F.udf(fun_a, LongType())
spark.table('a_table')
	.select('name','score')\
	.withColumn('double_score',udf_a('score'))\
	.where("class='一班' and project='國文'")

# 3. 這個條目是啥來着忘了。
# TODO

# 4. 使用的udf需要參數
# 找到一個寫法,但是我沒有試成功,可能是我的函數太複雜了,找個簡單的再試試 TODO
# https://blog.csdn.net/u011582757/article/details/102991347
# select udf_4(col,0.5) as get_percent_5 from ..
def udf_d(argv):
    def fun_d(value, argv):
        if not argv or argv == 0:
            return 0
        else:
            return value**argv
    return F.udf(lambda x: fun_d(x, argv),  LongType())
spark.table('a_table')
	.select('name','score')\
	.withColumn('triple_score',udf_d(3)('score')) \
	.where("class='一班' and project='國文'")
           

1.2 字段為list,包含子項

1.3 pyspark的

case when

使用

from pyspark.sql import functions as F
spark.table('a_table')\
	.withColumn("new_col", F.when((df["col-1"] > 0.0) & (df["col-2"] > 0.0), 1).otherwise(0))
# 這裡的條件要加括号,否則報錯
# py4j.Py4JException: Method and([class java.lang.Double]) does not exist
           

2. group by 聚合

select class,name,sum(score),count(score)
from a_table
group by class,name
           
# 這時候發現python的換行真的變态,很不好用啊,
# 想要完整的邏輯非要加\才能換行,你說這不是腦殘嗎?
# 要是按照他們提供的PEP8換行出來,真的是沒眼看。
spark.table('a_table')\
	.groupby('class','name')\
	.agg(F.sum('score').alias('sum_score'),
	     F.count('score').alias('project_count'),
	     F.countDistinct('score').alias('project_count'),
		)
           

2.1 想自定義聚合函數,

此處有大坑,待填坑

3. 對資料結果排序

4. join關聯

table_1 = spark.table('table_name_1')
table_2 = spark.table('table_name_2')
# 這裡的how有多種模式可選,選用需注意
table_3 = table_1.join(table_2, on=['user_id'], how='outer')
# 這裡on條件的另一種寫法,更熟悉了
table_3 = table_1.join(table_2, table_1.user_id==table_2.user_id, how='outer')
           

5. 視窗函數

加rank和排序,pyspark提供有視窗函數,跟sql使用的方法基本一樣

6. 取topN

a_table.sort_values(by=['socre'], ascending=False).head(n)
a_table.sort_values(by=['socre'], ascending=False).tail(n)
           

7. 展開一個字段到多行

參考連結

  1. https://www.pythonheidong.com/blog/article/76628/