Preface: I'm "nicedays", a big-data developer who enjoys making visual effects, listening to music, and sharing technology. The name comes from "HAVE A NICE DAY", a song by the band WORLD ORDER. The road to here has had plenty of bumps and setbacks, and I've finally understood that a nice day is something you have to give yourself. Time flies; cherish the present~~
I blog partly to summarize and record my own learning, and partly in the hope of helping more people who are interested in big data. If big data and machine learning interest you too, feel free to follow my posts and let's dig into the value of data and AI together~
https://blog.csdn.net/qq_35050438
Table of Contents
- Spark--DataFrame API common operations:
  - Metadata:
  - Dates and timestamps:
  - Handling null values:
  - Solving problems with window functions:
  - User-defined functions (UDF):
  - Rows to columns:
  - Anti-join, difference, intersection:
  - Lateral views:
  - Collecting a DataFrame column and fetching row elements:
Spark--DataFrame API common operations:
Metadata:
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// build the session and return it together with the four test DataFrames,
// in the order the destructuring below expects
def createAndLoadData() = {
  val spark = SparkSession.builder().appName("test").master("local[1]").getOrCreate()
  (spark, getLocalCourse(spark), getLocalStudent(spark), getLocalTeacher(spark), getLocalScore(spark))
}
def getLocalTeacher(spark: SparkSession) = {
  val schema = StructType(List(
    StructField("t_id", StringType, nullable = true),
    StructField("t_name", StringType, nullable = true)
  ))
  spark.createDataFrame(spark.sparkContext.parallelize(Seq(
    Row("1", "張三"),
    Row("2", "李四")
  )), schema)
}
def getLocalStudent(spark: SparkSession) = {
  val schema = StructType(List(
    StructField("s_id", StringType, nullable = true),
    StructField("s_name", StringType, nullable = true),
    StructField("s_birth", StringType, nullable = true),
    StructField("s_sex", StringType, nullable = true)
  ))
  spark.createDataFrame(spark.sparkContext.parallelize(Seq(
    Row("1", "趙雷", "1990,01,01", "男"),
    Row("2", "錢電", "1990,12,21", "男"),
    Row("3", "孫風", "1990,05,20", "男"),
    Row("4", "李雲", "1990,08,01", "男"),
    Row("5", "周梅", "1991,12,01", "女"),
    Row("6", "吳蘭", "1992,03,01", "女"),
    Row("7", "鄭竹", "1989,07,01", "女"),
    Row("8", "王菊", "1990,01,20", "女")
  )), schema)
}
def getLocalCourse(spark: SparkSession) = {
  val schema = StructType(List(
    StructField("c_id", StringType, nullable = true),
    StructField("c_name", StringType, nullable = true),
    StructField("t_id", StringType, nullable = true)
  ))
  spark.createDataFrame(spark.sparkContext.parallelize(Seq(
    Row("1", "國文", "2"),
    Row("2", "數學", "1"),
    Row("3", "英語", "3")
  )), schema)
}
def getLocalScore(spark: SparkSession) = {
  val schema = StructType(List(
    StructField("s_id", StringType, nullable = true),
    StructField("c_id", StringType, nullable = true),
    StructField("s_score", StringType, nullable = true)
  ))
  spark.createDataFrame(spark.sparkContext.parallelize(Seq(
    Row("1", "1", "80"),
    Row("1", "2", "90"),
    Row("1", "3", "99"),
    Row("2", "1", "70"),
    Row("2", "2", "60"),
    Row("2", "3", "65"),
    Row("3", "1", "80"),
    Row("3", "2", "80"),
    Row("3", "3", "80"),
    Row("4", "1", "50"),
    Row("4", "2", "30"),
    Row("4", "3", "40"),
    Row("5", "1", "76"),
    Row("5", "2", "87"),
    Row("6", "1", "31"),
    Row("6", "3", "34"),
    Row("7", "2", "89"),
    Row("7", "3", "98")
  )), schema)
}
// load the base data
val (spark, course, student, teacher, score) = createAndLoadData()
import spark.implicits._ // enables the $"col" syntax used below
Dates and timestamps:
Find the students whose birthday falls in the current week:
// s_birth is stored as "yyyy,MM,dd", so normalise the separators before parsing,
// then project the birthday onto the current year and compare week numbers
student.select($"s_id", $"s_name",
    weekofyear(concat(year(current_date()),
      date_format(to_date(regexp_replace($"s_birth", ",", "-")), "-MM-dd"))).alias("nt"),
    weekofyear(current_date()).alias("dt")).
  filter($"nt" === $"dt").
  show()
Handling null values:
On a DataFrame, nulls are handled mainly through the .na sub-package (DataFrameNaFunctions); Spark SQL also ships the helpers below.
ifnull
- if the first value is null, return the second value in its place
nullif
- if the two values are equal, return null; otherwise return the first value
nvl
- if the first value is null, return the second value; otherwise return the first
nvl2
- if the first value is not null, return the second value; otherwise return the third
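The semantics of these four helpers can be sketched in plain Scala, with Option standing in for null. The function names below are illustrative stand-ins only; in Spark SQL you would reach the real ones through expr() or selectExpr():

```scala
// Plain-Scala sketch of the SQL null-handling semantics described above.
// These defs are illustrative, not Spark API.
def ifnull[A](a: Option[A], b: Option[A]): Option[A] = a.orElse(b)   // second value when the first is null
def nullif[A](a: A, b: A): Option[A] = if (a == b) None else Some(a) // null when equal, else the first
def nvl[A](a: Option[A], b: A): A = a.getOrElse(b)                   // first unless it is null
def nvl2[A, B](a: Option[A], whenSet: B, whenNull: B): B =           // branch purely on null-ness
  if (a.isDefined) whenSet else whenNull
```

Note that nvl2 never looks at the wrapped value itself, only at whether one is present.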
drop
- drop rows containing null
df.na.drop()
df.na.drop("any") // drop a row if any column is null
df.na.drop("all") // drop a row only if every column is null
df.na.drop("any", Seq("StockCode", "InvoiceNo")) // consider only the given columns
fill
- fill one or more columns with a set of values
df.na.fill("value") // replace nulls in string-typed columns
df.na.fill(5, Seq("store")) // replace nulls in the given columns only
replace
- replace values in a column according to a mapping
df.na.replace("description", Map("" -> "UNKNOWN"))
Solving problems with window functions:
Find the top three scores in each course:
// cast s_score to int before ordering: it is stored as a string,
// and lexicographic order would misplace a score like "100"
score.select($"s_id", $"c_id", $"s_score",
    row_number().over(Window.partitionBy("c_id").orderBy($"s_score".cast("int").desc)).alias("rank")).
  filter($"rank" <= 3).
  show()
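row_number gives tied scores an arbitrary order (course 1 above has two students at 80). When ties should share a position, rank and dense_rank are the usual alternatives; a self-contained sketch on a slice of the same data:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{dense_rank, rank}

val spark = SparkSession.builder().appName("rank-demo").master("local[1]").getOrCreate()
import spark.implicits._

// course 1 scores from the data above, including the tie at 80
val score = Seq(("1", "1", "80"), ("3", "1", "80"), ("2", "1", "70"), ("6", "1", "31")).
  toDF("s_id", "c_id", "s_score")

val w = Window.partitionBy("c_id").orderBy($"s_score".cast("int").desc)
val ranked = score.select($"s_id", $"s_score",
  rank().over(w).alias("rank"),             // ties share a rank, then a gap: 1, 1, 3, 4
  dense_rank().over(w).alias("dense_rank")) // ties share a rank, no gap:    1, 1, 2, 3
ranked.show()
```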
User-defined functions (UDF):
Count students per score band for each course -- course id, course name, [100-85], [85-70], [70-60], [0-60] -- and the percentage each band represents:
// udf: map a raw score onto a letter band
val scoreTransaction = udf { score: String =>
  score.toInt match {
    case x if x >= 85 => "A"
    case x if x >= 70 => "B"
    case x if x >= 60 => "C"
    case _ => "D"
  }
}
val countval = score.groupBy($"c_id").agg(count($"c_id").alias("Csum"))
score.select($"s_id", $"c_id", scoreTransaction($"s_score").alias("level")).
  join(countval, "c_id").
  groupBy($"c_id", $"level").
  agg((round(count($"level") / first($"Csum"), 2) * 100).alias("per")).
  select($"c_id", $"level", concat($"per", lit("%")).alias("pers")).
  show()
Rows to columns:
Find each course's highest, lowest and average score, and display: course id, course name, pass rate, medium rate, good rate, excellent rate
(pass: >=60, medium: 70-80, good: 80-90, excellent: >=90).
Output the course id and enrolment count, sorted by count descending and, for equal counts, by course id ascending:
score.
  join(course, Seq("c_id")).
  groupBy($"c_name", $"c_id").
  agg(round(count(when($"s_score" >= 60, 1)) / count($"c_id"), 2).as("jige_ratio"),
    round(count(when($"s_score" >= 70 && $"s_score" < 80, 1)) / count($"c_id"), 2).as("zhongdeng_ratio"),
    round(count(when($"s_score" >= 80 && $"s_score" < 90, 1)) / count($"c_id"), 2).as("youliang_ratio"),
    round(count(when($"s_score" >= 90, 1)) / count($"c_id"), 2).as("youxiu_ratio"),
    count($"c_id").as("nums")).
  sort(desc("nums"), asc("c_id")).
  show()
Anti-join, difference, intersection:
Find the students who have never taken a course taught by teacher "張三" -- left anti join:
val tmp = teacher.
  filter($"t_name" === lit("張三")).
  join(course, "t_id").
  select("c_id").
  join(score, "c_id").
  select("s_id").
  distinct()
// Seq("s_id") <=> student("s_id") === tmp("s_id")
student.join(tmp, Seq("s_id"), "left_anti").show()
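The heading also promises difference and intersection; both exist directly on DataFrame as except and intersect (the two sides must share a schema, and both deduplicate like SQL EXCEPT/INTERSECT). A self-contained sketch with made-up id sets standing in for the student/tmp frames above:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("setops-demo").master("local[1]").getOrCreate()
import spark.implicits._

// hypothetical ids: all students vs. those who took 張三's course
val allIds    = Seq("1", "2", "3", "4").toDF("s_id")
val taughtIds = Seq("1", "3").toDF("s_id")

val notTaught = allIds.except(taughtIds)    // difference: rows of allIds absent from taughtIds
val taught    = allIds.intersect(taughtIds) // intersection: rows present in both
notTaught.show()
taught.show()
```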
Lateral views:
explode:
- explode creates one output row for every element of an Array or Map column
import org.apache.spark.sql.functions._
// arrays and lists
df.withColumn("entityPair", explode(col("entityList")))
// maps: explode yields a key column and a value column
df.select(df("name"), explode(df("myScore"))).toDF("name", "key", "value")
name | age | interest |
---|---|---|
A | 20 | basketball,badminton |
B | 32 | swimming,jogging,watching TV |
… | … | … |
becomes:
name | age | interest |
---|---|---|
A | 20 | basketball |
A | 20 | badminton |
B | 32 | swimming |
B | 32 | jogging |
B | 32 | watching TV |
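The interest column in the table is a comma-separated string, so it must be split into an array before explode applies. A self-contained sketch (the people DataFrame is invented here to mirror the table):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, split}

val spark = SparkSession.builder().appName("explode-demo").master("local[1]").getOrCreate()
import spark.implicits._

val people = Seq(
  ("A", 20, "basketball,badminton"),
  ("B", 32, "swimming,jogging,watching TV")
).toDF("name", "age", "interest")

// split turns the string into an array column; explode emits one row per element
val exploded = people.withColumn("interest", explode(split($"interest", ",")))
exploded.show()
```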
Collecting a DataFrame column and fetching row elements:
Find the other students who take exactly the same courses as student "01":
/** score.
  filter($"s_id" === 1).
  groupBy($"s_id").
  agg(collect_set($"c_id").as("c1")).
  // map iterates over each row; getAs pulls c1 out of the row and turns it into a Seq
  map(a => a.getAs[Seq[String]]("c1").toArray).
  collectAsList().
  get(0)
**/
// how many distinct courses student 01 takes
val value = score.
  filter("s_id = 1").
  select("c_id").distinct().
  agg(count("c_id").alias("co")).
  select("co").
  collect().
  flatMap(x => x.toSeq).
  toList
// which courses student 01 takes
val value1 = score.
  filter("s_id = 1").
  select("c_id").
  distinct()
// keep students whose matched-course count equals student 01's course count
val frame = score.
  join(value1, "c_id").
  groupBy("s_id").
  agg(count(score("s_id")).alias("count")).
  filter($"count".isin(value: _*) && $"s_id" =!= 1).
  join(student, "s_id")
frame.show()
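One caveat about the count-based query above: it only checks that a student covers all of 01's courses among the matched rows, so with a richer course table a student who takes 01's courses plus an extra one would slip through. Comparing whole course sets with collect_set avoids this; a self-contained sketch on stand-in data:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_set, sort_array}

val spark = SparkSession.builder().appName("sameset-demo").master("local[1]").getOrCreate()
import spark.implicits._

// stand-in score data: student 3 matches 01 exactly, while student 9
// takes 01's courses plus an extra one and must therefore be excluded
val score = Seq(
  ("1", "1"), ("1", "2"), ("1", "3"),
  ("3", "1"), ("3", "2"), ("3", "3"),
  ("9", "1"), ("9", "2"), ("9", "3"), ("9", "4"),
  ("6", "1")
).toDF("s_id", "c_id")

// one sorted course-set per student; joining on the whole set enforces exact equality
val sets   = score.groupBy($"s_id").agg(sort_array(collect_set($"c_id")).as("cs"))
val target = sets.filter($"s_id" === "1").select($"cs")

val same = sets.join(target, "cs").filter($"s_id" =!= "1")
same.show()
```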