天天看點

IDEA開發Spark應用程式(二)一、上次課回顧三、使用者通路量的TopN的Spark-Core實作四、求平均年齡Spark-Core的實作五、求男女人數以及最低、最高身高六、本次課總結

一、上次課回顧

二、求使用者通路量TopN的Hive實作及Spark-Core實作

三、使用者通路量的TopN的Spark-Core實作

四、求平均年齡Spark-Core的實作

五、求男女人數以及最低、最高身高

六、本次課總結

此處為本章學習視訊連接配接:
内容出處:若澤資料	http://www.ruozedata.com/	
有一起學習的也可以聯系下我QQ:2032677340
連結:https://pan.baidu.com/s/1ES_bnWAGCuS2yHOj0aZMsQ 
提取碼:atvb 
           

一、上次課回顧

上次課部落格總結:

https://blog.csdn.net/zhikanjiani/article/details/90740153

回顧:

注意:hadoop-client使用2.6.0-cdh5.7.0版本的話,需要添加cdh的倉庫.

打包注意事項、上傳伺服器使用rz指令、spark-submit送出spark應用程式、結果輸出在控制台、輸出到hdfs路徑下、輸入多個檔案、輸入檔案規則明配、排序(tuple)

二、求使用者通路量TopN的Hive實作及Spark-Core實作

需求:求使用者通路量top5

hive中建立一張表:

1、建表語句如下:
create table page_views(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

2、把準備好的本地資料加載進去:
hive> LOAD DATA LOCAL INPATH '/home/hadoop/data/page_views.dat' overwrite into table page_views;
Loading data to table ruoze_d6.page_views
Table ruoze_d6.page_views stats: [numFiles=1, numRows=0, totalSize=1710, rawDataSize=0]
OK
Time taken: 1.05 seconds
           

使用SQL實作:

select end_user_id,count(1) c from page_views group by end_user_id order by c desc limit 5;

使用spark-core來實作:

需求分析:

1)使用者:使用tab鍵分割("\t")==>split

拿到userid ==>splits(5) splits的下标index是從0開始的.

2)通路量

==>(根據userid分組求總次數):reduceByKey(+)

==>

(123626648,40)

(123747736,30)

(8277376,20)

(NULL,68762)

TOP N(按總次數排序求前5條)

反轉:map(x._2,x._1)

==>

(40,123626648)

(30,123747736)

(20,8277376)

(68762,NULL)

sortByKey

反轉

take(5)

工作中很多場景都能看到WordCount的影子

三、使用者通路量的TopN的Spark-Core實作

在idea完成的同時要在spark-shell中進行測試:

1、scala> val pageview = sc.textFile("file:///home/hadoop/data/page_views.dat")
pageview: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/data/page_views.dat MapPartitionsRDD[9] at textFile at <console>:24

2、scala> pageview.map(x => x.split("\t")(5)).take(2).foreach(println)
NULL
NULL

3、scala> pageview.map(x => (x.split("\t")(5),1)) .take(2).foreach(println)
(NULL,1)
(NULL,1)

4、求通路量

           

Idea下的代碼:

package com.ruozedata.spark

import org.apache.spark.{SparkConf, SparkContext}

object PageViewsApp {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    val sc = new SparkContext(sparkConf)
    val pageViews = sc.textFile(args(0))


    //擷取使用者id
    val user_id = pageViews.map(x => (x.split("\t")(5),1))

    //user_id.reduceByKey(_+_).take(10).foreach(println)

    user_id.reduceByKey(_+_).map(x =>(x._2,x._1)).sortByKey(false).map(x =>(x._2,x._1)).take(5).foreach(print)
    sc.stop()
  }

}

           

在工作中很多場景中的統計,都可以看到wordcount的影子。

工作中拿到需求之後:

1):進行分析;

2):通過中文描述來進行功能拆解 <== 詳細設計說明書

3):代碼的開發:代碼的實作

四、求平均年齡Spark-Core的實作

資料格式:ID + “” + 年齡:

實作思路:sum(age) / count(ID)

1)、拿到年齡

2)、拿到人數

3)、年齡相加 / 人數

package com.ruozedata.spark

import org.apache.spark.{SparkConf, SparkContext}

object AvgAge {
    val sparkConf = new SparkConf()
    val sc = new SparkContext(sparkConf)

    //取出年齡
    val dataFile = sc.textFile("file:///home/hadoop/data/sample_age_data")

//    val ageData = dataFile.map(x => x.split("")(1)).take(10).foreach(println)

    //求人數
    val countPeople = dataFile.count()

    // 總年齡
    val ageData = dataFile.map(x => x.split("")(1))
    val totalAge = ageData.map(age => age.toInt).reduce(_+_)
    
    val avgAge = totalAge / countPeople

  sc.stop()
}

           

IDEA上每寫一步都可以相應的在Spark-shell中進行測試

注意:公式中不能定義和算子同名的變量,否則運作的時候系統會識别不了

五、求男女人數以及最低、最高身高

資料:ID + “” + 性别 + “” + 身高

ID 性别 身高
1 F 185
2 M 190
3 F 175

分析:

1)、取出性别,一個分組groupBy

RDD ==> MRDD + FRDD (一個RDD拆分為男RDD和一個女RDD)

2)、max min 由于結構的特殊性,直接max、min就行

3)、max min

六、本次課總結

RDD一定要掌握,但是Spark SQL是更進階的API.