天天看點

Spark技術棧——Spark CoreSpark Core

Spark Core

1. Spark的有幾種部署模式,每種模式特點?

1.本地模式

Spark不一定非要跑到Hadoop叢集,可以在本地,起多個線程的方式來指定。将Spark應用以多線程的方式直接運作在本地,一般都是為了友善調試,本地模式分三類:

  • local:隻啟動一個executor。
  • local[k]:啟動k個executor。
  • local[*]:啟動跟CPU數目相同的executor。

2.standalone模式

分布式部署叢集,自帶完整的服務,資源管理和任務監控是Spark自己監控,這個模式也是其他模式的基礎。

3.Spark on yarn模式

分布式部署叢集,資源和任務監控交給yarn管理,Spark用戶端直接連接配接Yarn,不需要額外建構Spark叢集。有yarn-client和yarn-cluster兩種模式,主要差別在于:Driver程式的運作節點。

  • cluster适合生成,driver運作在叢集子節點,具有容錯功能。
  • client适合調試,driver運作在用戶端。

技巧

考察Spark的幾種部署方式,問題比較簡單,平常調試一般都是本地模式,生産環境一般都是Spark on yarn。

2. driver的功能是什麼?

一個Spark作業運作時包括一個Driver程序,也是作業的主程序,具有main函數,并且有SparkContext的執行個體,是程式的入口點。

功能:負責向叢集申請資源,向master注冊資訊,負責了作業的排程,負責作業的解析,生成Stage并排程Task到Executor上。包括DAGScheduler,TaskScheduler。

技巧

考察Spark架構中元件的作用。需要能夠回答出driver是Spark作業的主程序,是程式的入口。然後回答出它的功能。

3. Hadoop和Spark都是并行計算,那麼它們有什麼相同和差別?

兩者都是用mr模型來進行并行計算,Hadoop的一個作業稱為job,job裡面分為map task和reduce task,每個task 都是在自己的程序中運作的,當task結束時,程序也會結束。

Spark使用者送出的任務稱為application,一個application對應一個Saprkcontext,app中存在多個job,每觸發一次action操作就會産生一個job。這些job可以并行或串行執行,每個job中有多個stage,stage是shuffle過程中DAGSchaduler通過RDD之間的依賴關系劃分job而來的,每個stage裡面有多個task,組成taskset有TaskSchaduler分發到各個executor中執行,executor的生命周期是和app一樣的,即使沒有job運作也是存在的,是以task可以快速啟動讀取記憶體進行計算,Spark的疊代計算都是在記憶體中進行的,API中提取了大量的RDD操作如join,groupby等,而且通過DAG圖可以實作良好的容錯。

Hadoop的job隻有map和reduce操作,表達能力比較欠缺而且在mr過程中會重複的讀寫HDFS,造成大量的io操作,多個job需要自己管理關系。

技巧

可以從Hadoop的job和Spark的job的差別進行對比。還有計算速度上,為什麼Spark的計算會比Hadoop快等方面回答。

4. *簡單描述Spark中的概念RDD,他有哪些特性?

RDD(Resilient Distributed Dataset)叫做彈性分布式資料集,是Spark中最基本的資料抽象,它代表一個不可變,可分區,裡面的元素可并行計算的集合。

RDD五大特性:

  • A list iof partitions一個分區清單,RDD中的資料都存在一個分區清單裡面
  • A function for computing each split作用在每一個分區中的函數
  • A list of dependencies on other RDDs一個RDD依賴于其他多個RDD,這個點很重要,RDD的容錯機制就是依據這個特性而來的
  • Optionally,a Partitioner for key-value RDDs(e.g.to say that the RDD is hash-partitioned)可選的,針對于kv類型的RDD才具有這個特性,作用是決定了資料的來源以及資料處理後的去向
  • Optionally,a list of preferred locations to compute each split on(e.g.block locations for an HDFS file)可選項,資料本地性,資料位置最優

技巧

基礎問題。考察對Spark RDD的了解。需要能夠用自己的話回答出RDD的五個特性,然後需要能夠對彈性分布式資料集進行解釋。彈性展現在哪?分布式展現在哪?

5. *簡述寬依賴和窄依賴概念,groupByKey , reduceByKey , map , filter , union五種,哪些會導緻寬依賴,哪些會導緻窄依賴?

1.窄依賴

指父RDD的每一個分區最多被一個子RDD的分區所用,表現為一個父RDD的分區對應于一個子RDD的分區,和兩個父RDD的分區對應于一個子RDD的分區。map/filter和union屬于第一類,對輸入進行協同劃分(co-partitioned)的join屬于第二類。

2.寬依賴

指子RDD的分區依賴于父RDD的所有分區,這是因為shuffle類操作。

3.算子的寬窄依賴

對RDD進行map,filter,union等Transformations一般是窄依賴。

寬依賴一般對RDD進行groupByKey,reduceByKey等操作,就是對RDD中的partition中的資料進行重分區(shuffle)。

join操作即可能是寬依賴也可能是窄依賴,當要對RDD進行join操作時,如果RDD進行過重分區則為窄依賴,否則為寬依賴。

技巧

考察Spark寬窄依賴的概念以及算子的使用。需要能夠答出寬窄依賴的定義,以及常用算子是寬依賴還是窄依賴。

6. *Spark如何防止記憶體溢出?

1.driver端的記憶體溢出

可以增大driver的記憶體參數:spark.driver.memory(default 1g)

這個參數用來設定Driver的記憶體。在Spark程式中,SparkContext,DAGScheduler都是運作在Driver端的。對應rdd的Stage切分也是在Driver端運作,如果使用者自己寫的程式有過多的步驟,切分出過多的Stage,這部分資訊消耗的是Driver的記憶體,這個時候就需要調大Driver的記憶體。

2.map過程産生大量對象導緻記憶體溢出

這種溢出的原因是在單個map中産生了大量的對象導緻的,例如:rdd.map(x=>for(i <-1 to 10000) yield i.toString),這個操作在rdd中,每個對象都産生了10000個對象,這肯定很容易産生記憶體溢出的問題。針對這種問題,在不增加記憶體的情況下,可以通過減少每個Task的大小,以便達到每個Task即使産生大量的對象Executor的記憶體也能夠裝得下。具體做法可以在會産生大量對象的map操作之前調用repartition方法,分區成更小的塊傳入map。例如:rdd.repartition(10000).map(x=>for(i <- 1 to 10000) yield i.toString)。

面對這種問題注意,不能使用rdd.coalesce方法,這個方法隻能減少分區,不能增加分區,不會有shuffle的過程。

3.資料不平衡導緻記憶體溢出

資料不平衡除了有可能導緻記憶體溢出外,也有可能導緻性能的問題,解決方法和上面說的類似,就是調用repartition重新分區。這裡就不再累贅了。

4.shuffle後記憶體溢出

shuffle記憶體溢出的情況可以說都是shuffle後,單個檔案過大導緻的。在Spark中,join,reduceByKey這一類型的過程,都會有shuffle的過程,在shuffle的使用,需要傳入一個partitioner,大部分Spark中的shuffle操作,預設的partitioner都是Hashpatitioner,預設值是父RDD中最大的分區數,這個參數通過spark.default.parallelism控制(在spark-sql中用spark.sql.shuffle.partitions),spark.default.parallelism參數隻對HashPartitioner有效,是以如果是别的Partitioner或者自己實作的Partitioner就不能使用spark.default.parallelism這個參數來控制shuffle的并發量了。如果是别的partitioner導緻的shuffle記憶體溢出,就需要從partitioner的代碼增加partitions的數量。

5.standalone模式下資源配置設定不均勻導緻記憶體溢出

在standalone的模式下如果配置了 --total-executor-cores 和 --executor-momory 這兩個參數,但是沒有配置 --executor-cores這個參數的話,就有可能導緻每個Executor的memory是一樣的,但是cores的數量不同,那麼在cores數量多的Executor中,由于能夠同時執行多個Task,就容易導緻記憶體溢出的情況。這種情況的解決方法就是同時配置 --executor-cores或者 spark.executor.cores參數,確定Executor資源配置設定均勻。

6.使用 rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)代替rdd.cache()

rdd.cache()和rdd.persist(Storage.MEMORY_ONLY)是等價的,在記憶體不足的時候rdd.cache()的資料會丢失,再次使用的時候會重算,而rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)在記憶體不足的時候會存儲在磁盤,避免重算,隻是消耗點IO是時間。

技巧

以上列舉了Spark記憶體溢出的幾種情況。回答時需針對性進行分析,比如針對什麼情況下會産生記憶體溢出,然後如何解決的來進行回答。最好能結合着自己的項目來講。至少需要能夠回答出其中兩到三點。

7. stage,task和job的差別與劃分方式?

  • Job:一個由多個任務組成的并行計算,當你需要執行一個rdd的action的時候,會生成一個job。
  • Stage:每個Job被拆分成更小的被稱為stage(階段)的task(任務)組,stage彼此之間是互相依賴的,各個stage會按照執行順序依次執行。
  • Task:一個将要被發送到Executor中的工作單元。是stage的一共任務執行單元,一般來說,一個rdd有多少個partition,就會有多少個task,因為每一個task隻是處理一個partition上的資料。

技巧

基礎問題。了解并回答出Spark中Job,Stage,Task三者之間的聯系和差別。

8. Spark送出作業參數(重點)

在送出任務時的幾個重要參數:

  • executor-cores —— 每個executor使用的核心數,預設為1,官方建議2-5個,我們企業是4個
  • num-executor —— 啟動executors的數量,預設為2
  • executor-memory —— executor記憶體大小,預設1G
  • driver-cores ——driver使用核心數,預設為1
  • driver-memory ——driver記憶體大小,預設512M

    下面是一個送出任務的樣式:

spark-submit \
--master local[5] \
--driver-cores 2 \
--driver-memory 8g \
--executor-cores 4 \
--num-executors 10 \
--executor-memory 8g \
--chass PackageName.ClassName XXXX.jar \
--name "Spark Job Name" \
InputPath \
OutputPath
           

技巧

考察Spark任務送出時的相關參數。重點是核心數,executors的數量和記憶體大小,需要回答出預設大小和自己項目中一般設定為多大。

9. *Spark 中 reduceByKey 和 groupByKey 差別與用法?

reduceByKey用于對每個key對應的多個value進行merge操作,最重要的是它能夠在本地先進行merge操作,并且merge操作可以通過函數自定義。

groupByKey()也是對每個key對應的多個value進行操作,但是隻是彙總生成一個sequence,本身不能自定義函數,隻能通過額外通過map(func)來實作。

在大的資料集上,reduceByKey(func)的效果比groupByKey()的效果更好一些。因為reduceByKey()會在shuffle之前對資料進行合并,傳輸速度優于groupByKey。

技巧

考察Spark算子。需要能夠回答reduceByKey和groupByKey的差別,需要能夠回答出每個算子的特點,reduceBykey的優點是什麼。

10. *foreach 和 map 的差別?

兩個方法的共同點在于:都是用于周遊集合對象,并對每一項執行指定的方法。而兩者的差異在于:

  • foreach無傳回值(準确說傳回void),map傳回集合對象。foreach用于周遊集合,而map用于映射(轉換)集合到另一個集合。
  • foreach中的處理邏輯是串行的,map中的處理邏輯是并行的。
  • map是Transformation算子,foreach是action算子。

技巧

考察Spark算子。從功能用法上來進行回答,既然拿來比較,一定有相同的地方,是以回答時應結合共同點和不同點來回答。foreach和map可以先講一下他們的功能,用法,然後再說一下他們處理邏輯上的不同以及屬于不同的算子。

11. *map 與 mapPartitions 的差別:

相同:map 與 mapPartitions 都屬于 Transformation(轉換)算子。

差別:

  1. 本質
  • map是對rdd中的每一個元素進行操作
  • mapPartitions則是對rdd中的每個分區的疊代器進行操作
  1. RDD中的每個分區資料量不大的情形
  • map操作性能低下。比如一個partition中有1萬條資料,那麼在分析每個分區時,function要執行和計算1萬次。
  • mapPartitions性能較高。使用mapPartitions操作之後,一個task僅僅會執行一次function,function一次接收所有的partition資料。隻要執行一次就可以了,性能比較高。
  1. RDD中的每個分區資料量超大的情形,比如一個Partition有100萬條資料。
  • map能正常執行完。
  • mapPartitions一次傳入一個function後,可能一下子記憶體不夠用,造成OOM(記憶體溢出)。

技巧

考察Spark算子。需要至少能夠答出第一條差別:功能用法方面的不同。

12. *foreach 和 foreachPartition 的差別

相同:foreach 和 foreachPartition都屬于行動(Action)算子。

差別:

  • foreach每次處理RDD中的一條資料。
  • foreachPartition每次處理RDD中每個分區的疊代器中的資料。

技巧

考察Spark算子。需要能夠回答出foreach是每次處理RDD中的一條資料,需要能夠回答出foreachPartition用法上和foreach的不同。

13. groupByKey , reduceByKey 和 combineByKey 的差別

1.groupByKey

  • (1) 用于對每個key進行操作,将結果生成一個sequence。
  • (2) groupByKey本身不能自定義函數。
  • (3) 會将所有鍵值對進行移動,不會進行局部merge
  • (4) 會導緻叢集節點之間的開銷很大,導緻傳輸延時。

2.reduceByKey

  • (1)用于對每個key對應的多個value進行merge操作。
  • (2)該算子能在本地先進行merge操作。
  • (3)merge操作可以通過函數進行自定義。

3.combineByKey

  • (1)是一個比較底層的算子。
  • (2)reduceByKey底層就使用了combineKyKey。

技巧

考察Spark算子。基礎問題。

14. sortByKey這個算子是全局排序嗎?

是全局排序。

排序的内幕:

  1. 在sortByKey之前将資料使用partitioner根據資料範圍來分。
  2. 使得p1分區所有的資料小于p2,p2分區所有的資料小于p3,以此類推。(p1~pn是分區辨別)
  3. 然後,使用sortByKey算子針對每一個Partition進行排序,這樣全局的資料就被排序了。

技巧

考察Spark算子。基礎問題。

15.Spark 中 coalesce 與 repartition 的差別?

我們常認為coalesce不産生shuffle會比repartition産生shuffle效率高,而實際情況往往要根據具體問題具體分析,coalesce效率不一定高,有時還有大坑,大家要慎用。

coalesce 與 repartition 他們兩個都是RDD的分區進行重新劃分,repartition隻是coalesce接口中shuffle為true的實作。假設源RDD有N個分區,需要重新劃分成M個分區:

  • 如果N<M。一般情況下N個分區有資料分布不均勻的狀況,利用HashPartitioner函數将資料重新分區為M個,這時需要将shuffle設定為true(repartition實作,coalesce也實作不了)。
  • 如果N>M并且N和M相差不多,(假如N是1000,M是100)那麼就可以将N個分區中的若幹個分區合并成一個新的分區,最終合并為M個分區,這時可以将shuffle設定為false(coalesce實作),如果M>N時,coalesce是無效的,不進行shuffle過程。父RDD和子RDD之間是窄依賴關系,無法使檔案數(partition)變多。總之如果shuffle為false時,如果傳入的參數大于現有的分區數目,RDD的分區數不變,也就是說不經過shuffle,是無法将RDD的分區數變多的。
  • 如果N>M并且兩者相差懸殊,這時你要看executor數與要生成的partition關系,如果executor數 <= 要生成partition數,coalesce效率高,反之如果用coalesce會導緻(executor 數 - 要生成partition 數)個executor 空跑進而降低效率。如果在M為1的時候,為了使coalesce之前的操作有更好的并行度,可以将shuffle設定為true。

技巧

考察Spark算子。中等難度。需要能夠回答出兩個都是RDD的分區進行重新劃分,repartition隻是coalesce接口中shuffle為true的實作。

16. *Spark血統的概念?

Spark的主要差別在于它處理分布式運算環境下的資料容錯性(節點失效/資料丢失)問題時采用的方案。為了保證RDD中的資料的魯棒性,RDD資料集通過所謂的血統關系(Lineage)記住了它是如何從其他RDD中演變過來的。相比其它系統的細顆粒度的記憶體資料更新級别的備份或者LOG機制,RDD的Lineage記錄的是粗顆粒度的特定資料轉換(Transformation)操作(filter,map,join etc.)行為。當這個RDD的部分分區資料丢失時,它可以通過Lineage擷取足夠的資訊來重新運算和恢複丢失的資料分區。這種粗顆粒的資料模型,限制了Spark的運用場合,但同時相比細顆粒度的資料模型,也帶來了性能的提升。

RDD在Lineage依賴方面分為兩種:窄依賴與寬依賴,用來解決資料容錯時的高效性。

窄依賴:是指父RDD的每一個分區最多被一個子RDD的分區所用,表現為一個父RDD的分區對應于一個子RDD的分區或多個父RDD的分區對應于一個子RDD的分區,也就是說一個父RDD的一個分區不可能對應一個子RDD的多個分區。

寬依賴:是指子RDD的分區依賴于父RDD的多個分區或所有分區,也就是說存在一個父RDD的一個分區對應一個子RDD的多個分區。

對于寬依賴,這種計算的輸入和輸出在不同的節點上,Lineage方法對于輸入節點完好,而輸出節點當機時,通過重新計算,這種情況下,這種方法容錯是有效的,否則無效,因為無法重試,需要向上其祖先追溯看是否可以重試(這就是Lineage,血統的意思),窄依賴對于資料的重算開銷要遠小于寬依賴的資料重算開銷。

在RDD計算,通過checkpoint進行容錯,做checkpoint有兩種方式,一個是checkpoint data,一個是logging the updates。使用者可以控制采用哪種方式來實作容錯,預設是logging the updates方式,通過記錄跟蹤所有生成RDD的轉換(transformations)也就是記錄每個RDD的Lineage(血統)來重新計算生成丢失的分區資料。

技巧

考察Spark血統,RDD依賴。

17. 講一講 Spark RDD 的持久化機制?

1.cache() 和 persist()

當對RDD執行持久化操作時,每個節點都會将自己操作的RDD的partition持久化到記憶體中,并且在之後對該RDD的反複使用中,直接使用記憶體緩存的partition,這樣的話,對于針對一個RDD反複執行多個操作的場景,就隻要對RDD計算一次即可,後面直接使用該RDD,而不需要計算多次該RDD。

巧妙使用RDD持久化,甚至在某些場景下,可以将Spark應用程式的性能提升10倍。對于疊代式算法和快速互動式應用來說,RDD持久化,是非常重要的。

要持久化一個RDD,隻要調用其cache()或者 persist()方法即可。在該RDD第一次被計算出來時,就會直接緩存在每個節點中,而且Spark的持久化機制還是自動容錯的,如果持久化的RDD的任何partition丢失了,那麼Spark會自動通過其源RDD,使用transformation操作重新計算該partition。

cache() 和 persist() 的差別在于:cache() 是 persist() 的一種簡化方式,cache() 的底層就是調用的 persist()的無參版本,同時就是調用 persist(MEMORY_ONLY),将資料持久化到記憶體中。如果需要從記憶體中去除緩存,那麼可以使用unpersist()方法。

2.checkPoint

場景:

當業務場景非常的複雜的時候,RDD的Lineage(血統)依賴會非常的長,一旦血統較後面的RDD資料丢失的時候,Spark會根據血統依賴重新的計算丢失的RDD,這樣會造成計算的時間過長,Spark提供了一個叫checkPoint的算子來解決這樣的業務場景。

使用:

為目前RDD設定檢查點。該函數将會建立一個二進制的檔案,并存儲到checkpoint目錄中,該目錄是用SparkContext.setCheckpointDir() 設定的。在checkpoint的過程中,該RDD的所有依賴于父RDD中的資訊将全部被移出。對RDD進行checkpoint操作并不會馬上執行,必須執行Action操作才能觸發。

checkPoint 的優點:

  • 持久化在HDFS上,HDFS預設的3副本備份使得持久化的備份資料更加的安全。
  • 切斷RDD的依賴關系:當業務場景複雜的時候,RDD的依賴關系非常的長的時候,當靠後的RDD資料丢失的時候,會經曆較長的重新計算的過程,采用checkPoint會轉為依賴checkPointRDD,可以避免長的Lineage重新計算。
  • 建議checkpoint之前進行cache操作,這樣會直接将記憶體中的結果進行checkPoint,不用重新啟動 job 重新計算。(優化)

    checkPoint的原理:

  • 當finalRDD執行Action類算子計算job任務的時候,Spark會從finalRDD從後往前回溯檢視哪些RDD使用了checkPoint算子。
  • 将使用了 checkPoint 的算子标記。
  • Spark 會自動的啟動一個job來重新計算标記了的RDD,并将計算的結果存入HDFS,然後切斷RDD的依賴關系。

技巧

考察Spark持久化機制。需要重點能夠回答兩種不同的持久化政策。一種cache 和 persist,要回答出cache 和 persist的差別,重點回答出 cache 底層是調用 persist(MEMORY_ONLY)。另一種是 checkPoint,要掌握 checkPoint的使用,優點以及執行原理。

18. *Spark 送出任務的整個流程說一下?

1.Standalone-Client 方式送出任務

  • (1) Client模式下送出任務,在用戶端啟動Driver 程序。
  • (2) Driver會向Master申請啟動Application啟動的資源。
  • (3) 資源申請成功,Driver端将Task發送到Worker端執行。
  • (4) Worker将Task執行結果傳回到Driver端。

2.Standalone-Cluster方式送出任務

  • (1) Standalone-Cluster模式送出App後,會向Master請求啟動Driver。
  • (2) Master接收請求之後,随機在叢集中一台節點啟動Driver程序。
  • (3) Driver啟動後為目前的應用程式申請資源。
  • (4) Driver端發送task到worker節點上執行。
  • (5) worker将執行情況和執行結果傳回給Driver端。

3.Yarn-Client方式送出任務

  • (1) 用戶端送出一個Application,在用戶端啟動一個Driver程序。
  • (2) 應用程式啟動後會向RS(ResourceManager)發送請求,啟動AM(ApplicationMaster)的資源。
  • (3) RS收到請求,随機選擇一台NM(NodeManager) 啟動AM。這裡的NM相當于(1)Standalone中的Worker節點。
  • (4) AM啟動後,會向RS請求一批container資源,用于啟動Executor。
  • (5) RS會找到一批NM傳回給AM,用于啟動Executor。

4. Yarn-Cluster方式送出任務

  • (1) 客戶機送出Application應用程式,發送請求到RS(ResourceManager),請求啟動AM(ApplicationMaster)。
  • (2) RS收到請求後随機在一台NM(NodeManager)上啟動AM(相當于Driver端)。
  • (3) AM啟動,AM發送請求到RS,請求一批container用于啟動Executor。
  • (4) RS傳回一批NM節點給AM。
  • (5) AM連接配接到NM,發送請求到NM啟動Executor。
  • (6) Executor反向注冊到AM所在的節點的Driver。Driver發送task到Executor。

技巧

考察Spark送出任務的流程。重點掌握Yarn叢集的送出任務的流程。因為企業生産環境最常用的是Spark on yarn,是以最常問的是Yarn方式送出任務的流程。

19.Spark Join 的優化經驗?

Saprk 作為分布式的計算架構,最為影響其執行效率的地方就是頻繁的網絡傳輸。是以一般的,在不存在資料傾斜的情況下,想要提高Spark job的執行效率,就盡量減少job的shuffle 過程(減少 job的stage),或者減少shuffle帶來的影響。

  • 盡量減少參與 join 的RDD的資料量
  • 盡量避免參與 join 的RDD都具有重複的key。
  • 盡量避免或者減少shuffle過程。
  • 條件允許的情況下,使用map-join完成join。

技巧

考察Spark的優化經驗。需要能夠答出join的優化之減少shuffle過程的方法。以及該如何減少。

20. Spark的shuffle有幾種方式?

shuffle方式共三種,分别是:

  1. HashShuffle
  2. SortShuffle(預設)
  3. TungstenShuffle

    在Spark程式中設定方式,通過設定 spark.shuffle.manager 進行配置:

val session = SparkSession.builder()
		.appName("XXXX")
		.master("local[8]")
		// 可設定為 hash , sort , tungsten-sort
		.config("spark.shuffle.manager","hash")
		.getOrCreate()
           

HashShuffleManager 特點:

  • 資料不進行排序,速度較快。
  • 直接寫入緩沖區,緩沖區寫滿後溢寫為檔案。
  • 本 ShuffleMapStage的每一個task會生成與下一個ShuffleMapStage并行度相同的檔案數量。
  • 海量檔案操作句柄和臨時緩存資訊,占用記憶體容易記憶體溢出。

    SortShuffleManager(預設) 特點:

  • 會對資料進行排序。
  • 在寫入緩存之前,如果是 reduceByKey 之類的算子,則會先寫入到一個Map記憶體資料結構中,而如果是join之類的算子,則先寫入到Array記憶體資料結構中。在每條資料寫入先前判斷是否當到達一定閥值,到達則寫入到緩沖區。
  • 複用一個core 的Task 會寫到同一個檔案裡,并生成一個索引檔案。其中記錄了下一個ShuffleMapStage中每一個task所要拉取資料的 start offset 和 end offset。

技巧

考察Spark 的shuffle的幾種方式,如何配置,以及相對應的特點。重點回答出hash 和sort 兩種ShuffleManager 的特點。

21 *哪些算子操作涉及到shuffle?

distinct , groupByKey , reduceByKey , aggregateByKey , join , cogroup , repartition

技巧

考察Spark的涉及shuffle的算子。基礎問題。

22. *簡述 MR 的shuffle 和 Spark 的 shuffle 過程?

MR:首先MR的shuffle,主要是基于磁盤計算,如果資料量過大的話,那麼磁盤io就會産生過大,那麼此時性能會很低,計算起來速度很慢,并且MR的shuffle計算預設是需要進行分組排序,那麼此時資料量很大,那麼進行分組排序的時候,每個資料都要分到相同的分區,并且還要排序,資源大大消耗,毫無效率可言。

Spark:Spark計算主要是基于記憶體,當記憶體寫滿,才會寫到磁盤,這樣速度很快,并且Sparkshuffle的操作可以不進行排序操作,這裡可以設定,利用hashshuffle,和 consolidation機制,而且shuffle計算可以疊代計算,通過這種設定,可以大大提高性能,并且縮短計算時間。

技巧

考察Spark 的shuffle過程。重點問題。

23. *Spark廣播變量的作用?

使用廣播變量,每個Executor的記憶體中,隻駐留一份變量副本,而不是對每個task都傳輸一次大變量,省了很多的網絡傳輸,對性能提升具有很大幫助,而且會通過高效的廣播算法來減少傳輸代價。

使用廣播變量的場景很多,我們都知道Spark一種常見的優化方式就是小表廣播,使用 map join 來代替 reduce join,我們通過把小的資料集廣播到各個節點上,節省了一次特别 expensive 的 shuffle 操作。

比如 driver 上有一張資料量很小的表,其他節點上的task都需要lookup這張表,那麼driver可以先把這張表copy到這些節點,這樣task就可以在本地查表了。

技巧

考察Spark廣播變量的用法。需要能夠回答出廣播變量是每個Executor的記憶體中,隻傳輸一份變量副本,而不是對每個task都傳輸一次大變量。可以結合項目講講它的使用場景。

24. *資料傾斜解決方案?

資料傾斜的發生一般都是一個key對應的資料過大,而導緻task執行過慢,或者記憶體溢出,OOM,一般發送在shuffle的時候,比如reduceByKey , countByKey , groupByKey,容易産生資料傾斜。

如何解決資料傾斜,首先看log日志資訊,因為log日志報錯時候會提示在那些行,然後就是去檢查發生shuffle的地方,這些地方比較容易發生資料傾斜。

第一個方案就是聚合源資料:

我們的資料一般來自于Hive表,那麼在生成Hive表的時候對資料進行聚合,按照key進行分組,将key對應的所有values以另一種格式存儲,比如拼接成一個字元串這樣的話,可以省略groupByKey 和 reduceByKey 的操作,那麼沒有這樣操作的話,也就不用shuffle了,沒shuffle的話不可能出現資料傾斜,如果不能完美拼接,但是能少量拼接也能減少key對應的資料量,這樣也可以提高性能。

第二種方案過濾導緻傾斜的key:

這種方案就是說如果業務允許或者溝通過後能了解的話,我們可以把大的key進行過濾,這樣可以輕松解決問題。

第三種方案提高shuffle操作reduce并行度:

(reduceByKey(new… , 1000))通過提高reduce端的task執行數量,來分擔資料壓力,也就是說将task執行數量提高,性能也會相對應提高,這樣方式如果在運作中确實解決了資料傾斜問題最好了,但是如果出現之前運作時候OOM了,加大了reduce端task的數量,可以運作了,但是執行時間相當的長,那麼就放棄這第三種的方案,換别的方案。

第四種方案利用雙重聚合:

用于groupByKey 和 reduceByKey,比較試用與join,但是通常不用這樣做,也就是說首先第一輪對key進行打散,将原來一樣的key變成不一樣的key前面加字首,相當于将一樣的key分了多個組,然後進行局部聚合,接着除掉每個key的字首,然後在進行全局的聚合,進行兩次聚合,避免資料傾斜問題。

第五種方案将reduce join 轉換成 map join:

如果兩個rdd進行join,有一個表比較小的話,可以将小的表通過broadcast廣播出去,這樣每個節點的blockmanager中都有一份,這樣的話根本不會發送shuffle,那麼也就肯定不會存在資料傾斜的問題了,如果join中沒有資料傾斜的情況,第一時間考慮這樣方式;但是如果兩個表都很大,那麼就不可以broadcast了(記憶體不足),還有就是用 map join來代替reduce join,也就是說犧牲一點點記憶體,是可以接收的。

第六種方案sample抽樣分解聚合:

也就是說将傾斜的key單拉出來,然後用一個RDD進行打亂join。

第七種方案使用随機數和擴容表進行join:

也就是說通過flatmap進行擴容,然後在将随機數打入進去,再進行join,這樣的話不能根本的解決資料傾斜,但是可以有效的緩解資料傾斜的問題,也會提高性能。

技巧

考察Spark的資料傾斜以及解決方案。屬于調優的問題。考察重點,需要能夠回答出其中的兩三種方案。

25. Spark讀取資料生成RDD分區預設多少?

textFile算子的參與minPartitions的預設值為defaultMinPartitions,該方法的實作代碼為math.min(defaultParallelism, 2),其中defaultParallelism與CPU的核數有關系,也就是說預設分區數量是取CPU的核數和2的最小值。

技巧

考察Spark的分區。預設分區的計算方法:math.min(defaultParallelism, 2),是以預設分區數量取CPU核數和2的最小值。

26. 100個分片,我想聚合成兩個分片,用哪個算子?

coalesce算子,主要就是用于在filter操作之後,針對每個partition的資料量各不相同的情況,來壓縮partition的數量。減少partition的數量,而且讓每個partition的資料量都盡量均勻緊湊。進而便于後面的task進行計算操作,在某種程度上,能夠一定程度的提升性能。

27. Spark的通信機制?

Spark消息通信主要分成三個部分:整體架構;啟動消息通信;運作時消息通信。

1.概述

Spark(舊版本)的遠端程序通信(RPC)是通過Akka類庫來實作的,Akka使用Scala語言開發,基于Actor并發模型實作,Akka具有高可靠,高性能,可擴充等特點。

2.具體通信流程

  • (1)首先啟動Master程序,然後啟動所有的Worker程序。
  • (2)Worker啟動後,在preStart方法中與Master建立連接配接,向Master發送注冊資訊,将Worker的資訊通過case class封裝起來發送給Master。
  • (3)Master接收到Worker的注冊消息後将其通過集合儲存起來,然後向Worker回報注冊成功的消息。
  • (4)Worker會定期向Master發送心跳包,領受新的計算任務。
  • (5)Master會定期清理逾時的Worker。

3.通信架構

Spark2.2使用Netty作為master與worker的通信架構,Spark2.0之前使用的akka架構。

  1. Spark啟動消息通信

    worker向Master發送注冊消息,master處理完畢後傳回注冊成功或者是失敗的消息,如果成功,worker向master定時發送心跳。

    2.Spark運作時消息通信

    應用程式SparkContext向master發送注冊消息,并由master為該應用配置設定Executor,exectour啟動之後會向SparkContext發送注冊成功消息,然SparkContext的rdd觸發Action之後會形成一個DAG,通過DAGScheduler進行劃分Stage并将其轉化成TaskSet,然後TaskScheduler向Executor發送執行消息,Executor接收到資訊之後啟動并且運作,最後是由Driver處理結果并回收資源。

技巧

了解spark的通信機制即可。偏底層問題。需要能夠從通信機制的整體架構;啟動消息通信,運作時消息通信三個方面來回答。另外需要能夠答出使用的架構,注意2.0之前和之後使用的架構不同。我們使用的2.2版本,使用的是Netty的通信架構。

繼續閱讀