一、上次課回顧
二、求使用者通路量TopN的Hive實作及Spark-Core實作
三、使用者通路量的TopN的Spark-Core實作
四、求平均年齡Spark-Core的實作
五、求男女人數以及最低、最高身高
六、本次課總結
此處為本章學習視訊連接配接:
内容出處:若澤資料 http://www.ruozedata.com/
有一起學習的也可以聯系下我QQ:2032677340
連結:https://pan.baidu.com/s/1ES_bnWAGCuS2yHOj0aZMsQ
提取碼:atvb
一、上次課回顧
上次課部落格總結:
https://blog.csdn.net/zhikanjiani/article/details/90740153
回顧:
注意:hadoop-client使用2.6.0-cdh5.7.0版本的話,需要添加cdh的倉庫.
打包注意事項、上傳伺服器使用rz指令、spark-submit送出spark應用程式、結果輸出在控制台、輸出到hdfs路徑下、輸入多個檔案、輸入檔案規則明配、排序(tuple)
二、求使用者通路量TopN的Hive實作及Spark-Core實作
需求:求使用者通路量top5
hive中建立一張表:
1、建表語句如下:
create table page_views(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
2、把準備好的本地資料加載進去:
hive> LOAD DATA LOCAL INPATH '/home/hadoop/data/page_views.dat' overwrite into table page_views;
Loading data to table ruoze_d6.page_views
Table ruoze_d6.page_views stats: [numFiles=1, numRows=0, totalSize=1710, rawDataSize=0]
OK
Time taken: 1.05 seconds
使用SQL實作:
select end_user_id,count(1) c from page_views group by end_user_id order by c desc limit 5;
使用spark-core來實作:
需求分析:
1)使用者:使用tab鍵分割("\t")==>split
拿到userid ==>splits(5) splits的下标index是從0開始的.
2)通路量
==>(根據userid分組求總次數):reduceByKey(+)
==>
(123626648,40)
(123747736,30)
(8277376,20)
(NULL,68762)
…
TOP N(按總次數排序求前5條)
反轉:map(x._2,x._1)
==>
(40,123626648)
(30,123747736)
(20,8277376)
(68762,NULL)
…
sortByKey
反轉
take(5)
工作中很多場景都能看到WordCount的影子
三、使用者通路量的TopN的Spark-Core實作
在idea完成的同時要在spark-shell中進行測試:
1、scala> val pageview = sc.textFile("file:///home/hadoop/data/page_views.dat")
pageview: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/data/page_views.dat MapPartitionsRDD[9] at textFile at <console>:24
2、scala> pageview.map(x => x.split("\t")(5)).take(2).foreach(println)
NULL
NULL
3、scala> pageview.map(x => (x.split("\t")(5),1)) .take(2).foreach(println)
(NULL,1)
(NULL,1)
4、求通路量
Idea下的代碼:
package com.ruozedata.spark
import org.apache.spark.{SparkConf, SparkContext}
object PageViewsApp {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
val sc = new SparkContext(sparkConf)
val pageViews = sc.textFile(args(0))
//擷取使用者id
val user_id = pageViews.map(x => (x.split("\t")(5),1))
//user_id.reduceByKey(_+_).take(10).foreach(println)
user_id.reduceByKey(_+_).map(x =>(x._2,x._1)).sortByKey(false).map(x =>(x._2,x._1)).take(5).foreach(print)
sc.stop()
}
}
在工作中很多場景中的統計,都可以看到wordcount的影子。
工作中拿到需求之後:
1):進行分析;
2):通過中文描述來進行功能拆解 <== 詳細設計說明書
3):代碼的開發:代碼的實作
四、求平均年齡Spark-Core的實作
資料格式:ID + “” + 年齡:
實作思路:sum(age) / count(ID)
1)、拿到年齡
2)、拿到人數
3)、年齡相加 / 人數
package com.ruozedata.spark
import org.apache.spark.{SparkConf, SparkContext}
object AvgAge {
val sparkConf = new SparkConf()
val sc = new SparkContext(sparkConf)
//取出年齡
val dataFile = sc.textFile("file:///home/hadoop/data/sample_age_data")
// val ageData = dataFile.map(x => x.split("")(1)).take(10).foreach(println)
//求人數
val countPeople = dataFile.count()
// 總年齡
val ageData = dataFile.map(x => x.split("")(1))
val totalAge = ageData.map(age => age.toInt).reduce(_+_)
val avgAge = totalAge / countPeople
sc.stop()
}
IDEA上每寫一步都可以相應的在Spark-shell中進行測試
注意:公式中不能定義和算子同名的變量,否則運作的時候系統會識别不了
五、求男女人數以及最低、最高身高
資料:ID + “” + 性别 + “” + 身高
ID | 性别 | 身高 |
---|---|---|
1 | F | 185 |
2 | M | 190 |
3 | F | 175 |
分析:
1)、取出性别,一個分組groupBy
RDD ==> MRDD + FRDD (一個RDD拆分為男RDD和一個女RDD)
2)、max min 由于結構的特殊性,直接max、min就行
3)、max min
六、本次課總結
RDD一定要掌握,但是Spark SQL是更進階的API.