天天看点

IDEA开发Spark应用程序(二)一、上次课回顾三、用户访问量的TopN的Spark-Core实现四、求平均年龄Spark-Core的实现五、求男女人数以及最低、最高身高六、本次课总结

一、上次课回顾

二、求用户访问量TopN的Hive实现及Spark-Core实现

三、用户访问量的TopN的Spark-Core实现

四、求平均年龄Spark-Core的实现

五、求男女人数以及最低、最高身高

六、本次课总结

此处为本章学习视频连接:
内容出处:若泽数据	http://www.ruozedata.com/	
有一起学习的也可以联系下我QQ:2032677340
链接:https://pan.baidu.com/s/1ES_bnWAGCuS2yHOj0aZMsQ 
提取码:atvb 
           

一、上次课回顾

上次课博客总结:

https://blog.csdn.net/zhikanjiani/article/details/90740153

回顾:

注意:hadoop-client使用2.6.0-cdh5.7.0版本的话,需要添加cdh的仓库.

打包注意事项、上传服务器使用rz命令、spark-submit提交spark应用程序、结果输出在控制台、输出到hdfs路径下、输入多个文件、输入文件规则明配、排序(tuple)

二、求用户访问量TopN的Hive实现及Spark-Core实现

需求:求用户访问量top5

hive中创建一张表:

1、建表语句如下:
create table page_views(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

2、把准备好的本地数据加载进去:
hive> LOAD DATA LOCAL INPATH '/home/hadoop/data/page_views.dat' overwrite into table page_views;
Loading data to table ruoze_d6.page_views
Table ruoze_d6.page_views stats: [numFiles=1, numRows=0, totalSize=1710, rawDataSize=0]
OK
Time taken: 1.05 seconds
           

使用SQL实现:

select end_user_id,count(1) c from page_views group by end_user_id order by c desc limit 5;

使用spark-core来实现:

需求分析:

1)用户:使用tab键分割("\t")==>split

拿到userid ==>splits(5) splits的下标index是从0开始的.

2)访问量

==>(根据userid分组求总次数):reduceByKey(+)

==>

(123626648,40)

(123747736,30)

(8277376,20)

(NULL,68762)

TOP N(按总次数排序求前5条)

反转:map(x._2,x._1)

==>

(40,123626648)

(30,123747736)

(20,8277376)

(68762,NULL)

sortByKey

反转

take(5)

工作中很多场景都能看到WordCount的影子

三、用户访问量的TopN的Spark-Core实现

在idea完成的同时要在spark-shell中进行测试:

1、scala> val pageview = sc.textFile("file:///home/hadoop/data/page_views.dat")
pageview: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/data/page_views.dat MapPartitionsRDD[9] at textFile at <console>:24

2、scala> pageview.map(x => x.split("\t")(5)).take(2).foreach(println)
NULL
NULL

3、scala> pageview.map(x => (x.split("\t")(5),1)) .take(2).foreach(println)
(NULL,1)
(NULL,1)

4、求访问量

           

Idea下的代码:

package com.ruozedata.spark

import org.apache.spark.{SparkConf, SparkContext}

object PageViewsApp {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    val sc = new SparkContext(sparkConf)
    val pageViews = sc.textFile(args(0))


    //获取用户id
    val user_id = pageViews.map(x => (x.split("\t")(5),1))

    //user_id.reduceByKey(_+_).take(10).foreach(println)

    user_id.reduceByKey(_+_).map(x =>(x._2,x._1)).sortByKey(false).map(x =>(x._2,x._1)).take(5).foreach(print)
    sc.stop()
  }

}

           

在工作中很多场景中的统计,都可以看到wordcount的影子。

工作中拿到需求之后:

1):进行分析;

2):通过中文描述来进行功能拆解 <== 详细设计说明书

3):代码的开发:代码的实现

四、求平均年龄Spark-Core的实现

数据格式:ID + “” + 年龄:

实现思路:sum(age) / count(ID)

1)、拿到年龄

2)、拿到人数

3)、年龄相加 / 人数

package com.ruozedata.spark

import org.apache.spark.{SparkConf, SparkContext}

object AvgAge {
    val sparkConf = new SparkConf()
    val sc = new SparkContext(sparkConf)

    //取出年龄
    val dataFile = sc.textFile("file:///home/hadoop/data/sample_age_data")

//    val ageData = dataFile.map(x => x.split("")(1)).take(10).foreach(println)

    //求人数
    val countPeople = dataFile.count()

    // 总年龄
    val ageData = dataFile.map(x => x.split("")(1))
    val totalAge = ageData.map(age => age.toInt).reduce(_+_)
    
    val avgAge = totalAge / countPeople

  sc.stop()
}

           

IDEA上每写一步都可以相应的在Spark-shell中进行测试

注意:公式中不能定义和算子同名的变量,否则运行的时候系统会识别不了

五、求男女人数以及最低、最高身高

数据:ID + “” + 性别 + “” + 身高

ID 性别 身高
1 F 185
2 M 190
3 F 175

分析:

1)、取出性别,一个分组groupBy

RDD ==> MRDD + FRDD (一个RDD拆分为男RDD和一个女RDD)

2)、max min 由于结构的特殊性,直接max、min就行

3)、max min

六、本次课总结

RDD一定要掌握,但是Spark SQL是更高级的API.