天天看點

Spark 算子

==> RDD是什麼?

    ---> RDD(Resilient Distributed Dataset) 彈性分布式資料集 , 是 Spark 中最基本的資料抽象,它代表一個不可變,可分區,裡面的元素可并行計算的集合

    ---> 特點:

        ---- 自動容錯

        ---- 位置感覺性高度

        ---- 可伸縮性

        ---- 允許使用者在執行多個查詢時顯示的将工作集緩存在記憶體中,後續的查詢能夠重用工作集,極大的提升了查詢速度

    ---> RDD 的屬性

        ---- A list of partitions

一個組分片,即資料集的基本組成機關

對于 RDD 來說,每個分片都會被一個計算任務處理,并決定并行計算的粒度,使用者可以在建立 RDD 時指定 RDD的分片個數,如果沒有指定,那麼就會采用預設值,預設值就是程式所配置設定 到的 CPU  Core 的數目 

        ---- A function for computing each split

一個計算每個分區的函數

Spark 中 RDD 的計算是以分片為機關的,每個 RDD 都會實作 compute 函數以達到這個目的, compute 函數會對疊代器進行複合,不需要儲存每次計算的結果

        ---- A list of dependencies on other RDDs

RDD 之間的依賴關系

RDD 每次轉換都會生成一個新的RDD, 是以 RDD 之間就會形成類似于流水線一樣的前後依賴關系。在部分資料丢失時, Spark 可以通過這個依賴關系重新計算丢失 的分區資料,而不是對 RDD的所有分區進行重新計算

        ---- Optionally, a Partitioner for key-value RDDs(e.g. to say that the RDD is hash-partitioned)

一個 Partitioner, 即 RDD的分片函數

Spark 中實作 了兩種類型的分片函數, 一個是基于哈希的 HashPartitioner, 另外一個是基于 RangePartitioner, 隻有對于 key-value 的 RDD, 才會有 Partitioner, 非 key-value的 RDD的 Partitioner的值是None, Partitioner函數不但決定 了 RDD 本身的分片數量,也決定了 parents RDD Shuffle 輸出時的分片數量

        ---- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file )

一個清單,存儲存取每個 Partion 的優先位置(preferred location)

對于一個 HDFS 檔案來說,這個清單 儲存的就是每個Partition 所在的塊的位置

按照“移動資料不如移動計算”的理念, Spark 在進行任務排程的時候,會盡可能的将計算任務配置設定到其所要處理資料塊的存儲位置

==> RDD 的建立方式

    ---> 通過外部的資料檔案建立 (HDFS)

<code>val</code> <code>rdd</code><code>1</code> <code>=</code> <code>sc.textFile(</code><code>"hdfs://192.168.10.210:9000/data/data.txt"</code><code>)</code>

    ---&gt; 通過 sc.parallelize 進行建立

<code>val</code> <code>rdd</code><code>2</code> <code>=</code> <code>sc.parallelize(Array(</code><code>1</code><code>,</code><code>2</code><code>,</code><code>3</code><code>,</code><code>4</code><code>,</code><code>5</code><code>,</code><code>6</code><code>))</code>

==&gt; RDD的基本原理

    ---&gt; 建立一個 RDD: 

<code>//                          3代表分三個分區</code>

<code>val</code> <code>rdd</code><code>1</code> <code>=</code> <code>sc.parallelize(Array(</code><code>1</code><code>,</code><code>2</code><code>,</code><code>3</code><code>,</code><code>4</code><code>,</code><code>5</code><code>,</code><code>6</code><code>,</code><code>7</code><code>,</code><code>8</code><code>), </code><code>3</code><code>)</code>

    ---&gt; 一個分區運作在一個Worker 節點上, 一個 Worker 上可以運作多個分區

==&gt; RDD  的類型

    ---&gt; Trasformation

RDD 中的所有轉換都是延遲加載的,即,不會傳回計算結果,隻記住這些應用到基礎資料集(如,一個檔案,一個清單等)上的轉換動作,隻有當發生一個要求傳回結果給 Driver 時,這些轉換才會執行(個人了解,與 Scala 中的 lazy (懶值)比較相似)

轉換

含義

map(func)

傳回一個新的 RDD,該 RDD 由每一個輸入元素經過 func 函數轉換後組成

filter(func)

傳回一個新的RDD,該 RDD 由經過 func 函數計算後傳回值為 true 的輸入元素組成

flatMap(func)

類似于 map ,但是每個輸入元素可以被映射為 0 或多個輸出元素(傳回一個序列)

mapPartitions(func)

類似于 map, 但是獨立的在RDD 的每一個分片上運作,是以在類型為T  的 RDD 上運作時,func 的函數類型必須 是Iterator[T] =&gt; Iterator[U]

mapPartitionsWithIndex(func)

類似于 mapPartitions,但 func 帶有一個整數參數表示分片的索引值,是以在類型為T 的 RDD 上運作時, func 的函數類型必須是(Int, Interator[T])= &gt; Iterator[U]

sample(withReplacement, fraction, seed)

根據 fraction 指定的比例對資料進行采樣, 可以選擇是否使用随機數進行替換, seed 用于指定随機數生成器種子

union(otherDataset)

對源RDD 和 參數 RDD求并集後傳回一個新的RDD

intersection(otherDataset)

對源RDD 和參數 RDD 求交集後傳回一個新的RDD

distinct([numTasks])

對源RDD 去重後傳回一個新的RDD

groupByKey([numTasks])

在 (k, v) 的RDD 上調用,傳回一個(K, iterator[V]) 的 RDD

reduceByKey(func, [numTasks])

在(k, v) 的RDD上調用,傳回一個(k, v) 的 RDD, 使用指定的reduce函數,将相同key 的值聚合到一起,與 groupByKey 類似,reduce 任務的個數可以通過第二個可選的參數來設定

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

sortByKey([ascending], [numTasks])

在一個(k, v)上調用,k 必須實作 Ordered 接口,傳回一個按照 key 進行排序的(k, v) 的RDD

sortBy(func, [ascending], [numTasks])

與 sortByKey 類似,但是更靈活

join(otherDataset, [numTasks])

在類型為(k, v)和(k, w) 的RDD 上調用,傳回一個相同key 對應的所有元素堆在一起的(k, (v, w)) 的 RDD

cogroup(otherDataset, [numTasks])

在類型為(k, v)和(k, w)的 RDD 上調用 ,傳回一個(k, (Iterable&lt;v&gt;, Iterable&lt;w&gt;)) 類型的 RDD

cartesian(otherDataset)

笛卡爾積

pipe(command, [envVars])

coalesce(numPartitions)

repartitionAndSortWithinPartitions(partitions)

    ---&gt; Action

reduce(fun)

通過 func 函數聚集RDD中的所有元素,這個功能必須是可交換且可并聯的

collect()

在驅動程式中,以數組的形式傳回資料集的所有元素

count()

傳回元素個數

first()

反回 RDD 的第一個元素(類似于 take(1))

take(n)

傳回一個由資料集的前 n 個元素組成的數組

takeSample(withReplacement, num, [seed])

傳回一個數組,該 數組由從資料集中随機采樣的num 個元素組成,可以選擇是否用随機數替換不足的部分, seed用于指定随機數生成器種子

takeOrdered(n, [ordering])

saveAsTextFile(path)

将資料集的元素以 textfile 的形式儲存到HDFS檔案系統或者其它支援的檔案系統,對每個元素,Spark 将會調用 toString 方法将它轉換為檔案中的文本

saveAsSequenceFile(path)

将資料集中的元素以 Hadoop sequencefile 的格式 儲存到指定的目錄下,可以使HDFS 或者其它 Hadoop 支援的檔案系統

saveAsObjectFile(path)

countByKey()

針對(k, v ) 類型的RDD, 傳回一個(k, Int) 的 map, 表示每一個key 對應的元素個數

foreach(func)

在資料集的每一個元素上運作函數 func 進行更新

==&gt; RDD  的緩存機制

    ---&gt; 作用:緩存有可能丢失,或由于存儲于記憶體中的資料由于記憶體不足而被删除,緩存容錯機制保證了即使緩存丢失也能保證計算的正确執行

    ---&gt; 實作原理:通過基于 RDD 的一系列轉換,丢失的資料會被重算,由于 RDD 的各個 Partition 是相對獨立的,是以隻需要計算丢失的部分即可, 不用全部重新計算

    ---&gt; 運作方式:RDD通過 persist方法或 cache方法可以将前面的計算結果緩存,但并不會調用時便立緩存,而是觸發後面的action 時,此RDD會被緩存到計算機記憶體中,供後面重用

    ---&gt; 通過檢視源碼可以發現,cache 最終調用的也是 parsist

<code>def</code> <code>persist()</code><code>:</code><code>this</code><code>.</code><code>type</code> <code>=</code> <code>persist(StorageLevel.MEMORY</code><code>_</code><code>ONLY)</code>

<code>def</code> <code>cache()</code><code>:</code><code>this</code><code>.</code><code>type</code> <code>=</code> <code>persist()</code>

    ---&gt; 緩存使用:

<code>rdd</code><code>1</code><code>.count          </code><code>// 沒有緩存,直接執行</code>

<code>rdd</code><code>1</code><code>.cache</code>

<code>rdd</code><code>1</code><code>.count        </code><code>// 第一次執行會慢一些</code>

<code>rdd</code><code>1</code><code>.count        </code><code>// 第二次會很快</code>

    ---&gt; 存儲級别在 object  StorageLevel 中定義

<code>object</code> <code>StorageLevel{</code>

<code>    </code><code>val</code> <code>NONE </code><code>=</code> <code>new</code> <code>StorageLevel(</code><code>false</code><code>, </code><code>false</code><code>, </code><code>false</code><code>, </code><code>false</code><code>)</code>

<code>    </code><code>val</code> <code>DISK</code><code>_</code><code>ONLY </code><code>=</code> <code>new</code> <code>StorageLevel(</code><code>true</code><code>, </code><code>false</code><code>, </code><code>false</code><code>, </code><code>false</code><code>)</code>

<code>    </code><code>val</code> <code>DISK</code><code>_</code><code>ONLY</code><code>_</code><code>2</code> <code>=</code> <code>new</code> <code>StorageLevel(</code><code>true</code><code>, </code><code>false</code><code>, </code><code>false</code><code>, </code><code>false</code><code>)</code>

<code>    </code><code>val</code> <code>MEMORY</code><code>_</code><code>ONLY </code><code>=</code> <code>new</code> <code>StorageLevel(</code><code>false</code><code>, </code><code>true</code><code>, </code><code>false</code><code>, </code><code>true</code><code>)</code>

<code>    </code><code>val</code> <code>MEMORY</code><code>_</code><code>ONLY</code><code>_</code><code>2</code> <code>=</code> <code>new</code> <code>StorageLevel(</code><code>false</code><code>, </code><code>true</code><code>, </code><code>false</code><code>, </code><code>true</code><code>)</code>

<code>    </code><code>val</code> <code>MEMORY</code><code>_</code><code>ONLY</code><code>_</code><code>SET </code><code>=</code> <code>new</code> <code>StorageLevel(</code><code>false</code><code>, </code><code>true</code><code>, </code><code>false</code><code>, </code><code>false</code><code>)</code>

<code>    </code><code>val</code> <code>MEMORY</code><code>_</code><code>ONLY</code><code>_</code><code>SET</code><code>_</code><code>2</code> <code>=</code> <code>new</code> <code>StorageLevel(</code><code>false</code><code>, </code><code>true</code><code>, </code><code>false</code><code>, </code><code>false</code><code>)</code>

<code>    </code><code>val</code> <code>MEMORY</code><code>_</code><code>AND</code><code>_</code><code>DISK </code><code>=</code> <code>new</code> <code>StorageLevel(</code><code>true</code><code>, </code><code>true</code><code>, </code><code>false</code><code>, </code><code>true</code><code>)</code>

<code>    </code><code>val</code> <code>MEMORY</code><code>_</code><code>AND</code><code>_</code><code>DISK</code><code>_</code><code>2</code> <code>=</code> <code>new</code> <code>StorageLevel(</code><code>true</code><code>, </code><code>true</code><code>, </code><code>false</code><code>, </code><code>true</code><code>)</code>

<code>    </code><code>val</code> <code>MEMORY</code><code>_</code><code>AND</code><code>_</code><code>DISK</code><code>_</code><code>SET </code><code>=</code> <code>new</code> <code>StorageLevel(</code><code>true</code><code>, </code><code>true</code><code>, </code><code>false</code><code>, </code><code>false</code><code>)</code>

<code>    </code><code>val</code> <code>MEMORY</code><code>_</code><code>ADN</code><code>_</code><code>DISK</code><code>_</code><code>SET</code><code>_</code><code>2</code> <code>=</code> <code>new</code> <code>StorageLevel(</code><code>true</code><code>, </code><code>true</code><code>, </code><code>false</code><code>, </code><code>false</code><code>)</code>

<code>    </code><code>val</code> <code>OFF</code><code>_</code><code>HEAP </code><code>=</code> <code>new</code> <code>StorageLevel </code><code>=</code> <code>new</code> <code>StorageLevel(</code><code>true</code><code>, </code><code>true</code><code>, </code><code>true</code><code>, </code><code>false</code><code>)</code>

<code>}</code>

==&gt; RDD的 Checkpoint(檢查點)機制: 容錯機制

    ---&gt; 檢查點本質是通過将 RDD 寫入 Disk 做檢查點

    ---&gt; 作用: 通過做 lineage 做容錯的輔助

    ---&gt; 運作機制: 在RDD 的中間階段做檢查點容錯,之後如果有節點出現問題而丢失分區,從做檢查點的 RDD 開始重新做 Lineage,以達到減少開銷的目的

    ---&gt; 設定檢查點的方式: 本地目錄, HDFS

        ---- 本地目錄(需要将 spark-shell 運作在本地模式上)

<code>// 設定檢查點目錄 </code>

<code>sc.setCheckpointDir(</code><code>"/data/checkpoint"</code><code>)</code>

<code>// 建立一個RDD</code>

<code>// 設定檢查點</code>

<code>rdd</code><code>1</code><code>.checkpoint</code>

<code>// 執行,觸發 Action ,會在檢查點目錄生成檢查點</code>

<code>rdd</code><code>1</code><code>.count</code>

        ---- HDFS(需要将 Spark-shell 運作在叢集模式上)

<code>sc.setCheckpointDir(</code><code>"hdfs://192.168.10.210:9000/data/checkpoint"</code><code>)</code>

==&gt; RDD 的依賴關系 和 Spark 任務中的 Stage

    ---&gt; RDD 依賴關系    RDD和它的父 RDD(s)的關系有兩種不同的類型

        ---- 窄依賴    每個 父 RDD 的 partition 隻能被子 RDD 的一個 partition 使用    一個子RDD

        ---- 寬依賴    多個子RDD 的 partition 會依賴同一個父 RDD        多個子RDD

    ---&gt; Stage    劃分Stage 的依據是:寬依賴

本文轉自 菜鳥的征程 51CTO部落格,原文連結:http://blog.51cto.com/songqinglong/2074380

繼續閱讀