
Solutions to small Spark case studies

11. Case Study Solutions

Contents

    • 11. Case Study Solutions
      • Resource download
      • 11.1. PV&UV
      • 11.2. Secondary sort
      • 11.3. Grouped top N

Resource download

Scala IDEA project code: https://download.csdn.net/download/weixin_43660536/21011874

11.1. PV&UV

  • Computing a site's PV and UV is a very common scenario.
  • PV (page view) is a web-analytics term: the number of page views or clicks, used to measure how many pages a site's users visit.
  • UV (unique visitors) is the number of distinct visitors; repeat visits by the same user on the same day are not counted again.
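Sample log data. Each line is tab-separated; judging from the sample, the fields are IP, province, date, a millisecond timestamp, a user/session ID, URL, and action: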
146.1.30.98	河南	2017-10-10	1512012307080	5263761960810313758	www.jd.com	Comment
146.1.30.98	河南	2017-10-10	1512012307080	5263761960810313758	www.mi.com	View
98.188.31.30	上海	2017-10-10	1512012307081	2605157034872986714	www.mi.com	View
192.149.67.17	天津	2017-10-10	1512012307081	539962031835013115	www.baidu.com	Comment
115.77.12.186	安徽	2017-10-10	1512012307084	5641635304912151098	www.jd.com	View
115.77.12.186	安徽	2017-10-10	1512012307084	5641635304912151098	www.suning.com	Buy
12.150.165.203	内蒙	2017-10-10	1512012307085	7712710500530530727	www.mi.com	View
           
// PV: total page views per URL
// shared imports for the snippets in this section
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

def main(args: Array[String]): Unit = {

    val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("PageView" + System.currentTimeMillis())
    val sparkContext = new SparkContext(sparkConf)
    val value: RDD[String] = sparkContext.textFile("src/main/resources/pvuvdata")

    // take the URL (field 5 of each tab-separated line) and count its occurrences
    val rdd2: RDD[(String, Int)] = value.map(s => {
      (s.split("\t")(5), 1)
    })
    val rdd3: RDD[(String, Int)] = rdd2.reduceByKey((x, y) => x + y)
    rdd3.foreach(println)

    // the same computation as a one-liner
    value.map(x => (x.split("\t")(5), 1)).reduceByKey(_ + _).foreach(println)

    sparkContext.stop()
  }
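
As a side note (not part of the original code), the same per-URL counts can be collected directly to the driver with countByValue, which is convenient when the number of distinct URLs is small:

// sketch: countByValue() returns a Map[String, Long] on the driver,
// so it only suits a modest number of distinct keys
value.map(_.split("\t")(5)).countByValue().foreach(println)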
           
// UV: unique visitors per URL
  def main(args: Array[String]): Unit = {

    val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("UniqueVisitor" + System.currentTimeMillis())
    val sparkContext = new SparkContext(sparkConf)
    val value: RDD[String] = sparkContext.textFile("src/main/resources/pvuvdata")
    // pair each URL (field 5) with the visitor's IP (field 0)
    val rdd2: RDD[(String, String)] = value.map(s => (s.split("\t")(5), s.split("\t")(0)))
    // distinct() drops repeat visits by the same IP, then count the IPs per URL
    val rdd3: RDD[(String, Int)] = rdd2.distinct().map(s => (s._1, 1))
    val rdd4: RDD[(String, Int)] = rdd3.reduceByKey(_ + _)
    rdd4.foreach(println)

    // the same computation as a one-liner
    //value.map(s => (s.split("\t")(5), s.split("\t")(0))).distinct().map(s => (s._1, 1)).reduceByKey(_ + _).foreach(println)

    sparkContext.stop()
  }
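
A variant worth knowing, sketched here under the assumption that the set of IPs per URL fits in memory: aggregateByKey can build the set of distinct IPs per URL in one pass, replacing the separate distinct() shuffle with per-key set union.

// sketch: collect the distinct IPs of each URL into a Set, then count them
value.map(s => (s.split("\t")(5), s.split("\t")(0)))
  .aggregateByKey(Set.empty[String])(_ + _, _ ++ _)
  .mapValues(_.size)
  .foreach(println)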
           

11.2. Secondary sort

  • Secondary sort by province, then by city.
// code to generate the sample data
import scala.util.Random

def main(args: Array[String]): Unit = {
    for (city <- 1 to 9; province <- 1 to 9) {
      println("city" + (Random.nextInt(90) + 10) + "\t" + "province" + (Random.nextInt(90) + 10))
    }
}
//city15	province52
//city45	province96
//city97	province74
//city10	province82
           
//secondary sort
def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("orderBy" + System.currentTimeMillis())
    val sparkContext = new SparkContext(sparkConf)
    //val value: RDD[String] = sparkContext.textFile("src/main/resources/ctiy_provi.txt")

    // sortBy() is assumed to be a helper that returns the generated lines above as an Array[String]
    val arr = sortBy()
    val value: RDD[String] = sparkContext.parallelize(arr)

    // swap the fields so the tuple is (province, city)
    val rdd2: RDD[(String, String)] = value.map(str => {
      (str.split("\t")(1), str.split("\t")(0))
    })
    // sorting by the whole tuple orders by province first, then by city
    val rdd3 = rdd2.sortBy((f: (String, String)) => f, true)
    // swap back to the original (city, province) layout
    val rdd4: RDD[(String, String)] = rdd3.map(str => {
      (str._2, str._1)
    })
    rdd4.foreach(println)

    // Approach 1: sort by city, then by province; note that the second sort is not
    // guaranteed to be stable, so the city order within a province may not survive
    //value.map(a => (a.split("\t")(0), a.split("\t")(1))).sortByKey().sortBy(_._2).foreach(println)
    // Approach 2: the tuple sort above, as a one-liner
    //value.map(a => (a.split("\t")(1), a.split("\t")(0))).sortBy(x => x, true).map(x => (x._2, x._1)).foreach(println)

    sparkContext.stop()
  }
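
The classic alternative to tuple sorting is a dedicated sort key. A minimal sketch (the ProvinceCity name is illustrative, not from the original code):

// define the key at top level (not inside main) so Spark can serialize it cleanly
case class ProvinceCity(province: String, city: String) extends Ordered[ProvinceCity] {
  // province ascending first, then city ascending
  override def compare(that: ProvinceCity): Int = {
    val byProvince = this.province.compareTo(that.province)
    if (byProvince != 0) byProvince else this.city.compareTo(that.city)
  }
}

value.map { str =>
    val fields = str.split("\t")
    ProvinceCity(province = fields(1), city = fields(0))
  }
  .sortBy(k => k, true)
  .map(k => (k.city, k.province))
  .foreach(println)

This keeps the comparison rule in one place, which pays off once the ordering involves more than two fields or mixed sort directions.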
           

11.3. Grouped top N

/**
* Data format: brand	category	price
* Problems:
* Find the top 3 brands by total sales
* Find the top 3 categories by total sales
*
* For each brand, find its top 3 categories by total sales
* For each category, find its top 3 brands by total sales
*
* Find the top 3 brands by average price
*/
def main(args: Array[String]): Unit = {
    val sport = Array[String]("鸿星尔克", "安踏", "特步", "回力", "Nick", "Adidas", "LI-NING")
    val kinds = Array[String]("运动鞋", "上衣", "裤子", "袜子")
    // emit 1000 random lines: brand	category	price (100-199)
    for (elem <- 1 to 1000) {
      println(sport(Random.nextInt(sport.length)) + "\t" + kinds(Random.nextInt(kinds.length)) + "\t" + (Random.nextInt(100) + 100))
    }
}
// 特步	上衣	155
// 鸿星尔克	上衣	191
// 特步	裤子	196
           
/**
* Find the top 3 brands by total sales
* Find the top 3 categories by total sales
*/
def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("TotalBrandTypeTop" + System.currentTimeMillis())
    val sparkContext = new SparkContext(sparkConf)
    val value: RDD[String] = sparkContext.textFile("src/main/resources/topops.txt")

    // top 3 brands by total sales
    val brandRdd2: RDD[(String, Int)] = value.map(s => (s.split("\t")(0), s.split("\t")(2).toInt))
    val brandRdd3: RDD[(String, Int)] = brandRdd2.reduceByKey((x, y) => x + y)
    val brandRdd4: RDD[(String, Int)] = brandRdd3.sortBy((f: (String, Int)) => f._2, false)
    brandRdd4.take(3).foreach(println)
    // the same, as a one-liner (note the descending sort)
    //value.map(s => (s.split("\t")(0), s.split("\t")(2).toInt)).reduceByKey(_ + _).sortBy(_._2, false).take(3).foreach(println)

    // top 3 categories by total sales
    val typeRdd2: RDD[(String, Int)] = value.map(s => (s.split("\t")(1), s.split("\t")(2).toInt))
    val typeRdd3: RDD[(String, Int)] = typeRdd2.reduceByKey((x, y) => x + y)
    val typeRdd4: RDD[(String, Int)] = typeRdd3.sortBy((f: (String, Int)) => f._2, false)
    typeRdd4.take(3).foreach(println)
    // the same, as a one-liner
    //value.map(s => (s.split("\t")(1), s.split("\t")(2).toInt)).reduceByKey(_ + _).sortBy(_._2, false).take(3).foreach(println)

    sparkContext.stop()

  }
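
Incidentally (an alternative not in the original), RDD.top(n) with a custom Ordering selects the n largest elements without a full sort of the data set:

// sketch: take the 3 largest pairs, ordered by the Int total
brandRdd3.top(3)(Ordering.by(_._2)).foreach(println)
typeRdd3.top(3)(Ordering.by(_._2)).foreach(println)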
           
/**
* For each brand, find its top 3 categories by total sales
* For each category, find its top 3 brands by total sales
*/
  def main(args: Array[String]): Unit = {

    val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("TotalBrandTypeTop" + System.currentTimeMillis())
    val sparkContext = new SparkContext(sparkConf)
    val value: RDD[String] = sparkContext.textFile("src/main/resources/topops.txt")

    // top 3 categories per brand, by total sales -- step by step
//    val brandRdd2: RDD[((String, String), Int)] = value.map(s => ((s.split("\t")(0), s.split("\t")(1)), s.split("\t")(2).toInt))
//    val brandRdd3: RDD[((String, String), Int)] = brandRdd2.reduceByKey((x, y) => x + y)
//    val brandRdd4: RDD[(String, Iterable[((String, String), Int)])] = brandRdd3.groupBy(_._1._1)
//    val brandRdd5: RDD[(String, List[((String, String), Int)])] = brandRdd4.mapValues(iter => {
//      iter.toList.sortBy(_._2).reverse.take(3)
//    })
//    val brandRdd6: RDD[((String, String), Int)] = brandRdd5.flatMap(s => s._2)
//    brandRdd6.foreach(println)

    // top 3 categories per brand, by total sales -- condensed
    value.map(s => ((s.split("\t")(0), s.split("\t")(1)), s.split("\t")(2).toInt))
      .reduceByKey((x, y) => x + y)
      .groupBy(_._1._1)
      .mapValues(iter => { iter.toList.sortBy(_._2).reverse.take(3) })
      .flatMap(s => s._2)
      .foreach(println)

    // top 3 brands per category, by total sales -- step by step
//    val typeRdd2: RDD[((String, String), Int)] = value.map(s => ((s.split("\t")(1), s.split("\t")(0)), s.split("\t")(2).toInt))
//    val typeRdd3: RDD[((String, String), Int)] = typeRdd2.reduceByKey((x, y) => x + y)
//    val typeRdd4: RDD[(String, Iterable[((String, String), Int)])] = typeRdd3.groupBy(_._1._1)
//    val typeRdd5: RDD[(String, List[((String, String), Int)])] = typeRdd4.mapValues(iter => {
//      iter.toList.sortBy(_._2).reverse.take(3)
//    })
//    val typeRdd6: RDD[((String, String), Int)] = typeRdd5.flatMap(s => s._2)
//    typeRdd6.foreach(println)

    // top 3 brands per category, by total sales -- condensed
    value.map(s => ((s.split("\t")(1), s.split("\t")(0)), s.split("\t")(2).toInt))
      .reduceByKey((x, y) => x + y)
      .groupBy(_._1._1)
      .mapValues(iter => { iter.toList.sortBy(_._2).reverse.take(3) })
      .flatMap(s => s._2)
      .foreach(println)

    sparkContext.stop()
  }
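
One caveat about the groupBy versions above: they materialize all (category, total) pairs of a key in memory at once. A sketch of a bounded alternative, assuming only the top 3 per key are ever needed, keeps at most 3 entries in the accumulator via aggregateByKey:

// bounded per-key top 3: the accumulator never grows beyond 3 entries
value.map { s =>
    val f = s.split("\t")
    ((f(0), f(1)), f(2).toInt)
  }
  .reduceByKey(_ + _)
  .map { case ((brand, category), total) => (brand, (category, total)) }
  .aggregateByKey(List.empty[(String, Int)])(
    (acc, v) => (v :: acc).sortBy(-_._2).take(3),
    (a, b) => (a ++ b).sortBy(-_._2).take(3)
  )
  .flatMap { case (brand, tops) => tops.map { case (c, t) => (brand, c, t) } }
  .foreach(println)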
           
/**
* Find the top 3 brands by average price
*/
  def main(args: Array[String]): Unit = {

    val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("TotalBrandTypeTop" + System.currentTimeMillis())
    val sparkContext = new SparkContext(sparkConf)
    //val value: RDD[String] = sparkContext.textFile("src/main/resources/topops.txt")
    // topN() is assumed to be a helper that returns the generated lines above as an Array[String]
    val list: Array[String] = topN()
    val value: RDD[String] = sparkContext.parallelize(list)

    // top 3 brands by average price: total per brand, count per brand, then join
    val sumRdd2: RDD[(String, Int)] = value.map(s => (s.split("\t")(0), s.split("\t")(2).toInt)).reduceByKey(_ + _)
    val numRdd3: RDD[(String, Int)] = value.map(s => (s.split("\t")(0), 1)).reduceByKey(_ + _)
    val avgRdd4: RDD[(String, (Int, Int))] = sumRdd2.join(numRdd3)
    // use toDouble to avoid integer division, and sort descending before taking the top 3
    val avgRdd5: RDD[(String, Double)] = avgRdd4.map(s => (s._1, s._2._1.toDouble / s._2._2))
    avgRdd5.sortBy(_._2, false).take(3).foreach(println)

    sparkContext.stop()
  }
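
The join of two RDDs can also be avoided. A single-pass sketch that carries (sum, count) through one aggregateByKey and divides at the end:

// (sum, count) accumulated in a single shuffle; average computed afterwards
value.map(s => (s.split("\t")(0), s.split("\t")(2).toInt))
  .aggregateByKey((0, 0))(
    (acc, price) => (acc._1 + price, acc._2 + 1),
    (a, b) => (a._1 + b._1, a._2 + b._2)
  )
  .mapValues { case (sum, count) => sum.toDouble / count }
  .sortBy(_._2, false)
  .take(3)
  .foreach(println)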
           
