天天看點

[Spark基礎]-- spark-2.0圖文講解核心

導語

spark2.0于2016-07-27正式釋出,伴随着更簡單、更快速、更智慧的新特性,spark 已經逐漸替代 hadoop 在大資料中的地位,成為大資料處理的主流标準。本文主要以代碼和繪圖的方式結合,對運作架構、RDD 的實作、spark 作業原理、Sort-Based Shuffle 的存儲原理、 Standalone 模式 HA 機制進行解析。

1、運作架構

​​Spark​​支援多種運作模式。單機部署下,既可以用本地(Local)模式運作,也可以使用僞分布式模式來運作;當以分布式叢集部署的時候,可以根據實際情況選擇Spark自帶的獨立(Standalone)運作模式、YARN運作模式或者Mesos模式。雖然模式多,但是Spark的運作架構基本由三部分組成,包括SparkContext(驅動程式)、ClusterManager(叢集資料總管)和Executor(任務執行程序)。 

[Spark基礎]-- spark-2.0圖文講解核心

1、SparkContext送出作業,向ClusterManager申請資源;

2、ClusterManager會根據目前叢集的資源使用情況,進行有條件的FIFO政策:先配置設定的應用程式盡可能多地擷取資源,後配置設定的應用程式則在剩餘資源中篩選,沒有合适資源的應用程式隻能等待其他應用程式釋放資源;

3、ClusterManager預設情況下會将應用程式分布在盡可能多的Worker上,這種配置設定算法有利于充分利用叢集資源,适合記憶體使用多的場景,以便更好地做到資料處理的本地性;另一種則是分布在盡可能少的Worker上,這種适合CPU密集型且記憶體使用較少的場景;

4、Excutor建立後與SparkContext保持通訊,SparkContext配置設定任務集給Excutor,Excutor按照一定的排程政策執行任務集。

2、RDD

彈性分布式資料集(Resilient Distributed Datasets,RDD)作為Spark的程式設計模型,相比MapReduce模型有着更好的擴充和延伸: 

  • 提供了抽象層次更高的API 
  • 高效的資料共享 
  • 高效的容錯性 

2.1、RDD 的操作類型 

RDD大緻可以包括四種操作類型:

  • 建立操作(Creation):從記憶體集合和外部存儲系統建立RDD,或者是通過轉換操作生成RDD
  • 轉換操作(Transformation):轉換操作是惰性操作,隻是定義一個RDD并記錄依賴關系,沒有立即執行 
  • 控制操作(Control):進行RDD的持久化,通過設定不同級别對RDD進行緩存
  • 行動操作(Action):觸發任務送出、Spark運作的操作,操作的結果是擷取到結果集或者儲存至外部存儲系統 

2.2、RDD 的實作

2.2.1、RDD 的分區 

RDD的分區是一個邏輯概念,轉換操作前後的分區在實體上可能是同一塊記憶體或者存儲。在RDD操作中使用者可以設定和擷取分區數目,預設分區數目為該程式所配置設定到的cpu核數,如果是從HDFS檔案建立,預設為檔案的分片數。

2.2.2、RDD 的“血統”和依賴關系 

[Spark基礎]-- spark-2.0圖文講解核心

   

“血統”和依賴關系:RDD 的容錯機制是通過記錄更新來實作的,且記錄的是粗粒度的轉換操作。我們将記錄的資訊稱為血統(Lineage)關系,而到了源碼級别,Apache Spark 記錄的則是 RDD 之間的依賴(Dependency)關系。如上所示,每次轉換操作産生一個新的RDD(子RDD),子RDD會記錄其父RDD的資訊以及相關的依賴關系。 

2.2.3、依賴關系

[Spark基礎]-- spark-2.0圖文講解核心

依賴關系劃分為兩種:窄依賴(Narrow Dependency)和 寬依賴(源碼中為Shuffle Dependency)。

窄依賴指的是父 RDD 中的一個分區最多隻會被子 RDD 中的一個分區使用,意味着父RDD的一個分區内的資料是不能被分割的,子RDD的任務可以跟父RDD在同一個Executor一起執行,不需要經過 Shuffle 階段去重組資料。

窄依賴包括兩種:一對一依賴(OneToOneDependency)和範圍依賴(RangeDependency) 

一對一依賴: 

 

[Spark基礎]-- spark-2.0圖文講解核心

範圍依賴(僅union方法): 

 

[Spark基礎]-- spark-2.0圖文講解核心

寬依賴指的是父 RDD 中的分區可能會被多個子 RDD 分區使用。因為父 RDD 中一個分區内的資料會被分割,發送給子 RDD 的所有分區,是以寬依賴也意味着父 RDD 與子 RDD 之間存在着 Shuffle 過程。

寬依賴隻有一種:Shuffle依賴(ShuffleDependency) 

[Spark基礎]-- spark-2.0圖文講解核心

3、作業執行原理

[Spark基礎]-- spark-2.0圖文講解核心

作業(Job):RDD每一個行動操作都會生成一個或者多個排程階段

  

排程階段(Stage):每個Job都會根據依賴關系,以Shuffle過程作為劃分,分為Shuffle Map Stage和Result Stage。每個Stage包含多個任務集(TaskSet),TaskSet的數量與分區數相同。 

任務(Task):分發到Executor上的工作任務,是Spark的最小執行單元 

  

DAGScheduler:DAGScheduler是面向排程階段的任務排程器,負責劃分排程階段并送出給TaskScheduler 

TaskScheduler:TaskScheduler是面向任務的排程器,它負責将任務分發到Woker節點,由Executor進行執行 

3.1、送出作業及作業排程政策(适用于排程階段) 

[Spark基礎]-- spark-2.0圖文講解核心

每一次行動操作都會觸發SparkContext的runJob方法進行作業的送出。

這些作業之間可以沒有任何依賴關系,對于多個作業之間的排程,共有兩種:一種是預設的FIFO模式,另一種則是FAIR模式,該模式的排程可以通過設定minShare(最小任務數)和weight(任務的權重)來決定Job執行的優先級。 

FIFO排程政策:優先比較作業優先級(作業編号越小優先級越高),再比較排程階段優先級(排程階段編号越小優先級越高) 

FAIR排程政策:先擷取兩個排程的饑餓程度,是否處于饑餓狀态由目前正在運作的任務是否小于最小任務決定,擷取後進行如下比較:

  • 優先滿足處于饑餓狀态的排程 
  • 同處于饑餓狀态,優先滿足資源比小的排程 
  • 同處于非饑餓狀态,優先滿足權重比小的排程  
  • 以上情況均相同的情況下,根據排程名稱進行排序 

3.2、劃分排程階段(DAG建構) 

DAG建構圖: 

 

[Spark基礎]-- spark-2.0圖文講解核心

  

DAG的建構:主要是通過對最後一個RDD進行遞歸,使用廣度優先周遊每個RDD跟父RDD的依賴關系(前面提到子RDD會記錄依賴關系),碰到ShuffleDependency的則進行切割。切割後形成TaskSet傳遞給TaskScheduler進行執行。 

DAG的作用:讓窄依賴的RDD操作合并為同一個TaskSet,将多個任務進行合并,有利于任務執行效率的提高。 

TaskSet結構圖:假設資料有兩個Partition時,TaskSet是一組關聯的,但互相之間沒有Shuffle依賴關系的Task集合,TaskSet的ShuffleMapStage數量跟Partition個數相關,主要包含task的集合,stage中的rdd資訊等等。Task會被序列化和壓縮 

4、存儲原理(Sort-Based Shuffle分析)

4.1、Shuffle過程解析(wordcount執行個體)

[Spark基礎]-- spark-2.0圖文講解核心

1. 資料處理:檔案在hdfs中以多個切片形式存儲,讀取時每一個切片會被配置設定給一個Excutor進行處理;

2. map端操作:map端對檔案資料進行處理,格式化為(key,value)鍵值對,每個map都可能包含a,b,c,d等多個字母,如果在map端使用了combiner,則資料會被壓縮,value值會被合并;(注意:這個過程的使用需要保證對最終結果沒有影響,有利于減少shuffle過程的資料傳輸);

3.reduce端操作:reduce過程中,假設a和b,c和d在同一個reduce端,需要将map端被配置設定在同一個reduce端的資料進行洗牌合并,這個過程被稱之為shuffle。

4.2、map端的寫操作

[Spark基礎]-- spark-2.0圖文講解核心

  

1.map端處理資料的時候,先判斷這個過程是否使用了combiner,如果使用了combiner則采用PartitionedAppendOnlyMap資料結構作為記憶體緩沖區進行資料存儲,對于相同key的資料每次都會進行更新合并;如果沒有使用combiner,則采用PartitionedPairBuffer資料結構,把每次處理的資料追加到隊列末尾;

2.寫入資料的過程中如果出現記憶體不夠用的情況則會發生溢寫,溢寫;使用combiner的則會将資料按照分區id和資料key進行排序,做到分區有序,區中按key排序,其實就是将partitionId和資料的key作為key進行排序;沒有使用combiner的則隻是分區有序;

3.按照排序後的資料溢寫檔案,檔案分為data檔案和index檔案,index檔案作為索引檔案索引data檔案的資料,有利于reduce端的讀取;(注意:每次溢寫都會形成一個index和data檔案,在map完全處理完後會将多個inde和data檔案Merge為一個index和data檔案)

4.3、reduce端的讀操作

有了map端的處理,reduce端隻需要根據index檔案就可以很好地擷取到資料并進行相關的處理操作。這裡主要講reduce端讀操作時對資料讀取的政策:

如果在本地有,那麼可以直接從BlockManager中擷取資料;如果需要從其他的節點上擷取,由于Shuffle過程的資料量可能會很大,為了減少請求資料的時間并且充分利用帶寬,是以這裡的網絡讀有以下的政策: 

1.每次最多啟動5個線程去最多5個節點上讀取資料;

2.每次請求的資料大小不會超過spark.reducer.maxMbInFlight(預設值為48MB)/5

5、Spark的HA機制(Standalone模式)

5.1、Executor異常

當Executor發生異常退出的情況,Master會嘗試擷取可用的Worker節點并啟動Executor,這個Worker很可能是失敗之前運作Executor的Worker節點。這個過程系統會嘗試10次,限定失敗次數是為了避免因為應用程式存在bug而反複送出,占用叢集寶貴的資源。

5.2、Worker異常

Worker會定時發送心跳給Master,Master也會定時檢測注冊的Worker是否逾時,如果Worker異常,Master會告知Driver,并且同時将這些Executor從其應用程式清單中删除。

5.3、Master異常

1、ZooKeeper:将叢集中繼資料持久化到ZooKeeper,由ZooKeeper通過選舉機制選舉出新的Master,新的Master從ZooKeeper中擷取叢集資訊并恢複叢集狀态;

2、FileSystem:叢集中繼資料持久化到本地檔案系統中,當Master出現異常隻需要重新開機Master即可;

3、Custom:通過對StandaloneRecoveryModeFactory抽象類進行實作并配置到系統中,由使用者自定義恢複方式。

繼續閱讀