Spark核心泛指Spark的核心運作機制,包括Spark核心元件的運作機制、Spark任務排程機制、Spark記憶體管理機制、Spark核心功能的運作原理等,熟練掌握Spark核心原理。
Driver
Spark驅動器節點,用于執行Spark任務中的main方法,負責實際代碼的執行工作。Driver在Spark作業執行時主要負責:
1、将使用者程式轉化為任務(Job);
2、在Executor之間排程任務(task);
3、跟蹤Executor的執行情況;
4、通過UI展示查詢運作情況。
Executor
Spark Executor節點是一個JVM程序,負責在Spark作業中運作具體任務,任務彼此之間互相獨立。Spark應用啟動時,Executor節點被同時啟動,并且始終伴随着整個Spark應用的生命周期而存在。如果有Executor節點發生了故障或崩潰,Spark應用也可以繼續執行,會将出錯節點上的任務排程到其他Executor節點上繼續運作。
Executor有兩個核心功能:
1、負責運作組成Spark應用的任務,并将結果傳回給Driver程序;
2、它們通過自身的塊管理器(Block Manager)為使用者程式中要求緩存的RDD提供記憶體式存儲。RDD是直接緩存在Executor程序内的,是以任務可以在運作時充分利用緩存資料加速運算。
Spark通用運作流程概述

上圖為Spark通用運作流程,不論Spark以何種模式進行部署,任務送出後,都會先啟動Driver程序,随後Driver程序向叢集管理器注冊應用程式,之後叢集管理器根據此任務的配置檔案配置設定Executor并啟動,當Driver所需的資源全部滿足後,Driver開始執行main函數,Spark查詢為懶執行,當執行到action算子時開始反向推算,根據寬依賴進行stage的劃分,随後每一個stage對應一個taskset,taskset中有多個task,根據本地化原則,task會被分發到指定的Executor去執行,在任務執行的過程中,Executor也會不斷與Driver進行通信,報告任務運作情況。
Spark支援三種叢集管理器(Cluster Manager),分别為:
1、Standalone:獨立模式,Spark原生的簡單叢集管理器,自帶完整的服務,可單獨部署到一個叢集中,無需依賴任何其他資源管理系統,使用Standalone可以很友善地搭建一個叢集;
2、Apache Mesos:一個強大的分布式資源管理架構,它允許多種不同的架構部署在其上,包括yarn;
3、Hadoop YARN:統一的資源管理機制,在上面可以運作多套計算架構,如map reduce、storm等,根據driver在叢集中的位置不同,分為yarn client和yarn cluster。
實際上,除了上述這些通用的叢集管理器外,Spark内部也提供了一些友善使用者測試和學習的簡單叢集部署模式。由于在實際工廠環境下使用的絕大多數的叢集管理器是Hadoop YARN,是以我們關注的重點是Hadoop YARN模式下的Spark叢集部署。
Spark的運作模式取決于傳遞給SparkContext的MASTER環境變量的值,個别模式還需要輔助的程式接口來配合使用,目前支援的Master字元串及URL包括:
使用者在送出任務給Spark處理時,以下兩個參數共同決定了Spark的運作方式。
- master MASTER_URL:決定了Spark任務送出給哪種叢集處理。
- deploy-mode DEPLOY_MODE:決定了Driver的運作方式,可選值為Client或者Cluster。
Standalone模式運作機制
Standalone叢集有四個重要組成部分,分别是:
(1)Driver:是一個程序,我們編寫的Spark應用程式就運作在Driver上,由Driver程序執行;
(2)Master:是一個程序,主要負責資源排程和配置設定,并進行叢集的監控等職責;
(3)Worker:是一個程序,一個Worker運作在叢集中的一台伺服器上,主要負責兩個職責,一個是用自己的記憶體存儲RDD的某個或某些partition;另一個是啟動其他程序和線程(Executor),對RDD上的partition進行并行的處理和計算。
(4)Executor:是一個程序,一個Worker上可以運作多個Executor,Executor通過啟動多個線程(task)來執行對RDD的partition進行并行計算,也就是執行我們對RDD定義的例如map、flatMap、reduce等算子操作。
Standalone Client模式
1、在Standalone Client模式下,Driver在任務送出的本地機器上運作;
2、Driver啟動後向Master注冊應用程式,Master根據submit腳本的資源需求找到内部資源至少可以啟動一個Executor的所有Worker,然後在這些Worker之間配置設定Executor;
3、Worker上的Executor啟動後會向Driver反向注冊;
4、當所有的Executor注冊完成後,Driver開始執行main函數;
5、之後執行到Action算子時,開始劃分stage;
6、每個stage生成對應的taskSet,之後将task 分發到各個Executor上執行。
Standalone Cluster模式
1、在Standalone Cluster模式下,任務送出後,Master會找到一個Worker啟動Driver程序;
2、Driver啟動後向 Master注冊應用程式;
3、Master根據submit腳本的資源需求找到内部資源至少可以啟動一個Executor的所有 Worker,然後在這些Worker之間配置設定Executor;
4、Worker上的Executor啟動後會向Driver反向注冊;
5、所有的 Executor注冊完成後,Driver開始執行main函數;
6、之後執行到Action算子時,開始劃分stage,每個stage生成對應的taskSet;
7、之後将task分發到各個Executor上執行。
注意,Standalone的兩種模式下(client/Cluster),Master在接到Driver注冊Spark應用程式的請求後,會擷取其所管理的剩餘資源能夠啟動一個 Executor的所有Worker,然後在這些Worker之間分發Executor,此時的分發隻考慮Worker上的資源是否足夠使用,直到目前應用程式所需的所有Executor都配置設定完畢,Executor反向注冊完畢後,Driver開始執行main程式。
YARN模式運作機制
YARN Client模式
1、在YARN Client模式下,Driver在任務送出的本地機器上運作;
2、Driver啟動後會和ResourceManager通訊申請啟動ApplicationMaster;
3、随後ResourceManager配置設定container,在合适的NodeManager上啟動ApplicationMaster,此時的ApplicationMaster的功能相當于一個ExecutorLaucher(執行者發射器),隻負責向ResourceManager申請Executor記憶體;
4、ResourceManager接到ApplicationMaster的資源申請後會配置設定container,然後ApplicationMaster在資源配置設定指定的NodeManager上啟動Executor程序;
5、Executor程序啟動後會向Driver反向注冊;
6、Executor全部注冊完成後Driver開始執行main函數;
7、之後執行到Action算子時,觸發一個job,并根據寬依賴開始劃分stage;
8、每個stage生成對應的taskSet,之後将task分發到各個Executor上執行。
YARN Cluster模式
1、在YARN Cluster模式下,任務送出後會和ResourceManager通訊申請啟動ApplicationMaster;
2、随後ResourceManager配置設定container,在合适的NodeManager上啟動ApplicationMaster;(此時的ApplicationMaster就是Driver)
3、Driver啟動後向ResourceManager申請Executor記憶體,ResourceManager接到ApplicationMaster的資源申請後會配置設定container,然後在合适的NodeManager上啟動Executor程序;
4、Executor程序啟動後會向Driver反向注冊;
5、Executor全部注冊完成後Driver開始執行main函數;
6、之後執行到Action算子時,觸發一個job,并根據寬依賴開始劃分stage;
7、每個stage生成對應的taskSet,之後将task分發到各個Executor上執行。
Spark通信架構概述
Spark2.x版本使用Netty通訊架構作為内部通訊元件。Spark基于Netty新的rpc架構借鑒了Akka中的設計,它是基于Actor模型,如下圖所示:
Spark通訊架構中各個元件(Client/Master/Worker)可以認為是一個個獨立的實體,各個實體之間通過消息來進行通信。具體各個元件之間的關系如下:
Endpoint(Client/Master/Worker)有一個InBox和N個OutBox(N>=1,N取決于目前Endpoint與多少其他的Endpoint進行通信,一個與其通訊的其他Endpoint對應一個OutBox),Endpoint接收到的消息被寫入InBox,發送出去的消息寫入OutBox并被發送到其他Endpoint的InBox中。
Spark通訊架構解析
Spark通信架構如下圖所示:
RpcEndpoint:RPC端點,Spark針對每個節點(Client/Master/Worker)都稱之為一個Rpc 端點,且都實作RpcEndpoint接口,内部根據不同端點的需求,設計不同的消息和不同的業務處理,如果需要發送(詢問)則調用 Dispatcher;
RpcEnv:RPC上下文環境,每個RPC端點運作時依賴的上下文環境稱為 RpcEnv;
Dispatcher:消息分發器,針對于RPC端點需要發送消息或者從遠端 RPC 接收到的消息,分發至對應的指令收件箱/發件箱。如果指令接收方是自己則存入收件箱,如果指令接收方不是自己,則放入發件箱;
Inbox:指令消息收件箱,一個本地RpcEndpoint對應一個收件箱,Dispatcher在每次向Inbox存入消息時,都将對應EndpointData加入内部ReceiverQueue中,另外Dispatcher建立時會啟動一個單獨線程進行輪詢ReceiverQueue,進行收件箱消息消費;
RpcEndpointRef:RpcEndpointRef是對遠端RpcEndpoint的一個引用。當我 們需要向一個具體的RpcEndpoint發送消息時,一般我們需要擷取到該RpcEndpoint的引用,然後通過該應用發送消息。
OutBox:指令消息發件箱,對于目前RpcEndpoint來說,一個目标RpcEndpoint對應一個發件箱,如果向多個目标RpcEndpoint發送資訊,則有多個OutBox。當消息放入Outbox後,緊接着通過TransportClient将消息發送出去。消息放入發件箱以及發送過程是在同一個線程中進行;
RpcAddress:表示遠端的RpcEndpointRef的位址,Host + Port。
TransportClient:Netty通信用戶端,一個OutBox對應一個TransportClient,TransportClient不斷輪詢OutBox,根據OutBox消息的receiver資訊,請求對應的遠端TransportServer;
TransportServer:Netty通信服務端,一個RpcEndpoint對應一個TransportServer,接受遠端消息後調用 Dispatcher分發消息至對應收發件箱;
根據上面的分析,Spark通信架構的高層視圖如下圖所示:
在Spark中由SparkContext負責與叢集進行通訊、資源的申請以及任務的配置設定和監控等。當 Worker節點中的Executor運作完畢Task後,Driver同時負責将SparkContext關閉。
通常也可以使用SparkContext來代表驅動程式(Driver)。
SparkContext是使用者通往Spark叢集的唯一入口,可以用來在Spark叢集中建立RDD、累加器和廣播變量。
SparkContext也是整個Spark應用程式中至關重要的一個對象,可以說是整個Application運作排程的核心(不包括資源排程)。
SparkContext的核心作用是初始化Spark應用程式運作所需的核心元件,包括高層排程器(DAGScheduler)、底層排程器(TaskScheduler)和排程器的通信終端(SchedulerBackend),同時還會負責Spark程式向Cluster Manager的注冊等。
在實際的編碼過程中,我們會先建立SparkConf執行個體,并對SparkConf的屬性進行自定義設定,随後,将SparkConf作為SparkContext類的唯一構造參數傳入來完成SparkContext執行個體對象的建立。SparkContext在執行個體化的過程中會初始化DAGScheduler、TaskScheduler和SchedulerBackend,當RDD的action算子觸發了作業(Job)後,SparkContext會調用DAGScheduler根據寬窄依賴将Job劃分成幾個小的階段(Stage),TaskScheduler會排程每個Stage的任務(Task),另外,SchedulerBackend負責申請和管理叢集為目前Application配置設定的計算資源(即Executor)。
如果我們将Spark Application比作汽車,那麼SparkContext就是汽車的引擎,而SparkConf就是引擎的配置參數。
下圖描述了Spark-On-Yarn模式下在任務排程期間,ApplicationMaster、Driver以及Executor内部子產品的互動過程:
Driver初始化SparkContext過程中,會分别初始化DAGScheduler、TaskScheduler、SchedulerBackend以及HeartbeatReceiver,并啟動SchedulerBackend以及HeartbeatReceiver。SchedulerBackend通過ApplicationMaster申請資源,并不斷從TaskScheduler中拿到合适的Task分發到Executor執行。HeartbeatReceiver負責接收Executor的心跳資訊,監控Executor的存活狀況,并通知到TaskScheduler。
在工廠環境下,Spark叢集的部署方式一般為YARN-Cluster模式,之後的核心分析内容中我們預設叢集的部署方式為YARN-Cluster模式。
Spark任務送出流程
Spark YARN-Cluster模式下的任務送出流程
下面的時序圖清晰地說明了一個Spark應用程式從送出到運作的完整流程:
1、送出一個Spark應用程式,首先通過Client向ResourceManager請求啟動一個Application,同時檢查是否有足夠的資源滿足Application的需求,如果資源條件滿足,則準備ApplicationMaster的啟動上下文,交給ResourceManager,并循環監控Application狀态。
2、當送出的資源隊列中有資源時,ResourceManager會在某個 NodeManager上啟動ApplicationMaster程序,ApplicationMaster會單獨啟動Driver背景線程,當Driver啟動後,ApplicationMaster會通過本地的RPC連接配接Driver,并開始向ResourceManager申請Container資源運作Executor程序(一個Executor對應與一個Container),當ResourceManager傳回Container資源,ApplicationMaster則在對應的Container上啟動Executor。
3、Driver線程主要是初始化SparkContext對象,準備運作所需的上下文,然後一方面保持與ApplicationMaster的RPC連接配接,通過ApplicationMaster申請資源,另一方面根據使用者業務邏輯開始排程任務,将任務下發到已有的空閑Executor上。
4、當ResourceManager向ApplicationMaster傳回Container資源時,ApplicationMaster就嘗試在對應的Container上啟動Executor程序,Executor程序起來後,會向Driver反向注冊,注冊成功後保持與Driver的心跳,同時等待Driver分發任務,當分發的任務執行完畢後,将任務狀态上報給 Driver。
從上述時序圖可知,Client隻負責送出Application并監控Application 的狀态。對于Spark的任務排程主要是集中在兩個方面: 資源申請和任務分發,其主要是通過ApplicationMaster、Driver以及Executor之間來完成。
Spark任務排程概述
當Driver起來後,Driver則會根據使用者程式邏輯準備任務,并根據Executor資源情況逐漸分發任務。在詳細闡述任務排程前,首先說明下Spark裡的幾個概念。一個Spark應用程式包括Job、Stage以及Task三個概念:
Job是以Action方法為界,遇到一個Action方法則觸發一個Job;
Stage是Job的子集,以RDD寬依賴(即 Shuffle)為界,遇到Shuffle做一次劃分;
Task是Stage的子集,以并行度(分區數)來衡量,分區數是多少,則有多少個task。
Spark的任務排程總體來說分兩路進行,一路是Stage級的排程,一路是Task級的排程,總體排程流程如下圖所示:
Spark RDD通過其Transactions操作,形成了RDD血緣關系圖,即DAG,最後通過Action的調用,觸發Job并排程執行。DAGScheduler負責Stage級的排程,主要是将job切分成若幹個Stage,并将每個Stage打包成TaskSet交給TaskScheduler排程。TaskScheduler負責Task級的排程,将DAGScheduler給過來的TaskSet按照指定的排程政策分發到Executor上執行,排程過程中SchedulerBackend負責提供可用資源,其中SchedulerBackend有多種實作,分别對接不同的資源管理系統。
Spark Stage級排程
Spark的任務排程是從DAG切割開始,主要是由DAGScheduler來完成。當遇到一個Action操作後就會觸發一個Job的計算,并交給DAGScheduler來送出,下圖是涉及到Job送出的相關方法調用流程圖。
Job由最終的RDD和Action方法封裝而成,SparkContext 将Job交給DAGScheduler送出,它會根據RDD的血緣關系構成的DAG進行切分,将一個Job劃分為若幹Stages,具體劃分政策是,由最終的RDD不斷通過依賴回溯判斷父依賴 是否是寬依賴,即以Shuffle為界,劃分Stage,窄依賴的RDD之間被劃分到同一個Stage中,可以進行pipeline式的計算,如上圖紫色流程部分。劃分的Stages分兩類,一類叫做ResultStage,為DAG最下遊的Stage,由Action方法決定,另一類叫做ShuffleMapStage,為下遊Stage準備資料,下面看一個簡單的例子WordCount。
Job由saveAsTextFile觸發,該Job由RDD-3和saveAsTextFile方法組成,根據RDD之間的依賴關系從RDD-3開始回溯搜尋,直到沒有依賴的RDD-0,在回溯搜尋過程中,RDD-3依賴RDD-2,并且是寬依賴,是以在RDD-2和RDD-3之間劃分Stage,RDD-3被劃到最後一個Stage,即ResultStage中,RDD-2依賴RDD-1,RDD-1依賴RDD-0,這些依賴都是窄依賴,是以将RDD-0、RDD-1和RDD-2劃分到同一個 Stage,即 ShuffleMapStage中,實際執行的時候,資料記錄會一氣呵成地執行RDD-0到RDD-2的轉化。不難看出,其本質上是一個深度優先搜尋算法。一個Stage是否被送出,需要判斷它的父Stage是否執行,隻有在父Stage執行完畢才能送出目前Stage,如果一個Stage沒有父Stage,那麼從該Stage開始送出。Stage送出時會将Task資訊(分區資訊以及方法等)序列化并被打包成TaskSet 交給TaskScheduler,一個Partition對應一個Task,另一方面TaskScheduler會監控Stage的運作狀态,隻有Executor丢失或者Task由于Fetch失敗才需要重新送出失敗的Stage以排程運作失敗的任務,其他類型的Task失敗會在TaskScheduler的排程過程中重試。相對來說DAGScheduler做的事情較為簡單,僅僅是在Stage層面上劃分DAG,送出Stage并監控相關狀态資訊。TaskScheduler則相對較為複雜,下面詳細闡述其細節。
Spark Task級排程
Spark Task的排程是由TaskScheduler來完成,DAGScheduler将Stage打包到TaskSet交給TaskScheduler,TaskScheduler會将TaskSet封裝為TaskSetManager加入到排程隊列中,TaskSetManager結構如下圖所示。
TaskSetManager負責監控管理同一個Stage中的Tasks,TaskScheduler就是以TaskSetManager為單元來排程任務。
TaskScheduler初始化後會啟動SchedulerBackend,它負責跟外界打交道,接收Executor的注冊資訊,并維護Executor的狀态,是以說SchedulerBackend是管“糧食”的,同時它在啟動後會定期地去“詢問”TaskScheduler有沒有任務要運作,也就是說,它會定期地“問”TaskScheduler“我有這麼餘量,你要不要啊”,TaskScheduler在SchedulerBackend“問 ”它的時候,會從排程隊列中按照指定的排程政策選擇TaskSetManager去排程運作,大緻方法調用流程如下圖所示:
将TaskSetManager加入rootPool排程池中之後,調用SchedulerBackend的riviveOffers方法給driverEndpoint發送ReviveOffer消息;driverEndpoint收到ReviveOffer消息後調用makeOffers方法,過濾出活躍狀态的Executor(這些Executor都是任務啟動時反向注冊到Driver的Executor),然後将Executor封裝成WorkerOffer對象;準備好計算資源(WorkerOffer)後,taskScheduler基于這些資源調用resourceOffer在Executor上配置設定task。
前面講到,TaskScheduler會先把DAGScheduler給過來的TaskSet封裝成TaskSetManager扔到任務隊列裡,然後再從任務隊列裡按照一定的規則把它們取出來在SchedulerBackend給過來的Executor上運作。這個排程過程實際上還是比較粗粒度的,是面向TaskSetManager的。排程隊列的層次結構如下圖所示:
TaskScheduler是以樹的方式來管理任務隊列,樹中的節點類型為Schdulable,葉子節點為TaskSetManager,非葉子節點為Pool,下圖是它們之間的繼承關系。
TaskScheduler支援兩種排程政策,一種是FIFO,也是預設的排程政策,另一種是FAIR。在TaskScheduler初始化過程中會執行個體化rootPool,表示樹的根節點,是Pool類型。
1、FIFO排程政策
FIFO排程政策執行步驟如下:
1)對s1和s2兩個Schedulable的優先級(Schedulable類的一個屬性,記為priority,值越小,優先級越高);
2)如果兩個Schedulable的優先級相同,則對s1,s2所屬的Stage的身份進行辨別進行比較(Schedulable類的一個屬性,記為priority,值越小,優先級越高);
3)如果比較的結果小于0,則優先排程s1,否則優先排程s2。
2、FAIR 排程政策
FAIR 排程政策的樹結構如下圖所示:
FAIR模式中有一個rootPool和多個子Pool,各個子Pool中存儲着所有待配置設定的TaskSetMagager。
可以通過在Properties中指定spark.scheduler.pool屬性,指定排程池中的某個排程池作為TaskSetManager的父排程池,如果根排程池不存在此屬性值對應的排程池,會建立以此屬性值為名稱的排程池作為TaskSetManager的父排程池,并将此排程池作為根排程池的子排程池。
在FAIR模式中,需要先對子Pool進行排序,再對子Pool裡面的TaskSetMagager進行排序,因為Pool和TaskSetMagager都繼承了Schedulable特質,是以使用相同的排序算法。
排序過程的比較是基于Fair-share來比較的,每個要排序的對象包含三個屬性:runningTasks值(正在運作的Task數)、minShare值、weight值,比較時會綜合考量runningTasks值,minShare值以及weight值。
注意,minShare、weight的值均在公平排程配置檔案fairscheduler.xml中被指定,排程池在建構階段會讀取此檔案的相關配置。
1)如果A對象的runningTasks大于它的minShare,B對象的runningTasks小于它的minShare,那麼B排在A前面;(runningTasks比minShare小的先執行)
2)如果A、B對象的runningTasks都小于它們的minShare,那麼就比較runningTasks與minShare的比值(minShare使用率),誰小誰排前面;(minShare使用率低的先執行)
3)如果A、B對象的runningTasks都大于它們的minShare,那麼就比較runningTasks與weight的比值(權重使用率),誰小誰排前面。(權重使用率低的先執行)
4)如果上述比較均相等,則比較名字。
整體上來說就是通過minShare和weight這兩個參數控制比較過程,可以做到讓minShare使用率和權重使用率少(實際運作task比例較少)的先運作。
FAIR模式排序完成後,所有的TaskSetManager被放入一個ArrayBuffer裡,之後依次被取出并發送給Executor執行。
從排程隊列中拿到TaskSetManager後,由于TaskSetManager封裝了一個Stage的所有Task,并負責管理排程這些Task,那麼接下來的工作就是TaskSetManager按照一定的規則一個個取出Task給TaskScheduler,TaskScheduler再交給SchedulerBackend去發到Executor上執行。
本地化排程
DAGScheduler切割Job,劃分Stage,通過調用submitStage來送出一個Stage對應的tasks,submitStage會調用submitMissingTasks,submitMissingTasks确定每個需要計算的task的preferredLocations,通過調用getPreferrdeLocations()得到partition的優先位置,由于一個partition對應一個task,此partition的優先位置就是task的優先位置,對于要送出到TaskScheduler的TaskSet中的每一個task,該task優先位置與其對應的partition對應的優先位置一緻。從排程隊列中拿到TaskSetManager後,那麼接下來的工作就是TaskSetManager按照一定的規則一個個取出task給TaskScheduler,TaskScheduler再交給SchedulerBackend去發到Executor上執行。前面也提到,TaskSetManager封裝了一個Stage的所有task,并負責管理排程這些task。根據每個task的優先位置,确定task的Locality級别,Locality一共有五種,優先級由高到低順序:
在排程執行時,Spark排程總是會盡量讓每個task以最高的本地性級别來啟動,當一個task以X本地性級别啟動,但是該本地性級别對應的所有節點都沒有空閑資源而啟動失敗,此時并不會馬上降低本地性級别啟動而是在某個時間長度内再次以X本地性級别來啟動該task,若超過限時時間則降級啟動,去嘗試下一個本地性級别,依次類推。可以通過調大每個類别的最大容忍延遲時間,在等待階段對應的Executor可能就會有相應的資源去執行此task,這就在在一定程度上提到了運作性能。
失敗重試與黑名單機制
除了選擇合适的Task排程運作外,還需要監控Task的執行狀态,前面也提到,與外部打交道的是SchedulerBackend,Task被送出到Executor啟動執行後,Executor會将執行狀态上報給SchedulerBackend,SchedulerBackend則告訴TaskScheduler,TaskScheduler找到該Task對應的TaskSetManager,并通知到該TaskSetManager,這樣TaskSetManager就知道Task的失敗與成功狀态,對于失敗的Task,會記錄它失敗的次數,如果失敗次數還沒有超過最大重試次數,那麼就把它放回待排程的Task池子中,否則整個Application失敗。在記錄Task失敗次數過程中,會記錄它上一次失敗所在的ExecutorId和Host,這樣下次再排程這個Task時,會使用黑名單機制,避免它被排程到上一次失敗的節點上,起到一定的容錯作用。黑名單記錄Task上一次失敗所在的ExecutorId和Host,以及其對應的“拉黑”時間,“拉黑”時間是指這段時間内不要再往這個節點上排程這個Task了。
ShuffleMapStage與FinalStage
在劃分stage時,最後一個stage成為FinalStage,它本質上是一個ResultStage對象,前面的所有stage被稱為ShuffleMapStage。
ShuffleMapStage的結束伴随着shuffle檔案的寫磁盤。
ResultStage基本上對應代碼中的action算子,即将一個函數應用在RDD的各個partition的資料集上,意味着一個job的運作結束。
Shuffle中的任務個數
map端task個數的确定
Shuffle過程中的task個數由RDD分區數決定,而RDD的分區個數與參數spark.default.parallelism有密切關系。
在Yarn Cluster模式下,如果沒有手動設定spark.default.parallelism,則有:
Others: total number of cores on all executor nodes or 2, whichever is larger. spark.default.parallelism = max(所有executor使用的core總數,2)
如果進行了手動配置,則:
spark.default.parallelism = 配置值
還有一個重要的配置:
The maximum number of bytes to pack into a single partition when reading files. spark.files.maxPartitionBytes = 128 M (預設)
代表着rdd的一個分區能存放資料的最大位元組數,如果一個400MB的檔案,隻分了兩個區,則在action時會發生錯誤。
當一個spark應用程式執行時,生成sparkContext,同時會生成兩個參數,由上面得到的spark.default.parallelism推導出這兩個參數的值:
sc.defaultParallelism = spark.default.parallelism
sc.defaultMinPartitions = min(spark.default.parallelism,2)
當以上參數确定後,就可以推算RDD分區數目了。
(1)通過scala集合方式parallelize生成的RDD
val rdd = sc.parallelize(1 to 10)
這種方式下,如果在parallelize操作時沒有指定分區數,則有:
rdd的分區數 = sc.defaultParallelism
(2)在本地檔案系統通過textFile方式生成的RDD
val rdd = sc.textFile("path/file")
rdd的分區數 = max(本地file的分片數,sc.defaultMinPartitions)
(3)在HDFS檔案系統生成的RDD
rdd的分區數 = max(HDFS檔案的Block數目,sc.defaultMinPartitions)
(4)從HBase資料表擷取資料并轉換為RDD
rdd的分區數 = Table的region個數
(5)通過擷取json(或者parquet等等)檔案轉換成的DataFrame
rdd的分區數 = 該檔案在檔案系統中存放的Block數目
(6)Spark Streaming擷取Kafka消息對應的分區數
基于Receiver:
在Receiver的方式中,Spark中的partition和kafka中的partition并不是相關的,是以如果我們加大每個topic的partition數量,僅僅是增加線程來處理由單一Receiver消費的主題。但是這并沒有增加Spark在處理資料上的并行度。
基于DirectDStream:
Spark會建立跟Kafka partition一樣多的RDD partition,并且會并行從Kafka中讀取資料,是以在Kafka partition和RDD partition之間,有一個一對一的映射關系。
reduce端task個數的确定
Reduce端進行資料的聚合,一部分聚合算子可以手動指定reduce task的并行度,如果沒有指定,則以map端的最後一個RDD的分區數作為其分區數,那麼分區數就決定了reduce端的task的個數。
reduce端資料的讀取
根據stage的劃分我們知道,map端task和reduce端task不在相同的stage中,map task位于ShuffleMapStage,reduce task位于ResultStage,map task會先執行,那麼後執行的reduce task如何知道從哪裡去拉去map task落盤後的資料呢?
reduce端的資料拉取過程如下:
1、map task執行完畢後會将計算狀态以及磁盤小檔案位置等資訊封裝到mapStatue對象中,然後由本程序中的MapOutPutTrackerWorker對象将mapstatus對象發送給Driver程序的MapOutPutTrackerMaster對象;
2、在reduce task開始執行之前會先讓本程序中的MapOutPutTrackerWorker向Driver程序中的MapOutPutTrackerMaster發動請求,請求磁盤小檔案位置資訊;
3、當所有的Map task執行完畢後,Driver程序中的MapOutPutTrackerMaster就掌握了所有的磁盤小檔案的位置資訊。此時MapOutPutTrackerMaster會告訴MapOutPutTrackerWorker磁盤小檔案的位置資訊;
4、完成之前的操作之後,由BlockerTransforService去Executor所在的節點拉資料,預設會啟動五個子線程。每次拉取的資料量不能超過48M(reduce task每次最多拉取48M資料,将拉來的資料存儲到Executor記憶體的20%記憶體中)。
HashShuffle解析
以下的讨論都假設每個Executor有一個CPU core。
1、未經優化的HashShuffleManager
shuffle write階段,主要就是在一個stage結束計算之後,為了下一個stage可以執行shuffle類的算子(比如reduceByKey),而将每個task處理的資料按key進行“劃分”。所謂“劃分”,就是對相同的key執行hash算法,進而将相同key都寫入同一個磁盤檔案中,而每一個磁盤檔案都隻屬于下遊stage的一個task。在将資料寫入磁盤之前,會先将資料寫入記憶體緩沖中,當記憶體緩沖填滿之後,才會溢寫到磁盤檔案中去。
下一個stage的task有多少個,目前stage的每個task就要建立多少份磁盤檔案。比如下一個stage總共有100個task,那麼目前stage的每個task都要建立100份磁盤檔案。如果目前stage有50個task,總共有10個Executor,每個Executor執行5個task,那麼每個Executor上總共要建立500個磁盤檔案,所有Executor上會建立5000個磁盤檔案。由此可見,未經優化的shuffle write操作所産生的磁盤檔案的數量是極其驚人的。
shuffle read階段,通常就是一個stage剛開始時要做的事情。此時該stage的每一個task就需要将上一個stage的計算結果中的所有相同key,從各個節點上通過網絡都拉取到自己所在的節點上,然後進行key的集合或連結等操作。由于shuffle write的過程中,map task個下遊stage的每個reduce task都建立了一個磁盤檔案,是以shuffle read的過程中,每個reduce task隻要從上遊stage的所有map task所在的節點上,拉取屬于自己的那一個磁盤檔案即可。
shuffle read的拉取過程是一邊拉取一邊進行聚合的。每個shuffle read task都會有一個自己的buffer緩沖,每次都隻能拉取與buffer緩沖相同大小的資料,然後通過你村中的一個Map進行聚合等操作。聚合完一批資料後,再拉取下一批資料,并放到buffer緩沖中進行聚合操作。以此類推,知道最後将所有資料到拉取完,并得到最終的結果。
未經優化的HashShuffleManager工作原理如下圖所示:
2、優化後的HashShuffleManager
為了優化HashShuffleManager我們可以設定一個參數,spark.shuffle.consolidateFiles,該參數預設值為false,将其設定為true即可開啟優化機制,通常來說,如果我們使用HashShuffleManager,那麼都建議開啟這個選項。
開啟consolidate機制之後,在shuffle write過程中,task就不是為了下遊stage的每個task建立一個磁盤檔案了,此時會出現shuffleFileGroup的概念,每個shuffleFileGroup會對應一批磁盤檔案,磁盤檔案的數量與下遊stage的task數量是相同的。一個Executor上有多少個CPU core,就可以并行執行多少個task。而第一批并行執行的每個task都會闖将一個shuffleFileGroup,并将資料寫入對應的磁盤檔案内。
當Executor的CPU core執行完一批task,接着執行下一批task時,下一批task就會複用之前已有的shuffleFileGroup,包括其中的磁盤檔案,也就是說,此時task會将資料寫入已有的磁盤檔案中,而不會寫入新的磁盤檔案中。是以,consolidate機制允許不同的task複用同一批磁盤檔案,這樣就可以有效将多個task的磁盤檔案進行一定程度上的合并,進而大幅度減少磁盤檔案的數量,進而提升shuffle write的性能。
假設第二個stage有100個task,第一個stage有50個task,總共還是有10個Executor(Executor CPU個數為1),每個Executor執行5個task。那麼原本使用未經優化的HashSHuffleManager時,每個Executor會産生500個磁盤檔案,所有Executor會産生5000個磁盤檔案的。但是此時經過優化之後,每個Executor建立的磁盤檔案的數量的計算公式為:CPU core的數量 * 下一個stage的task數量,也就是說,每個Executor此時隻會建立100個磁盤檔案,所有Executor隻會建立1000個磁盤檔案。
優化後的HashShuffleManager工作原理如下圖所示:
SortShuffle解析
SortShuffleManager的運作機制主要分為兩種,一種是普通運作機制,另一種是bypass運作機制。當shuffle read task的數量小于等于spark.shuffle.sort.bypassMergeThreshold參數的值時(預設為200),就會啟用bypass機制。
1、普通運作機制
在該模式下,資料會先寫入一個記憶體資料結構中此時根據不同的shuffle算子,可能選用不同的資料結構,如果是reduceByKey這種聚合類的shuffle算子,那麼會選用Map資料結構,一邊通過Map進行聚合,一邊寫入記憶體;如果是join這種普通的shuffle算子,那麼會選用Array資料結構,直接寫入記憶體。接着,每寫一條資料進如記憶體資料結構之後,就會判斷一下,是否達到了某個臨界門檻值。如果達到臨界門檻值的話,那麼就會嘗試将記憶體資料結構中的資料溢寫到磁盤,然後清空記憶體資料結構。
在溢寫到磁盤檔案之前,會先根據key對記憶體資料結構中已有的資料進行排序。排序過後,會分批将資料寫入磁盤檔案。預設的batch數量是10000條,也就是說,排序好的資料,會以每批1萬條資料的形式分批寫入磁盤檔案。寫入磁盤檔案是通過Java的BufferedOutputStream實作的。BufferedOutputStream是Java的緩沖輸出流,首先會将資料緩沖在記憶體中,當記憶體緩沖滿溢之後再一次寫入磁盤檔案中,這樣可以減少磁盤IO次數,提升性能。
一個task将所有資料寫入記憶體資料結構的過程中,會發生多次磁盤溢寫操作,也就會産生多個臨時檔案。最後會将之前所有的臨時磁盤檔案都進行合并,這就是merge過程,此時會将之前所有臨時磁盤檔案中的資料讀取出來,然後依次寫入最終的磁盤檔案之中。此外,由于一個task就隻對應一個磁盤檔案,也就意味着該task為下遊stage的task準備的資料都在這一個檔案中,一次你還會單獨寫一份索引檔案,其中辨別了下遊各個task的資料在檔案中的start offset與end offset。
SortShuffleManager由于有一個磁盤檔案merge的過程,是以大大減少了檔案數量。比如第一個stage有50個task,總共有10個Executor,每個Executor執行5個task,而第二個stage有100個task。由于每個task最終隻有一個磁盤檔案,是以此時每個Executor上隻有5個磁盤檔案,所有Executor隻有50個磁盤檔案。
普通運作機制的SortShuffleManager工作原理如下圖所示:
2、bypass運作機制
bypass運作機制的觸發條件如下:
(1)shuffle map task數量小于spark.shuffle.sort.bypassMergeThreshold參數的值。
(2)不是聚合類的shuffle算子。
此時,每個task會為每個下遊task都建立一個臨時磁盤檔案,并将資料按key進行hash然後根據key的hash值,将key寫入對應的磁盤檔案之中。當然,寫入磁盤檔案時也是先寫入記憶體緩沖,緩沖寫滿之後再溢寫到磁盤檔案的。最後,同樣會将所有臨時磁盤檔案都合并成一個磁盤檔案,并建立一個單獨的索引檔案。
該過程的磁盤寫機制其實跟未經優化的HashShuffleManager是一模一樣的,因為都要建立數量驚人的磁盤檔案,隻是在最後會做一個磁盤檔案的合并而已。是以少量的最終磁盤檔案,也讓該機制相對未經優化的HashShuffleManager來說,shuffleread的性能會更好。
而該機制與普通SortShuffleManager運作機制的不同在于:第一,磁盤寫機制不同;第二,不會進行排序。也就是說,啟用該機制的最大好處在于,shuffle write過程中,不需要進行資料的排序操作,也就節省掉了這部分的性能開銷。
在執行Spark應用程式時,Spark叢集會啟動Driver和Executor兩種JVM程序,前者為主要程序,負責建立Spark上下文,送出Spark作業(Job),并将作業轉化為計算任務(Task),在各個Executor程序間協調任務的排程,後者負責在工作節點上執行具體的計算任務,并将結果傳回給Driver,同時為需要持久化的RDD提供存儲功能。
堆内和堆外記憶體規劃
作為一個JVM程序,Executor的記憶體管理建立在JVM的記憶體管理之上,Spark對JVM的堆内(On-heap)空間進行了更為詳細的配置設定,以充分利用記憶體。同時,Spark引入了堆外(Off-heap)記憶體,使之可以直接在工作節點的系統記憶體中開辟空間,進一步優化了記憶體的使用。
堆内記憶體受到JVM統一管理,堆外記憶體是直接向作業系統進行記憶體的申請和釋放。
1、堆内記憶體
堆内記憶體的大小,由Spark應用程式啟動時的- executor-memory或spark.executor.memory參數配置。Executor内運作的并發任務共享JVM堆内記憶體,這些任務在緩存RDD資料和廣播(Broadcast)資料時占用的記憶體被規劃為存儲(Storage)記憶體,而這些任務在執行Shuffle時占用的記憶體被規劃為執行(Execution)記憶體,剩餘的部分不做特殊規劃,那些Spark内部的對象執行個體,或者使用者定義的Spark應用程式中的對象執行個體,均占用剩餘的空間。不同的管理模式下,這三部分占用的空間大小各不相同。
Spark對堆内記憶體的管理是一種邏輯上的俄“規劃式”的管理,因為對象執行個體占用記憶體的申請和釋放都由JVM完成,Spark隻能在申請後和釋放前記錄這些記憶體。其具體流程如下:
1、Spark在代碼中new一個對象執行個體;
2、JVM從堆内記憶體配置設定空間,建立對象并傳回對象引用;
3、Spark儲存該對象的引用,記錄該對象占用的記憶體。
釋放記憶體流程如下:
1、Spark記錄該對象釋放的記憶體,删除該對象的引用;
2、等待JVM的垃圾回收機制釋放該對象占用的堆内記憶體。
我們知道,JVM的對象可以以序列化的方式存儲,序列化的過程是将對象轉換為二進制位元組流,本質上可以了解為将非連續空間的鍊式存儲轉化為連續空間或塊存儲,在通路時則需要進行序列化的逆過程--反序列化,将位元組流轉化為對象,序列化的方式可以節省存儲空間,但增加了存儲和讀取時候的計算開銷。
對于Spark中序列化的對象,由于是位元組流的形式,其占用的記憶體大小可直接計算,而對于非序列化的對象,其占用的記憶體是通過周期性地采樣近似估算而得,即并不是每次新增的資料項都會計算一次占用的記憶體大小,這種方法降低了時間開銷但是有可能誤差較大,導緻某一時刻的實際記憶體可能遠遠超出預期。此外,在被Spark标記為釋放的對象執行個體,很有可能在實際上并沒有被JVM回收,導緻實際可用的記憶體小于Spark記錄的可用記憶體。是以Spark并不能準确記錄實際可用的堆内記憶體,進而也就無法完全避免記憶體溢出(OOM,Out of Memory)的異常。
雖然不能精确控制堆内記憶體的申請和釋放,但Spark通過對存儲記憶體和執行記憶體各自獨立的規劃管理,可以決定是否要在存儲記憶體裡緩沖新的RDD,以及是否為新的任務配置設定執行記憶體,在一定程度上可以提升記憶體的使用率,減少異常的出現。
2、堆外記憶體
為了進一步優化記憶體的使用以及提高Shuffle時排序的效率,Spark引入了堆外(Off-heap)記憶體,使之可以直接在工作節點的系統記憶體中開辟空間,存儲經過序列化的二進制資料。
堆外記憶體意味着把記憶體對象配置設定在Java虛拟機的堆以外的記憶體,這些記憶體直接受作業系統管理(而不是虛拟機)。這樣做的結果就是能保持一個較小的堆,以減少垃圾收集對應用的影響。
利用JDK Unsafe API(從spark2.0開始,在管理堆外的存儲記憶體時不再基于Tachyon,而是與堆外的執行記憶體一樣,基于JDK Unsafe API實作),Spark可以直接作業系統堆外記憶體,減少了不必要的記憶體開銷,以及頻繁的GC掃描和回收,提升了處理性能。堆外記憶體可以被精确地申請和釋放(堆外記憶體之是以能夠被精确的申請和釋放,是由于記憶體的申請和釋放不再通過JVM機制,而是直接向作業系統申請,JVM對于記憶體的清理是無法準确指定時間點的,是以無法實作精确的釋放),而且序列化的資料占用的空間可以被精确計算,是以相比堆内記憶體來說降低了管理的難度,也降低了誤差。
在預設情況下堆外記憶體并不啟用,可以通過配置spark.memory.offHeap.enabled參數啟用,并由spark.memory.offHeap.size參數設定堆外空間的大小。除了沒有other空間,堆外記憶體與堆内記憶體的劃分方式相同,所有運作中的并發任務共享存儲記憶體和執行記憶體。
(該部分記憶體主要用于程式的共享庫,Perm Space、線程Stack和一些Memory mapping等,或者類C方式allocate object)
記憶體空間配置設定
1、靜态記憶體管理
在Spark最初采用的靜态記憶體管理機制下,存儲記憶體、執行記憶體和其他記憶體的大小在Spark應用程式運作期間均為固定的,但使用者可以應用程式啟動前進行配置,堆内記憶體的配置設定如下圖所示:
可以看到,可用的堆内記憶體的大小需要按照代碼清單的方式計算:
可用的存儲記憶體 = systemMaxMemory * spark.storage.memoryFraction * spark.storage.safety Fraction
可用的執行記憶體 = systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safety Fraction
其中systemMaxMemory取決于目前JVM堆内記憶體的大小,最後可用的執行記憶體或者存儲記憶體要在此基礎上與各自的memoryFraction參數和safetyFraction參數相乘得出。上述計算公式中的兩個safetyFraction參數,其意義在于在邏輯預留出1-safetyFraction這麼一塊保險區域,降低因實際記憶體超出目前預設範圍而導緻OOM的風險(上文提到,對于非序列化對象的記憶體采樣估算會産生誤差)。值得注意的是,這個預留的保險區域僅僅是一種邏輯上的規劃,再具體使用時Spark并沒有差別對待,和“其他記憶體”一樣交給了JVM去管理。
Storage記憶體和Executor記憶體都有預留白間,目的是防止OOM,因為Spark堆内記憶體大小的記錄是不準确的,需要留出保險區域。
堆外的空間配置設定較為簡單,隻有存儲記憶體和執行記憶體。可用的執行記憶體和存儲記憶體占用的空間大小直接由參數spark.memory.storageFraction決定,由于堆外記憶體占用的空間可以被精确計算,是以無需再設定保險區域。
靜态記憶體管理機制實作起來較為簡單,但如果使用者不熟悉Spark的鵆機制,或沒有根據具體的資料規模和計算任務或做相應的配置,很容易造成“一般海水,一般火焰”的局面,即存儲記憶體和執行記憶體中的一方剩餘大量的空間,而另一方卻早早被占滿,不得不淘汰或移出舊的内容以存儲新的内容。由于新的記憶體管理機制的出現,這種方式目前已經很少有開發者使用,出于相容舊版本的應用程式的目的,Spark依然保留了它的實作。
2、統一記憶體管理
Spark1.6之後引入的統一記憶體管理機制,與靜态記憶體管理的差別在于存儲記憶體和執行記憶體共享同一塊空間,可以動态占用對方的空閑區域,統一記憶體管理的堆内記憶體結構如下圖所示:
統一記憶體管理的堆外記憶體結構如下圖所示:
其中最重要的優化在于動态占用機制,其規則如下:
1、設定基本的存儲記憶體和執行記憶體區域(spark.storage.storageFraction參數),該設定确定了雙方各自擁有的空間的範圍;
2、雙方的空間都不足時,則存儲到磁盤;若己方空間不足而對方空餘時,可借用對方的空間;(存儲空間不足是指不足以放下一個完整的Block)
3、執行記憶體的空間被對方占用後,可讓對方将占用的部分轉存到磁盤,然後“歸還”借用的空間;
4、存儲記憶體的空間被對方占用後,無法讓對方“歸還”,因為需要考慮Shuffle過程中的很多因素,實作起來較為複雜。
統一記憶體管理的動态占用機制如下圖所示:
憑借統一記憶體管理機制,spark在一定程度上提高了堆内和堆外記憶體資源的使用率,降低了開發者維護spark記憶體的難度。如果存儲記憶體的空間太大或者說緩存的資料過多,反而會導緻頻繁的全量垃圾回收,降低任務執行時的性能,因為緩存的RDD資料通常都是長期主流記憶體的。是以要想充分發揮Spark的性能,需要開發者進一步了解存儲記憶體和執行記憶體各自管理方式和實作原理。
存儲記憶體管理
1、RDD持久化機制
彈性分布式資料集(RDD)作為Spark最根本的資料抽象,是隻讀的分區記錄(Partition)的集合,隻能基于在穩定實體存儲中的資料集上建立,或者在其他已有的RDD上執行轉換(Transformation)操作産生一個新的RDD。轉換後的RDD與原始的RDD之間産生了依賴關系,構成了血統(Lineage)。憑借血統,Spark保證了每一個RDD都可以被重新恢複。但是RDD的所有轉換都是有惰性的,即隻有當一個傳回結果給Driver的行動(Action)發生時,Spark才會建立任務讀取RDD,然後真正觸發轉換的執行。
Task在啟動之初讀取一個分區時,會先判斷這個分區是否已經被持久化,如果沒有則需要檢查Checkpoint或按照血統重新計算。是以如果一個RDD上要執行多次行動,可以在第一次行動中使用persist或cache方法,在記憶體或磁盤中持久化或緩存這個RDD,進而在後面的行動中提升計算速度。
事實上,cache方法是使用預設的MEMORY_ONLY的存儲級别将RDD持久化到記憶體,故緩存是一種特殊的持久化。堆内和堆外存儲記憶體的設計,便可以對緩存RDD時使用的記憶體做統一的規劃和管理。
RDD的持久化由Spark的Storage子產品負責,實作了RDD與實體存儲的解耦合。Storage子產品負責管理Spark在計算過程中産生的資料,将那些在記憶體或磁盤、在本地或遠端存取資料的功能封裝了起來。在具體實作時Driver端和Executor端的Storage子產品構成了主從式的架構,即Driver端的BlockManager為Master,Executor端的BlockManager為Slave。
Storage子產品在邏輯上以Block為基本存儲機關,RDD的每個Partition經過處理後位移對應一個Block(BlockId的格式為rdd_RDD-ID_PARTITION-ID)。Driver端的Master負責整個Spark應用程式的Block的中繼資料資訊的管理和維護,而Executor端的Slave需要将Block的更新等狀态上報到Master,同時接受Master的指令,例如新增或删除一個RDD。
在對RDD持久化時,Spark規定了MEMORY_ONLY、MEMORY_AND_DISK等7中不同的存儲級别,而存儲級别是以下5個變量的組合:
class StorageLevel private(
private var _useDisk: Boolean, //磁盤
private var _useMemory: Boolean, //這裡其實是指堆内記憶體
private var _useOffHeap: Boolean, //堆外記憶體
private var _deserialized: Boolean, //是否為非序列化
private var _replication: Int = 1 //副本個數
)
Spark中7中存儲級别如下:
通過對資料結構的分析,可以看出存儲級别從三個次元定義了RDD的Partition(同時也就是Block)的存儲方式:
(1)存儲位置:磁盤/堆内記憶體/堆外記憶體。如MEMORY_AND_DISK是同時在磁盤和堆内記憶體上存儲,實作了備援備份。OFF_HEAP則是隻在堆外記憶體存儲,目前選擇堆外記憶體時不能同時存儲到其他位置。
(2)存儲形式:Block緩存到存儲記憶體後,是否為非序列化的形式。如MEMORY_ONLY是非序列化方式存儲,OFF_HEAP是序列化方式存儲。
(3)副本數量:大于1時需要遠端備援備份到其他節點。如DISK_ONLY_2需要遠端備份1個副本。
2、RDD的緩存過程
RDD在緩存到存儲記憶體之前,Partition中的資料一般以疊代器(Iterator)的資料結構來通路,這是Scala語言中一種周遊資料集合的方法。通過Iterator可以擷取分區中每一條序列化或者非序列化的資料項(Record),這些Record的對象執行個體在邏輯上占用了JVM堆内記憶體的other部分的空間,同一Partition的不同Record的存儲空間并不連續。
RDD在緩存到存儲記憶體之後,Partition被轉換成Block,Record在堆内或堆外存儲記憶體中占用一塊連續的空間。将Partition由不連續的存儲空間轉換為連續存儲空間的過程,Spark稱之為“展開”(Unroll)。
Block有序列化和非序列化兩種存儲格式,具體以哪種方式取決于該RDD的存儲級别。非序列化的Block以一種DeserializedMemoryEntry的資料結構定義,用一個數組存儲所有的對象執行個體,序列化的Block則以SerializedMemoryEntry的資料結構定義,用位元組緩沖區(ByteBuffer)來存儲二進制資料。每個Executor的Storage子產品用一個鍊式Map結構(LinkedHashMap)來管理堆内和堆外存儲記憶體中所有的Block對象的執行個體,對這個LinkedHashMap新增和删除間接記錄了記憶體的申請和釋放。
因為不能保證存儲空間可以一次容納Iterator中的所有資料,目前的計算任務在Unroll時要向MemoryManager申請足夠的Unroll空間來臨時占位,空間不足則Unroll失敗,空間足夠時可以繼續進行。
對于序列化的Partition,其所需的Unroll空間可以直接累加計算,一次申請。
對于非序列化的Partition則要在便利Record的過程中一次申請,即每讀取一條Record,采樣估算其所需的Unroll空間并進行申請,空間不足時可以中斷,釋放已占用的Unroll空間。
如果最終Unroll成功,目前Partition所占用的Unroll空間被轉換為正常的緩存RDD的存儲空間,如下圖所示。
在靜态記憶體管理時,Spark在存儲記憶體中專門劃分了一塊Unroll空間,其大小是固定的,統一記憶體管理時則沒有對Unroll空間進行特别區分,當存儲空間不足時會根據動态占用機制進行處理。
3、淘汰與落盤
由于同一個Executor的所有的計算任務共享有限的存儲記憶體空間,當有新的Block需要緩存單數剩餘空間不足且無法動态占用時,就要對LinkedHashMap中的舊Block進行淘汰(Eviction),而被淘汰的Block如果其存儲級别中同時包含存儲到磁盤的要求,則要對其進行落盤(Drop),否則直接删除該Block。
存儲記憶體的淘汰規則為:
被淘汰的舊Block要與新的Block的MemoryMode相同,即同屬于堆外或堆内記憶體;
新舊Block不能屬于同一個RDD,避免循環淘汰;
舊Block所屬RDD不能處于被讀狀态,避免引發一緻性問題;
周遊LinkedHashMap中Block,按照最近最少使用(LRU)的順序淘汰,直到滿足新Block所需的空間。其中LRU是LinkedHashMap的特性。
落盤的流程則比較簡單,如果其存儲級别符合_useDisk為true的條件,再根據其_deserialized判斷是否是非序列化的形式,若是則對其進行序列化,最後将資料存儲到磁盤,在Storage子產品中更新其資訊。
執行記憶體管理
執行記憶體主要用來存儲任務再在執行Shuffle時占用的記憶體,Shuffle是按照一定規則對RDD資料重新分區的過程,Shuffle的Write和Read兩階段對執行記憶體的使用:
Shuffle Write
在map端會采用ExternalSorter進行外排,在記憶體中存儲資料時主要占用堆内執行空間。
Shuffle Read
(1)在對reduce端的資料進行聚合時,要将資料交給Aggregator處理,在記憶體中存儲資料時占用堆内執行空間。
(2)如果需要進行最終結果排序,則要将再次将資料交給ExternalSorter處理,占用堆内執行空間。
在ExternalSorter和Aggregator中,Spark會使用一種叫做AppendOnlyMap的哈希表在堆内執行記憶體中存儲資料,但是Shuffle過程中所有資料并不能都儲存到該哈希表中,當這個哈希表占用的記憶體會進行周期性地采樣估算,當其大到一定程度,無法再從MemoryManager申請到新的執行記憶體時,Spark就會将其全部内容存儲到磁盤檔案中,這個過程被稱為溢存(Spill),溢存到磁盤的檔案最後會被歸并(Merge)。
Spark的存儲記憶體和執行記憶體有着截然不同的管理方式:對于存儲記憶體來說,Spark用一個LinkedHashMap來集中管理所有的Block,Block由需要緩存的RDD的Partition轉化而成;而對于執行記憶體,Spark用AppendOnlyMap來存儲Shuffle過程中的資料,在Tungsten排序中甚至抽象稱為頁式記憶體管理,開辟了全新的JVM記憶體管理機制。
九、Spark核心元件解析
BlockManager資料存儲與管理機制
BlockManager是整個Spark底層負責資料存儲與管理的一個元件,Driver和Executor的所有資料都由對應的BlockManager進行管理。
Driver上有BlockManagerMaster,負責對各個節點上的BlockManager内部管理的資料的中繼資料進行維護,比如block的增删改等操作,都會在這裡維護好中繼資料的變更。
每個節點都有一個BlockManager,每個BlockManager建立之後,第一件事即使去向BlockManagerMaster進行注冊,此時BlockManagerMaster會為其建立對應的BlockManagerInfo。
BlockManagerMaster與BlockManager的關系非常像NameNode與DataNode的關系,BlockManagerMaster中儲存BlockManager内部管理資料的中繼資料,進行維護,當BlockManager進行Block增删改等操作時,都會在BlockManagerMaster中進行中繼資料的變更,這與NameNode維護DataNode的中繼資料資訊,DataNode中資料發生變化時NameNode中的中繼資料也會相應變化是一緻的。
每個節點上都有一個BlockManager,BlockManager中有三個非常重要的元件:
DisStore:負責對磁盤資料進行讀寫;
MemoryStore:負責對記憶體資料進行讀寫;
BlockTransferService:負責建立BlockManager到遠端其他節點的BlockManager的連接配接,負責對遠端其他節點的BlockManager的資料進行讀寫;
每個BlockManager建立之後,做的第一件事就是向BlockManagerMaster進行注冊,此時BlockManagerMaster會為其建立對應的BlockManagerInfo。
使用BlockManager進行寫操作時,比如說,RDD運作過程中的一些中間資料,或者我們手動指定了persist(),會優先将資料寫入記憶體中,如果記憶體大小不夠,會使用自己的算法,将記憶體中的部分資料寫入磁盤;此外,如果persist()指定了要replica,那麼會使用BlockTransferService将資料replicate一份到其他節點的BlockManager上去。
使用BlockManager進行讀操作時,比如說,shuffleRead操作,如果能從本地讀取,就利用DisStore或者MemoryStore從本地讀取資料,但是本地沒有資料的話,那麼會用BlockTransferService與有資料的BlockManager建立連接配接,然後用BlockTransferService從遠端BlockManager讀取資料;例如,shuffle Read操作中,很有可能要拉取的資料本地沒有,那麼此時就會從遠端有資料的節點上,找那個節點的BlockManager來拉取需要的資料。
隻要使用BlockManager執行了資料增删改的操作,那麼必須将Block的BlockStatus上報到BlockManagerMaster,在BlockManagerMaster上會對指定BlockManager的BlockManagerInfo内部的BlockStatus進行增删改操作,進而達到中繼資料的維護功能。
Spark共享變量底層實作
Spark一個非常重要的特性就是共享變量。
預設情況下,如果在一個算子的函數中使用到了某個外部的變量,那麼這個變量的值會被拷貝到每個task中,此時每個task隻能操作自己的那份變量副本。如果多個task想要共享某個變量,那麼這種方式是做不到的。
Spark為此提供了兩種共享變量,一種是Broadcast Variable(廣播變量),另一種是Accumulator(累加變量)。Broadcast Variable會将用到的變量,僅僅為每個節點拷貝一份,即每個Executor拷貝一份,更大的用途是優化性能,見上網絡傳輸以及記憶體損耗。Accumulator則可以讓多個task共同操作一份變量,主要可以進行累加操作。Broadcast Variable是共享讀變量,task不能去修改它,而Accumulator可以讓多個task操作一個變量。
廣播變量
廣播變量允許程式設計者在每個Executor上暴力外部資料的隻讀變量,而不是給每個任務發送一個副本。
每個task都會儲存一份它所使用的外部變量的副本,當一個Executor上的多個task都使用一個外部變量時,對于Executor記憶體的消耗是非常大的,是以,我們可以将大型外部變量封裝為廣播變量,此時一個Executor儲存一個變量副本,此Executor上的所有task共用此變量,不再是一個task單獨儲存一個副本,這在一定程度上降低了Spark任務的記憶體占用。
使用外部變量
使用廣播變量
Spark還嘗試使用高效的廣播算法分發廣播變量,以降低通信成本。
Spark提供的Broadcast Variable是隻讀的,并且在每個Executor上隻會有一個副本,而不會為每個task都拷貝一份副本,是以,它的最大作用,就是減少變量到各個節點的網絡傳輸消耗,以及在各個節點上的記憶體消耗。此外,Spark内部也是用了高效的廣播算法來減少網絡消耗。
可以通過調用SparkContext的broadcast()方法來針對每個變量建立廣播變量。然後再算子的函數内,使用到廣播變量時,每個Executor隻會拷貝一份副本了,每個task可以使用廣播變量的value()方法擷取值。
在任務運作時,Executor并不擷取廣播變量,當task執行到使用廣播變量的代碼時,會向Executor的記憶體中請求廣播變量,如下圖所示:
之後Executor會通過BlockManager向Driver拉取廣播變量,然後提供給task進行使用,如下圖所示:
廣播大變量是Spark中常用的基礎優化方法,通過減少記憶體占用實作任務執行性能的提升。
累加器
累加器(accumulator):Accumulator是僅僅被相關操作累加的變量,是以可以在并行中被有效地支援。它們可用于實作計數器(如MapReduce)或總和計數。
Accumulator是存在于Driver端的,叢集上運作的task進行Accumulator的累加,随後把值發送到Driver端,在Driver端彙總(Spark UI在SparkContext建立時被建立,即在Driver端被建立,是以它可以讀取Accumulator的數值),由于Accumulator存在于Driver端,從節點讀取不到Accumulator的數值。
Spark提供的Accumulator主要用于多個節點對一個變量進行共享性的操作。Accumulator隻提供了累加的功能,但是卻給我們提供了多個task對于同一個變量并行操作的功能,但是task隻能對Accumulator進行累加操作,不能讀取它的值,隻有Driver程式可以讀取Accumulator的值。
Accumulator的底層原理如下圖所示:
關注公衆号,獲得精美書籍、資料,擷取第一手最新文章。