项目说明:附件为要计算数据的 demo(示例数据文件;原文此处的"点击打开链接"为数据下载链接)。
利用 Spark 的缓存机制,读取需要筛选的数据,自定义一个分区器,将不同学科的数据分别放到不同的分区中,并根据指定的学科,取出点击量前三的数据,写入文件。
具体程序如下:
1、项目主程序:
- package cn.allengao.Location
- import java.net.URL
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
- /**
- * class_name:
- * package:
- * describe: 缓存机制,自定义一个分区器,根据指定的学科, 取出点击量前三的,按照每种学科数据放到不同的分区器里
- * creat_user: Allen Gao
- * creat_date: 2018/1/30
- * creat_time: 11:21
- **/
object AdvUrlCount {

  /**
   * Entry point: reads an access log, counts clicks per URL, re-keys the
   * counts by subject (the URL's host), partitions the data with a custom
   * partitioner so each subject lands in its own partition, and finally
   * writes the top-3 most-clicked URLs of every subject to disk.
   */
  def main(args: Array[String]): Unit = {
    // The subject "rules" could also be loaded from a database, e.g.:
    // val arr = Array("java.learn.com", "php.learn.com", "net.learn.com")
    val conf = new SparkConf().setAppName("AdvUrlCount").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Load the raw log; each line is expected to be "timestamp\turl".
    val file = sc.textFile("j://information/learn.log")

    // Split each line and emit a (url, 1) tuple for counting.
    val urlAndOne = file.map(line => {
      val fields = line.split("\t")
      val url = fields(1) // field 0 is the timestamp, field 1 the URL
      (url, 1)
    })

    // Aggregate the counts of identical URLs.
    val sumedUrl = urlAndOne.reduceByKey(_ + _)

    // Re-key by subject (the URL's host) and cache: this RDD is reused
    // twice below (keys.distinct + partitionBy), so caching avoids
    // recomputing the whole lineage.
    val cachedProject = sumedUrl.map(x => {
      val url = x._1
      val project = new URL(url).getHost
      val count = x._2
      (project, (url, count))
    }).cache()

    // Spark's built-in HashPartitioner can hash several subjects into the
    // same partition (collisions -> data skew), hence the custom partitioner:
    // val res = cachedProject.partitionBy(new HashPartitioner(3))
    // res.saveAsTextFile("j://information//out")

    // Collect the distinct subjects to size and drive the custom partitioner.
    val projects = cachedProject.keys.distinct().collect()
    val partitioner = new ProjectPartitioner(projects)

    // One partition per subject.
    val partitioned: RDD[(String, (String, Int))] = cachedProject.partitionBy(partitioner)

    // Within each partition, sort by click count descending and keep the top 3.
    val res = partitioned.mapPartitions(it => {
      it.toList.sortBy(_._2._2).reverse.take(3).iterator
    })

    res.saveAsTextFile("j://information//out1")
    sc.stop()
  }
}
2、自定义分区器:
- package cn.allengao.Location
- import org.apache.spark.Partitioner
- import scala.collection.mutable
/**
 * Partitioner that gives every subject ("project") its own partition.
 *
 * @param projects the distinct subject names; the number of partitions
 *                 equals projects.length and each subject is mapped to a
 *                 unique partition index.
 */
class ProjectPartitioner(projects: Array[String]) extends Partitioner {
  // Maps each subject name to its partition number.
  private val projectsAndPartNum = new mutable.HashMap[String, Int]()
  // Counter used to hand out consecutive partition numbers.
  var n = 0
  for (pro <- projects) {
    projectsAndPartNum += (pro -> n)
    n += 1
  }

  // Number of partitions: one per subject.
  override def numPartitions: Int = projects.length

  // Partition number for a key; unknown keys fall back to partition 0.
  override def getPartition(key: Any): Int = {
    projectsAndPartNum.getOrElse(key.toString, 0)
  }
}
运行结果: