天天看點

Spark系列 (二)結構化API--DataFrame常見的業務操作

寫在前面: 我是

「nicedays」

,一枚喜愛做特效,聽音樂,分享技術的

大資料開發猿

。這名字是來自world order樂隊的一首

HAVE A NICE DAY

。如今,走到現在很多坎坷和不順,如今終于明白nice day是需要自己賦予的。

白駒過隙,時光荏苒,珍惜當下~~

寫部落格一方面是對自己學習的一點點

總結及記錄

,另一方面則是希望能夠幫助更多對大資料感興趣的朋友。如果你也對

大資料與機器學習

感興趣,可以關注我的動态

https://blog.csdn.net/qq_35050438

,讓我們一起挖掘資料與人工智能的價值~

文章目錄

    • Spark--DataFrameAPI常見操作:
      • 中繼資料:
      • 處理日期和時間戳:
      • 處理資料空值:
      • 視窗函數解決問題:
      • UDF自定義函數:
      • 列轉行:
      • 取反,差集,交集:
      • 側視圖:
      • DataFrame某列轉集合後擷取對應行的元素:

Spark–DataFrameAPI常見操作:

中繼資料:

def createAndLoadData()={
    val spark = SparkSession.builder().appName("test").master("local[1]").getOrCreate()
  }
  def getLocalTeacher(spark:SparkSession)={
    val schema = StructType(
      List(
        StructField("t_id",StringType,nullable = true),
        StructField("t_name",StringType,nullable = true)
      )
    )
    spark.createDataFrame(spark.sparkContext.parallelize(Seq(
      Row("1","張三"),
      Row("2","李四")
    )),schema)
  }

  def getLocalStudent(spark:SparkSession)={
    val schema = StructType(
      List(
        StructField("s_id",StringType,nullable = true),
        StructField("s_name",StringType,nullable = true),
        StructField("s_birth",StringType,nullable = true),
        StructField("s_sex",StringType,nullable = true)
      )
    )
    spark.createDataFrame(spark.sparkContext.parallelize(Seq(
      Row("1","趙雷","1990,01,01","男"),
      Row("2","錢電","1990,12,21","男"),
      Row("3","孫風","1990,05,20","男"),
      Row("4","李雲","1990,08,01","男"),
      Row("5","周梅","1991,12,01","女"),
      Row("6","吳蘭","1992,03,01","女"),
      Row("7","鄭竹","1989,07,01","女"),
      Row("8","王菊","1990,01,20","女")
    )),schema)
  }

  def getLocalCourse(spark:SparkSession)={
    val schema = StructType(
      List(
        StructField("c_id",StringType,nullable = true),
        StructField("c_name",StringType,nullable = true),
        StructField("t_id",StringType,nullable = true)
      )
    )
    spark.createDataFrame(spark.sparkContext.parallelize(Seq(
      Row("1","國文","2"),
      Row("2","數學","1"),
      Row("3","英語","3")
    )),schema)
  }

  def getLocalScore(spark:SparkSession)={
    val schema = StructType(
      List(
        StructField("s_id",StringType,true),
        StructField("c_id",StringType,true),
        StructField("s_score",StringType,true)
      )
    )
    spark.createDataFrame(spark.sparkContext.parallelize(Seq(
      Row("1","1","80"),
      Row("1","2","90"),
      Row("1","3","99"),
      Row("2","1","70"),
      Row("2","2","60"),
      Row("2","3","65"),
      Row("3","1","80"),
      Row("3","2","80"),
      Row("3","3","80"),
      Row("4","1","50"),
      Row("4","2","30"),
      Row("4","3","40"),
      Row("5","1","76"),
      Row("5","2","87"),
      Row("6","1","31"),
      Row("6","3","34"),
      Row("7","2","89"),
      Row("7","3","98")
    )),schema)
  }
           
// 擷取基礎資料
 val (spark,course,student,teacher,score) = createAndLoadData()
           

處理日期和時間戳:

查詢本周過生日的學生
student.select($"s_id",$"s_name",weekofyear(concat(year(current_date()),date_format($"s_birth","-MM-dd"))). alias("nt"),weekofyear(current_date()).alias("dt")).
      filter($"nt" === $"dt").
      show()
           

處理資料空值:

基于DataFrame,處理null值主要方式是.na子包。

ifnull

  • 如果第一個值為空,則允許第二個值去取代他

nullif

  • 如果兩個值相等,則傳回null,否則傳回第二個值

nvl

  • 如果第一個值為null,則傳回第二個值,否則傳回第一個值

nvl2

  • 如果第一個不為null,傳回第二個指定值,否則傳回最後一個指定值

drop

  • 删除包含null的行
df.na.drop()
df.na.drop("any") # 存在null就删除
df.na.drop("all") # 全部為null才能删除
df.na.drop("any",Seq("StockCode","InvoiceNo")) # 指定列删除
           

fill

  • 用一組值填充一列或者多列
df.na.fill("value") # 将字元類型列的null值替換
df.na.fill(5,Seq("store")) # 指定列替換null值
           

replace

  • 将目前值替換掉某列的所有值
df.na.replace("description",Map("" -> "UNKNOWN"))
           

視窗函數解決問題:

查詢每門課程成績最好的前三名
score.select($"s_id",$"c_id",$"s_score",row_number().over(Window.partitionBy("c_id").orderBy(desc("s_score"))).alias("rank")).
      withColumn("s_id",regexp_replace(isnull($"s_id"),"NULL",$"s_id")).
      filter($"rank" <= 3).
      show()
           
Spark系列 (二)結構化API--DataFrame常見的業務操作

UDF自定義函數:

統計各科成績各分數段人數:課程編号,課程名稱,[100-85],[85-70],[70-60],[0-60]及所占百分比
// udf
    val scoreTransaction = udf{score: String => {
      score.toInt match {
        case x if x > 85 => "A"
        case x if x > 70 => "B"
        case x if x > 60 => "C"
        case _ => "D"
      }
    }
                               
                               
val countval = score.groupBy($"c_id").agg(count($"c_id").alias("Csum"))
                               
    score.select($"s_id", $"c_id", scoreTransaction($"s_score").alias("level")).
      join(countval,"c_id").
      groupBy($"c_id", $"level").
      agg((round(count($"level") / first($"Csum"),2) * 100).alias("per")).
      withColumn("%",lit("%")).
      select($"c_id",$"level",concat($"per",$"%").alias("pers")).
      show()
           
Spark系列 (二)結構化API--DataFrame常見的業務操作

列轉行:

查詢各科成績最高分、最低分和平均分:

以如下形式顯示:課程 ID,課程 name,及格率,中等率,優良率,優秀率

及格為>=60,中等為:70-80,優良為:80-90,優秀為:>=90

要求輸出課程号和選修人數,查詢結果按人數降序排列,若人數相同,按課程号升序排列

val tmp = score.
          join(course,Seq("c_id")).
          groupBy($"c_name",$"c_id").       agg(round(count(when($"s_score">=60,1).otherwise(null))/count($"c_id"),2).as("jige_ratio"),
            round(count(when($"s_score">=70 && $"s_score"<80 ,1).otherwise(null))/count($"s_id"),2).as("zhongdeng_ratio"),
            round(count(when($"s_score">=80 && $"s_score"<90 ,1).otherwise(null))/count($"c_id"),2).as("youliang_ratio"),
            round(count(when($"s_score">=90,1).otherwise(null))/count($"c_id"),2).as("youxiu_ratio"),
            count($"c_id").as("nums")).
          sort(desc("nums"),asc("c_id")).
          show()
           
Spark系列 (二)結構化API--DataFrame常見的業務操作

取反,差集,交集:

查詢沒學過"張三"老師授課的同學的資訊:–左反連接配接
val tmp = teacher.
      filter($"t_name" === lit("張三")).
      join(course, "t_id").
      select("c_id").
      join(score, "c_id").
      groupBy($"s_id").
      agg(count($"s_id")).
      select("s_id")
    // Seq("s_id") <=> student("s_id") === frame("s_id")
    student.join(tmp,Seq("s_id"), "left_anti" ).show()
           
Spark系列 (二)結構化API--DataFrame常見的業務操作

側視圖:

explode:

  • explode方法可以從規定的Array或者Map中使用每一個元素建立一列
import org.apache.spark.sql.functions._
// 數組和清單
df.withColumn("entityPair", explode(col("entityList")));
// map
df.select(df("name"),explode(df("myScore"))).toDF("key","value")
           
name age interest
A 20 籃球,羽毛球
B 32 遊泳,慢跑,看電視

轉成:

name age interest
A 20 籃球
A 20 羽毛球
B 32 遊泳
B 32 慢跑
B 32 看電視

DataFrame某列轉集合後擷取對應行的元素:

查詢和"01"号的同學學習的課程完全相同的其他同學的資訊:
/** score.
      filter($"s_id" === 1).
      groupBy($"s_id").
      agg(collect_set($"s_id").as("c1")).
      //select($"c1").
      // map對每行進行周遊,getas拿到c1的每行資料再把它轉化成seq
      map(a => a.getAs[Seq[String]]("c1").toArray).
      collectAsList()
      get(0)
   **/
	val value= score.
      filter("s_id = 1").
      select("c_id").distinct().
      agg(count("c_id").alias("co")).
      select("co").
      collect().
      flatMap(x=>x.toSeq).
      toList
    val value1 = score.
      filter("s_id = 1").
      select("c_id").
      distinct()
    val frame = score.
      join(value1, "c_id").
      groupBy("s_id").
      agg(count(score("s_id")).alias("count")).
      filter($"count".isin(value:_*) && $"s_id" =!= 1).
      join(student,"s_id").
      show()
           
Spark系列 (二)結構化API--DataFrame常見的業務操作

繼續閱讀