
Implementing row_number() in Spark: Table API and Core API implementations

Preparing the data

//company app visit_times
騰訊,騰訊視訊,800
騰訊,QQ音樂,900
騰訊,微信讀書,100
騰訊,微信,900
騰訊,騰訊課堂,200
阿裡,支付寶,900
阿裡,優酷視訊,700
阿裡,蝦米音樂,500
阿裡,飛豬,700
阿裡,釘釘,600
百度,百度App,700
百度,百度地圖,800
百度,愛奇藝,800
百度,百度錢包,100
百度,百度貼吧,200
           
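For the examples below to actually run, these lines have to exist on disk at the path each snippet reads from: the Table API version reads ./data/test and the Core API version reads test.log. Below is a minimal setup sketch in Scala, assuming a local filesystem; the helper is purely illustrative.

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

// write the sample rows above to ./data/test; change the path to "test.log"
// to feed the Core API example instead
val rows = Seq(
  "騰訊,騰訊視訊,800", "騰訊,QQ音樂,900", "騰訊,微信讀書,100", "騰訊,微信,900", "騰訊,騰訊課堂,200",
  "阿裡,支付寶,900", "阿裡,優酷視訊,700", "阿裡,蝦米音樂,500", "阿裡,飛豬,700", "阿裡,釘釘,600",
  "百度,百度App,700", "百度,百度地圖,800", "百度,愛奇藝,800", "百度,百度錢包,100", "百度,百度貼吧,200"
)
Files.createDirectories(Paths.get("./data"))
Files.write(Paths.get("./data/test"), rows.mkString("\n").getBytes(StandardCharsets.UTF_8))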

Spark Table API implementation

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

// aggregate visit counts per (company, app)
val df = spark.read.textFile("./data/test")
  .map(_.split(","))
  .map(x => (x(0), x(1), x(2).toInt))
  .toDF("company", "app", "vst_times")
  .groupBy("company", "app")
  .agg(sum("vst_times") as "vst_times")
  .cache()

// partition by company only, ordered by visit count descending
// (partitioning by company and app would give every row a row_number of 1)
val windows = Window.partitionBy("company").orderBy(col("vst_times").desc)

// take the top 2 apps by visit count for each of the three BAT companies
df.select("company", "app", "vst_times")
  .withColumn("row_number", row_number().over(windows))
  .where("row_number <= 2")
  .select("company", "app", "vst_times")
  .show()
           
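The same top-2-per-company query can also be written directly in Spark SQL, which some readers may find easier to follow. This is a minimal sketch assuming the aggregated DataFrame df from above; the temporary view name app_visits is an arbitrary choice, not part of the original example.

// register the aggregated DataFrame as a temporary view
df.createOrReplaceTempView("app_visits")

spark.sql(
  """
    |SELECT company, app, vst_times
    |FROM (
    |  SELECT company, app, vst_times,
    |         row_number() OVER (PARTITION BY company ORDER BY vst_times DESC) AS rn
    |  FROM app_visits
    |) ranked
    |WHERE rn <= 2
  """.stripMargin).show()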

Spark Core API implementation

val apprdd = spark.read.textFile("test.log")
       .map(line => {
         val x = line.split(",")
         ((x(0), x(1)), x(2))
       })
 val reduced: RDD[((String, String), Int)] = apprdd.reduceByKey(_+_)
 val grouped: RDD[(String, Iterable[((String, String), Int)])] = reduced.groupBy(_._1._1)
 //按照公司分組
 val sorted: RDD[(String, List[((String, String), Int)])] = grouped.mapValues(_.toList.sortBy(-_._2).take(2))
 //輸出結果
 sorted.foreach(println(_))
 //釋放資源
 sc.stop()
           
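groupBy materializes every app a company has into one in-memory collection before sorting, which is fine for this demo but costly for large groups. As a hedged alternative, a bounded top-2 accumulator per company can be kept with aggregateByKey. The sketch below is an assumption, not part of the original code: it reuses the reduced RDD from above and must run before sc.stop().

// keep at most the top 2 (app, count) pairs per company without
// materializing a company's full app list
val topN = 2
val topPerCompany: RDD[(String, List[(String, Int)])] = reduced
  .map { case ((company, app), cnt) => (company, (app, cnt)) }
  .aggregateByKey(List.empty[(String, Int)])(
    // fold one record into a partition-local top-2 list
    (acc, v) => (v :: acc).sortBy(-_._2).take(topN),
    // merge two partition-local top-2 lists
    (a, b) => (a ++ b).sortBy(-_._2).take(topN)
  )

topPerCompany.collect().foreach(println)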

Output:

+-------+--------+---------+
|company|     app|vst_times|
+-------+--------+---------+
|   騰訊|  QQ音樂|      900|
|   騰訊|    微信|      900|
|   百度|百度地圖|      800|
|   百度|  愛奇藝|      800|
|   阿裡|  支付寶|      900|
|   阿裡|優酷視訊|      700|
+-------+--------+---------+
           
