天天看點

Spark(十一)Spark分區

  分區是rdd内部并行計算的一個計算單元,rdd的資料集在邏輯上被劃分為多個分片,每一個分片稱為分區,分區的格式決定了并行計算的粒度,而每個分區的數值計算都是在一個任務中進行的,是以任務的個數,也是由rdd(準确來說是作業最後一個rdd)的分區數決定。

  資料分區,在分布式叢集裡,網絡通信的代價很大,減少網絡傳輸可以極大提升性能。mapreduce架構的性能開支主要在io和網絡傳輸,io因為要大量讀寫檔案,它是不可避免的,但是網絡傳輸是可以避免的,把大檔案壓縮變小檔案,   進而減少網絡傳輸,但是增加了cpu的計算負載。

  spark裡面io也是不可避免的,但是網絡傳輸spark裡面進行了優化:

  spark把rdd進行分區(分片),放在叢集上并行計算。同一個rdd分片100個,10個節點,平均一個節點10個分區,當進行sum型的計算的時候,先進行每個分區的sum,然後把sum值shuffle傳輸到主程式進行全局sum,是以進行sum型計算對網絡傳輸非常小。但對于進行join型的計算的時候,需要把資料本身進行shuffle,網絡開銷很大。

spark是如何優化這個問題的呢?

  spark把key-value rdd通過key的hashcode進行分區,而且保證相同的key存儲在同一個節點上,這樣對改rdd進行key聚合時,就不需要shuffle過程,我們進行mapreduce計算的時候為什麼要進行shuffle?,就是說mapreduce裡面網絡傳輸主要在shuffle階段,shuffle的根本原因是相同的key存在不同的節點上,按key進行聚合的時候不得不進行shuffle。shuffle是非常影響網絡的,它要把所有的資料混在一起走網絡,然後它才能把相同的key走到一起。要進行shuffle是存儲決定的。

  spark從這個教訓中得到啟發,spark會把key進行分區,也就是key的hashcode進行分區,相同的key,hashcode肯定是一樣的,是以它進行分區的時候100t的資料分成10分,每部分10個t,它能確定相同的key肯定在一個分區裡面,而且它能保證存儲的時候相同的key能夠存在同一個節點上。比如一個rdd分成了100份,叢集有10個節點,是以每個節點存10份,每一分稱為每個分區,spark能保證相同的key存在同一個節點上,實際上相同的key存在同一個分區。

  key的分布不均決定了有的分區大有的分區小。沒法分區保證完全相等,但它會保證在一個接近的範圍。是以mapreduce裡面做的某些工作裡邊,spark就不需要shuffle了,spark解決網絡傳輸這塊的根本原理就是這個。

  進行join的時候是兩個表,不可能把兩個表都分區好,通常情況下是把用的頻繁的大表事先進行分區,小表進行關聯它的時候小表進行shuffle過程。

  大表不需要shuffle。

  需要在工作節點間進行資料混洗的轉換極大地受益于分區。這樣的轉換是  cogroup,groupwith,join,leftouterjoin,rightouterjoin,groupbykey,reducebykey,combinebykey 和lookup。

  分區是可配置的,隻要rdd是基于鍵值對的即可。

rdd分區的一個分區原則:盡可能是得分區的個數等于叢集核心數目

無論是本地模式、standalone模式、yarn模式或mesos模式,我們都可以通過spark.default.parallelism來配置其預設分區個數,若沒有設定該值,則根據不同的叢集環境确定該值

以下這種預設方式就一個分區

Spark(十一)Spark分區

結果

Spark(十一)Spark分區

設定了幾個分區就是幾個分區

Spark(十一)Spark分區
Spark(十一)Spark分區

n等于幾預設就是幾個分區

如果n=* 那麼分區個數就等于cpu core的個數

Spark(十一)Spark分區
Spark(十一)Spark分區

本機電腦檢視cpu core,我的電腦--》右鍵管理--》裝置管理器--》處理器

Spark(十一)Spark分區
Spark(十一)Spark分區
Spark(十一)Spark分區
Spark(十一)Spark分區

 進入defaultparallelism方法

Spark(十一)Spark分區

繼續進入defaultparallelism方法

Spark(十一)Spark分區

這個一個trait,其實作類是(ctrl+h)

Spark(十一)Spark分區

進入taskschedulerimpl類找到defaultparallelism方法

Spark(十一)Spark分區

繼續進入defaultparallelism方法,又是一個trait,看其實作類

Spark(十一)Spark分區

ctrl+h看schedulerbackend類的實作類

Spark(十一)Spark分區

進入coarsegrainedschedulerbackend找到defaultparallelism

Spark(十一)Spark分區

totalcorecount.get()是所有executor使用的core總數,和2比較去較大值

如果正常的情況下,那你設定了多少就是多少

(1)如果是從hdfs裡面讀取出來的資料,不需要分區器。因為hdfs本來就分好區了。

    分區數我們是可以控制的,但是沒必要有分區器。

(2)非key-value rdd分區,沒必要設定分區器

(3)key-value形式的時候,我們就有必要了。

hashpartitioner

rangepartitioner

自定義分區

需要實作2個方法

Spark(十一)Spark分區