Prepare the data
//company app visit_times
騰訊,騰訊視訊,800
騰訊,QQ音樂,900
騰訊,微信讀書,100
騰訊,微信,900
騰訊,騰訊課堂,200
阿裡,支付寶,900
阿裡,優酷視訊,700
阿裡,蝦米音樂,500
阿裡,飛豬,700
阿裡,釘釘,600
百度,百度App,700
百度,百度地圖,800
百度,愛奇藝,800
百度,百度錢包,100
百度,百度貼吧,200
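Both implementations below parse these comma-separated lines the same way; a minimal plain-Scala sketch of that parsing step (note the visit count arrives as a String and must be cast to Int before it can be summed numerically):

```scala
// one line in the company,app,visit_times format above
val line = "騰訊,騰訊視訊,800"
val x = line.split(",")
// tuple of (company, app, visit count); x(2) is a String until cast
val record = (x(0), x(1), x(2).toInt)
println(record) // (騰訊,騰訊視訊,800)
```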
Spark Table API implementation
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val df = spark.read.textFile("./data/test")
  .map(_.split(","))
  .map(x => (x(0), x(1), x(2).toInt)) // cast the visit count to Int so sum() adds numbers, not strings
  .toDF("company", "app", "vst_times")
  .groupBy("company", "app")
  .agg(sum("vst_times") as "vst_times")
  .cache()
// partition by company only; partitioning by (company, app) would make every row_number 1
val windows = Window.partitionBy("company").orderBy(col("vst_times").desc)

// take the Top 2 apps by visit count for each of the three BAT companies
df.select("company", "app", "vst_times")
  .withColumn("row_number", row_number().over(windows))
  .where("row_number <= 2")
  .select("company", "app", "vst_times")
  .show()
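The window-function pattern (partition by company, order by vst_times descending, keep row_number <= 2) can be sketched on plain Scala collections without Spark; this uses a subset of the aggregated sample rows for illustration:

```scala
// (company, app, vst_times) rows, a subset of the aggregated sample data above
val rows = Seq(
  ("騰訊", "QQ音樂", 900), ("騰訊", "微信", 900), ("騰訊", "騰訊視訊", 800),
  ("阿裡", "支付寶", 900), ("阿裡", "優酷視訊", 700), ("阿裡", "飛豬", 700)
)

// emulate partitionBy("company") + orderBy desc + row_number <= 2:
// group rows by company, sort each group by descending visit count, keep two
val top2 = rows
  .groupBy(_._1)
  .toList
  .flatMap { case (_, rs) => rs.sortBy(-_._3).take(2) }

top2.foreach(println)
```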
Spark Core API implementation
import org.apache.spark.rdd.RDD

// use sc.textFile: spark.read.textFile returns a Dataset, which has no reduceByKey
val apprdd: RDD[((String, String), Int)] = sc.textFile("test.log")
  .map(line => {
    val x = line.split(",")
    ((x(0), x(1)), x(2).toInt) // key: (company, app), value: visit count as Int
  })

// sum the visit counts per (company, app) key
val reduced: RDD[((String, String), Int)] = apprdd.reduceByKey(_ + _)
// group by company
val grouped: RDD[(String, Iterable[((String, String), Int)])] = reduced.groupBy(_._1._1)

// within each company, sort by descending visit count and keep the top 2
val sorted: RDD[(String, List[((String, String), Int)])] = grouped.mapValues(_.toList.sortBy(-_._2).take(2))
// collect to the driver before printing; foreach(println) alone runs on the executors
sorted.collect().foreach(println)

// release resources
sc.stop()
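In the sample data every (company, app) key appears only once, so the reduceByKey step above is easy to miss; its semantics can be sketched on plain Scala collections with a hypothetical duplicated key:

```scala
// two visit records for the same (company, app) key, hypothetical data
val pairs = Seq((("騰訊", "微信"), 400), (("騰訊", "微信"), 500))

// reduceByKey(_ + _) is equivalent to grouping by key and summing the values
val reducedPairs = pairs.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }

println(reducedPairs) // Map((騰訊,微信) -> 900)
```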
Output:
+-------+--------+---------+
|company| app|vst_times|
+-------+--------+---------+
| 騰訊| QQ音樂| 900|
| 騰訊| 微信| 900|
| 百度|百度地圖| 800|
| 百度| 愛奇藝| 800|
| 阿裡| 支付寶| 900|
| 阿裡|優酷視訊| 700|
+-------+--------+---------+