一、資料讀寫
(1)從檔案系統加載資料建立RDD
①本地檔案:sc.textFile("file:///data/spark/buyer_favorite")
②HDFS檔案:sc.textFile("hdfs://localhost:9000/spark/buyer_favorite")
(2)通過并行集合建立RDD
val array = Array(1,2,3,4,5)
val rdd = sc.parallelize(array)
(3)将RDD寫入到文本檔案
使用saveAsTextFile()函數,要求提供一個不存在的目錄名稱
*使用repartition重新設定分區個數
二、RDD常見操作
(1)轉換操作
① filter(func):篩選出滿足函數func的元素,并傳回一個新的資料集。
② map(func):将每一個元素傳遞到函數func中,并将結果傳回為一個新的資料集。
③ flatMap(func):類似map(),但是每個輸入都可以映射0到多個輸出結果。可以了解為:
第一步:執行map()
第二步:把map操作得到的數組中每個元素“拍扁”(flat)
④ groupByKey():對具有相同鍵的值進行分組,傳回一個(key,Iterable)形式資料集。
⑤ reduceByKey(func):先執行groupByKey()操作得到(key,value-list),根據func對value-list進行操作。
(2)行動操作
① count():傳回資料集中元素個數。
② collect():以數組的形式傳回資料集中的所有元素。
③ first():傳回資料集第一個元素。
④ take(n):以數組的形式傳回資料集中的前n個元素。
⑤ reduce(func):通過函數func(輸入兩個參數,傳回一個值)聚合資料集中的元素。
⑥ foreach(func):将資料集中每個元素傳遞到func中運作。
三、鍵值對RDD
(1)建立鍵值對RDD
①val lines = sc.textTextFile(file:///usr/local/word.txt)
var pairRDD=lines.flatMap(_.split("\t")).map(word=>(word,1))
②var pairRDD=rdd.map(word=>(word,1))
(2)常用鍵值對RDD操作
reduceByKey(func)
groupByKey()
keys:傳回一個新的RDD.
values
sortByKey()
sortBy()
mapValues(func)
join():内連接配接,對于給定的兩個資料集(k,v1)和(k,v2),當兩個資料集都存在key才會被輸出,最注重得到一個(k,(v1,v2))類型的資料集。
四、幾個簡單例子
(1)
WordCount統計:某電商網站記錄了大量的使用者對商品的收藏資料,并将資料存儲在名為buyer_favorite的文本檔案中。文本資料格式如下:
使用者id(buyer_id),商品id(goods_id),收藏日期(dt);現要求統計使用者收藏資料中,每個使用者收藏商品數量。
rdd.map(line=> (line.split('\t')(0),1)).reduceByKey(_+_).collect
(2)
去重:使用spark-shell,對上述實驗中,使用者收藏資料檔案進行統計。根據商品ID進行去重,統計使用者收藏資料中都有哪些商品被收藏。
rdd.map(line => line.split('\t')(1)).distinct.collect 相當于
rdd.map(line=>(line.split('\t')),1).reduceByKey(_+_).map(_._1).collect
(3)排序:電商網站都會對商品的通路情況進行統計,現有一個goods_visit檔案,存儲了電商網站中的各種商品以及此各個商品的點選次數。
商品id(goods_id) 點選次數(click_num)現根據商品的點選次數進行排序,并輸出所有商品。
rdd1.map(line => ( line.split('\t')(1).toInt, line.split('\t')(0) ) ).sortByKey(true).map(_._2).collect
(4)Join:現有某電商在2011年12月15日的部分交易資料。資料有訂單表orders和訂單明細表order_items,表結構及資料分别為:
orders表:(訂單id order_id, 訂單号 order_number, 買家ID buyer_id, 下單日期 create_dt)
order_items表:(明細ID item_id, 訂單ID order_id, 商品ID goods_id )
orders表和order_items表,通過訂單id進行關聯,是一對多的關系。
下面開啟spark-shell,查詢在當天該電商網站,都有哪些使用者購買了什麼商品
rdd1.map(line=> (line.split('\t')(0), line.split('\t')(2)) )
join
rdd2.map(line=> (line.split('\t')(1), line.split('\t')(2)) )
.collect
(5)求平均值:電商網站都會對商品的通路情況進行統計。現有一個goods_visit檔案,存儲了全部商品及各商品的點選次數。還有一個檔案goods,記錄了商品的基本資訊。兩張表的資料結構如下:
goods表:商品ID(goods_id),商品狀态(goods_status),商品分類id(cat_id),評分(goods_score)
goods_visit表:商品ID(goods_id),商品點選次數(click_num)
商品表(goods)及商品通路情況表(goods_visit)可以根據商品id進行關聯。現在統計每個分類下,商品的平均點選次數是多少?
val rdd11 = rdd1.map(line=> (line.split('\t')(0), line.split('\t')(2)) )
val rdd22 = rdd2.map(line=> (line.split('\t')(0), line.split('\t')(1)) )
val rddjoin = rdd11 join rdd22
rddjoin.map(x=>{(x._2._1, (x._2._2.toLong, 1))}).reduceByKey((x,y)=>{(x._1+y._1, x._2+y._2)}).map(x=>
{(x._1, x._2._1*1.0/x._2._2)}).collect