天天看點

Spark RDD程式設計基礎

一、資料讀寫

(1)從檔案系統加載資料建立RDD

  ①本地檔案:sc.textFile("file:///data/spark/buyer_favorite")

  ②HDFS檔案:sc.textFile("hdfs://localhost:9000/spark/buyer_favorite")

(2)通過并行集合建立RDD

  val array = Array(1,2,3,4,5)

  val rdd = sc.parallelize(array)

(3)将RDD寫入到文本檔案

   使用saveAsTextFile()函數,要求提供一個不存在的目錄名稱

  *使用repartition重新設定分區個數

二、RDD常見操作

(1)轉換操作

①  filter(func):篩選出滿足函數func的元素,并傳回一個新的資料集。

②  map(func):将每一個元素傳遞到函數func中,并将結果傳回為一個新的資料集。

③  flatMap(func):類似map(),但是每個輸入都可以映射0到多個輸出結果。可以了解為:

    第一步:執行map()

    第二步:把map操作得到的數組中每個元素“拍扁”(flat)

④  groupByKey():對具有相同鍵的值進行分組,傳回一個(key,Iterable)形式資料集。

⑤  reduceByKey(func):先執行groupByKey()操作得到(key,value-list),根據func對value-list進行操作。

(2)行動操作

①  count():傳回資料集中元素個數。

②  collect():以數組的形式傳回資料集中的所有元素。

③  first():傳回資料集第一個元素。

④  take(n):以數組的形式傳回資料集中的前n個元素。

⑤  reduce(func):通過函數func(輸入兩個參數,傳回一個值)聚合資料集中的元素。

⑥  foreach(func):将資料集中每個元素傳遞到func中運作。

三、鍵值對RDD

(1)建立鍵值對RDD

①val lines = sc.textTextFile(file:///usr/local/word.txt)

  var pairRDD=lines.flatMap(_.split("\t")).map(word=>(word,1))

②var pairRDD=rdd.map(word=>(word,1))

(2)常用鍵值對RDD操作

reduceByKey(func)

groupByKey()

keys:傳回一個新的RDD.

values

sortByKey()

sortBy()

mapValues(func)

join():内連接配接,對于給定的兩個資料集(k,v1)和(k,v2),當兩個資料集都存在key才會被輸出,最注重得到一個(k,(v1,v2))類型的資料集。

四、幾個簡單例子

(1)

WordCount統計:某電商網站記錄了大量的使用者對商品的收藏資料,并将資料存儲在名為buyer_favorite的文本檔案中。文本資料格式如下:

使用者id(buyer_id),商品id(goods_id),收藏日期(dt);現要求統計使用者收藏資料中,每個使用者收藏商品數量。

rdd.map(line=> (line.split('\t')(0),1)).reduceByKey(_+_).collect

(2)

去重:使用spark-shell,對上述實驗中,使用者收藏資料檔案進行統計。根據商品ID進行去重,統計使用者收藏資料中都有哪些商品被收藏。

rdd.map(line => line.split('\t')(1)).distinct.collect      相當于

rdd.map(line=>(line.split('\t')),1).reduceByKey(_+_).map(_._1).collect

(3)排序:電商網站都會對商品的通路情況進行統計,現有一個goods_visit檔案,存儲了電商網站中的各種商品以及此各個商品的點選次數。

商品id(goods_id) 點選次數(click_num)現根據商品的點選次數進行排序,并輸出所有商品。

rdd1.map(line => ( line.split('\t')(1).toInt, line.split('\t')(0) ) ).sortByKey(true).map(_._2).collect

(4)Join:現有某電商在2011年12月15日的部分交易資料。資料有訂單表orders和訂單明細表order_items,表結構及資料分别為:

orders表:(訂單id order_id, 訂單号 order_number, 買家ID buyer_id, 下單日期 create_dt)

order_items表:(明細ID item_id, 訂單ID order_id, 商品ID goods_id )

orders表和order_items表,通過訂單id進行關聯,是一對多的關系。

下面開啟spark-shell,查詢在當天該電商網站,都有哪些使用者購買了什麼商品

rdd1.map(line=> (line.split('\t')(0), line.split('\t')(2)) ) 

    join

rdd2.map(line=> (line.split('\t')(1), line.split('\t')(2)) )

     .collect

(5)求平均值:電商網站都會對商品的通路情況進行統計。現有一個goods_visit檔案,存儲了全部商品及各商品的點選次數。還有一個檔案goods,記錄了商品的基本資訊。兩張表的資料結構如下:

goods表:商品ID(goods_id),商品狀态(goods_status),商品分類id(cat_id),評分(goods_score)

goods_visit表:商品ID(goods_id),商品點選次數(click_num)

商品表(goods)及商品通路情況表(goods_visit)可以根據商品id進行關聯。現在統計每個分類下,商品的平均點選次數是多少?

val rdd11 = rdd1.map(line=> (line.split('\t')(0), line.split('\t')(2)) )  

val rdd22 = rdd2.map(line=> (line.split('\t')(0), line.split('\t')(1)) )  

val rddjoin = rdd11 join rdd22  

rddjoin.map(x=>{(x._2._1, (x._2._2.toLong, 1))}).reduceByKey((x,y)=>{(x._1+y._1, x._2+y._2)}).map(x=>  

{(x._1, x._2._1*1.0/x._2._2)}).collect

繼續閱讀