天天看點

pyspark 之 rdd操作

1、rdd 簡介

什麼是rdd?從官網和一些技術部落格上我們都可以看到這樣的介紹

RDD叫做彈性分布式資料集(resilient distributed dataset) ,是Spark中最基本的資料抽象,它是跨叢集節點分區的元素集合,可以并行操作。

官網舉例如下:

## 建構包含應用資訊的SparkConf對象
conf = SparkConf().setAppName(appName).setMaster(master)
## 建立SparkContext對象,Spark通過它通路叢集
sc = SparkContext(conf=conf)
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
           

如上所示,集合data的元素被複制并形成可以并行化操作的分布式資料集,即建立了一個包含數字1到5的并行化集合。

是以可以簡單了解為RDD為對輸入集合的一種封裝,如java使用對象進行封裝,Spark将輸入資料封裝為分布式集合(RDD),友善資料進行分布式計算。 如下圖所示:

pyspark 之 rdd操作

RDD将任務分解為不同的子任務,使用多台計算機分布式執行,最後将計算結果整合。

為什麼需要RDD呢?之前wordcount(大資料計算)使用原生集合隻支援單機版,如果要做分布式計算,需要做很多額外的工作,如線程\程序通信,容錯、自動均衡等。是以需要有一個分布式的資料抽象,也就是用該抽象,可以表示分布式的集合,基于這個分布式集合進行操作,就可以很友善的完成分布式的wordcount。

pyspark 之 rdd操作

2、pyspark rdd操作

1、建立SparkContext對象

Spark 程式必須做的第一件事是建立一個 SparkContext 對象,它告訴 Spark 如何通路叢集。 要建立 SparkContext,首先需要建構一個 SparkConf 對象,其中包含有關您的應用程式的資訊。每個 JVM 應該隻有一個 SparkContext 處于活動狀态。在建立新的 SparkContext 之前,必須停止活動的 SparkContext。

pyspark 之 rdd操作

如圖所示,SparkContext處于DriverProgram核心位置,所有與Cluster、Worker Node互動的操作都需要SparkContext來完成

## 建構包含應用資訊的SparkConf對象
conf = SparkConf().setAppName(appName).setMaster(master)
## 建立SparkContext對象,Spark通過它通路叢集
sc = SparkContext(conf=conf)
           

2、讀取檔案

## 1、對于list 直接建立
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

## 2、讀取檔案
distFile = sc.textFile("data.txt")
           

3、rdd操作

作用于RDD上的Operation分為transformantion和action。 經Transformation處理之後,資料集中的内容會發生更改轉換成為其它類型的RDD(由資料集A轉換成為資料集B);而經Action處理之後,資料集中的内容會被歸約為一個具體的數值。

隻有當RDD上有action時,該RDD及其父RDD上的所有operation才會被送出到cluster中真正的被執行。

rdd transformations操作

rdd actions操作

當action作用于轉換之後RDD時,會調用SparkContext的runJob方法。

繼續閱讀