天天看点

RDD编程初级进阶

前言

这里基于前面做过的RDD初级程序,以及后来搭建的本机的运行spark的环境,进行下面代码的编写

想了解spark-shell的可以看这篇文章,链接:https://blog.csdn.net/s863222424/article/details/102868038

想了解使用IDEA进行RDD编程+scala打包运行可以看这篇文章,链接:https://blog.csdn.net/s863222424/article/details/102979162

任务一

任务描述:在推荐领域有一个著名的开放测试集,下载链接是:http://grouplens.org/datasets/movielens/,该测试集包含三个文件,分别是ratings.dat、users.dat、movies.dat,具体介绍可阅读:README.txt。请编程实现:通过连接ratings.dat和movies.dat两个文件得到平均得分超过4.0的电影列表,采用的数据集是:ml-1m

1.程序分析

先给出几个文件的数据格式

  • ratings.dat

UserID::MovieID::Rating::Timestamp

1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
1::1197::3::978302268
           
  • movies.dat

MovieID::Title::Genres

6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
9::Sudden Death (1995)::Action
10::GoldenEye (1995)::Action|Adventure|Thriller
           
  • 从格式我们可以看到,要得到平均分超过4.0的电影,我们需要做的,第一步根据ratings.dat得到所有电影的平均分,这个的具体实现方法在我的上一篇文章内已经做了比较详细的叙述,具体可以看下面这篇文章RDD初级编程
  • 在得到所有电影的得分的平均值之后,我们需要通过电影的id来和movies.dat内的数据连接,从而得到电影的名字和相对应的得分,最后筛选得分大于4.0的即可

    2.具体实现

    下面直接给出完整代码

import org.apache.spark.{SparkConf, SparkContext}

object Join {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Join").setMaster("local")
    val sc = new SparkContext(conf)
    val movies = sc.textFile("hdfs://namenode:9000/spark/Join/movies.dat")
    val ratings = sc.textFile("hdfs://namenode:9000/spark/Join/ratings.dat")
    //将读入的movies数据转换成(movie_id,movie_name)的数据格式
    val rdd1 = movies.map(t => (t.split("::")(0), t.split("::")(1)))
    //将读入的ratings数据转化为(movie_id,(grade,1))
    val rdd2 = ratings.map(t => (t.split("::")(1), (t.split("::")(2).toInt, 1)))
      //将所有movie_id相同的电影的得分以及被评分次数求和
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      //求各电影得分的平均值
      .mapValues(a => a._1 / a._2)
    //将movies的数据和得到的数据进行连接
    //rdd3的数据格式为(movie_id,(movie_name,grade)
    val rdd3 = rdd1.join(rdd2)
    //筛选得分大于4的电影
    rdd3.filter(t => t._2._2 > 4)
      //取得分大于4的电影的名字
      .map(t=>t._2._1)
      //输出电影的名字
      .collect().foreach(println)
  }
}
           

运行结果部分如下

19/11/09 08:45:41 INFO DAGScheduler: ResultStage 2 (collect at Join.scala:25) finished in 0.108 s
19/11/09 08:45:41 INFO DAGScheduler: Job 0 finished: collect at Join.scala:25, took 68.634569 s
Gate of Heavenly Peace, The (1995)
Song of Freedom (1936)
One Little Indian (1973)
Schlafes Bruder (Brother of Sleep) (1995)
Lured (1947)
Bittersweet Motel (2000)
Follow the Bitch (1998)
Baby, The (1973)
Smashing Time (1967)
Ulysses (Ulisse) (1954)
19/11/09 08:45:41 INFO SparkContext: Invoking stop() from shutdown hook
           

3.打包上传运行

spark-submit --class Join --master local Join.jar
           
RDD编程初级进阶

任务二

任务描述:对于两个输入文件A和B,编写Spark独立应用程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C,输出结果按二次排序输出。

输入文件A的样例如下:

20170101 x

20170102 y

20170103 x

20170104 y

20170105 z

20170106 z

输入文件B的样例如下:

20170101 y

20170102 y

20170103 x

20170104 z

20170105 y

根据输入的文件A和B合并得到的输出文件C的样例如下:

20170101 x

20170101 y

20170102 y

20170103 x

20170104 y

20170104 z

20170105 y

20170105 z

20170106 z

1.程序分析

思路其实也特别简单,先去重,去重之后进行二次排序即可

2.具体实现

首先尝试了编写SecondarySortKey类来实现二次排序,发现并没有什么效果,如果有朋友知道的可以留言告诉博主,主要这个地方的第二个元素是要按照字母的大小来排序会比较麻烦

下面是重写的SecondarySortKey类

class SecondarySortKey(val first:Int,val second:String) extends Ordered [SecondarySortKey] with Serializable {
  def compare(other:SecondarySortKey):Int = {
    if (this.first - other.first !=0) {
      this.first - other.first
    } else {
      //实现字母相减
      this.second.compareTo(other.second)
    }
  }
}
           

主程序

import org.apache.spark.{SparkConf, SparkContext}

object Duplicate_removal {
	def main(args:Array[String]){
	    val conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local")
	    val sc = new SparkContext(conf)
	    val lines = sc.textFile("hdfs://namenode:9000/spark/Duplicate_removal")
	    val pairWithSortKey = lines.map(line=>(new SecondarySortKey(line.split(" ")(0).toInt, line.split(" ")(1)),line))
	    val sorted = pairWithSortKey.sortByKey()
	    val sortedResult = sorted.map(sortedLine =>sortedLine._2)
	    sortedResult.collect().foreach (println)
	  }
}
           

运行结果

RDD编程初级进阶

可以看到并没有实现二次排序,具体的问题在哪我也没有找到

但是可以说明的是如果输入文件是数字-数字这种格式的话,将SecondarySortKey类稍微改一下,是可以使用的

class SecondarySortKey(val first:Int,val second:Int) extends Ordered [SecondarySortKey] with Serializable {
  def compare(other:SecondarySortKey):Int = {
    if (this.first - other.first !=0) {
      this.first - other.first
    } else {
      this.second - other.second
    }
  }
}
           

之后使用了如下的方法

import org.apache.spark.{SparkConf, SparkContext}

object Duplicate_removal {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Join").setMaster("local")
    val sc = new SparkContext(conf)
    val input = sc.textFile("hdfs://namenode:9000/spark/Duplicate_removal", 2)
    //去重,这里如果数据分布在不同的机器,最好使用groupByKey()来去重,因为distinct()只在本机上去重
    input.distinct()
      //直接使用sortBy将整个字符串看出一个整体
      .sortBy(x=>x)
      //输出
      .collect().foreach (println)
    //将结果存到文件内
    input.distinct().sortBy(x=>x).map(t=>t).saveAsTextFile("hdfs://namenode:9000/spark/Duplicate_removal/out")
  }
}
           

运行结果

RDD编程初级进阶

3.打包上传运行

spark-submit --class Duplicate_removal --master local Duplicate_removal.jar
           
RDD编程初级进阶

任务三

任务描述:每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写Spark独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。下面是输入文件和输出文件的一个样例,供参考。

Algorithm成绩:

小明 92

小红 87

小新 82

小丽 90

Database成绩:

小明 95

小红 81

小新 89

小丽 85

Python成绩:

小明 82

小红 83

小新 94

小丽 91

平均成绩如下:

(小红,83.67)

(小新,88.33)

(小明,89.67)

(小丽,88.67)

1.程序分析

这个题有了第一个题的基础看上去是特别简单的,文件读入后根据姓名合并求平均值即可

2.具体实现

import org.apache.spark.{SparkConf, SparkContext}

object Average {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Join").setMaster("local")
    val sc = new SparkContext(conf)
    val course = sc.textFile("hdfs://namenode:9000/spark/average")
    val rdd1 = course.map(t => (t.split(" ")(0), (t.split(" ")(1).toDouble, 1)))
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .mapValues(a => a._1 / a._2)
    //将结果保留两位小数
    val rdd2=rdd1.mapValues(t=>f"$t%1.2f".toDouble)
    rdd2.collect().foreach(println)
    rdd2.saveAsTextFile("hdfs://namenode:9000/spark/average/out")
  }
}
           

运行结果

RDD编程初级进阶

3.打包上传运行

spark-submit --class Average --master local Average.jar
           
RDD编程初级进阶

几点不足

  1. 对于输出到文件的部分,如果仅使用上面的方法,在每次运行完一遍,需要将生成的out文件夹删掉,不然会报错
  2. 对于生成的文件,输出到hdfs上面的文件不是一个完整的文件,而是根据具体有多少个分区来决定生成的文件个数,这里没有将他们合并
  3. 对于二次排序部分还存在很多疑惑没有解决

项目源文件

链接:https://pan.baidu.com/s/1cfiPFjcfB3C3KrvaUgAv3w

提取码:tbli

继续阅读