spark是一個分布式的記憶體計算架構,其特點是能處理大規模資料,計算速度快。spark延續了hadoop的mapreduce計算模型,相比之下spark的計算過程保持在記憶體中,減少了硬碟讀寫,能夠将多個操作進行合并後計算,是以提升了計算速度。同時spark也提供了更豐富的計算api。
mapreduce是hadoop和spark的計算模型,其特點是map和reduce過程高度可并行化;過程間耦合度低,單個過程的失敗後可以重新計算,而不會導緻整體失敗;最重要的是資料進行中的計算邏輯可以很好的轉換為map和reduce操作。對于一個資料集來說,map對每條資料做相同的轉換操作,reduce可以按條件對資料分組,然後在分組上做操作。除了map和reduce操作之外,spark還延伸出了如filter,flatmap,count,distinct等更豐富的操作。
rdd的是spark中最主要的資料結構,可以直覺的認為rdd就是要處理的資料集。rdd是分布式的資料集,每個rdd都支援mapreduce類操作,經過mapreduce操作後會産生新的rdd,而不會修改原有rdd。rdd的資料集是分區的,是以可以把每個資料分區放到不同的分區上進行計算,而實際上大多數mapreduce操作都是在分區上進行計算的。spark不會把每一個mapreduce操作都發起運算,而是盡量的把操作累計起來一起計算。spark把操作劃分為轉換(transformation)和動作(action),對rdd進行的轉換操作會疊加起來,直到對rdd進行動作操作時才會發起計算。這種特性也使spark可以減少中間結果的吞吐,可以快速的進行多次疊代計算。
spark自身隻對計算負責,其計算資源的管理和排程由第三方架構來實作。常用的架構有yarn和mesos。本文以yarn為例進行介紹。先看一下spark on yarn的系統結構圖:
spark on yarn系統結構圖
圖中共分為三大部分:spark driver, worker, cluster manager。其中driver program負責将rdd轉換為任務,并進行任務排程。worker負責任務的執行。yarn負責計算資源的維護和配置設定。
driver可以運作在使用者程式中,或者運作在其中一個worker上。spark中的每一個應用(application)對應着一個driver。這個driver可以接收rdd上的計算請求,每個動作(action)類型的操作将被作為一個job進行計算。spark會根據rdd的依賴關系建構計算階段(stage)的有向無環圖,每個階段有與分區數相同的任務(task)。這些任務将在每個分區(partition)上進行計算,任務劃分完成後driver将任務送出到運作于worker上的executor中進行計算,并對任務的成功、失敗進行記錄和重新開機等處理。
worker一般對應一台實體機,每個worker上可以運作多個executor,每個executor都是獨立的jvm程序,driver送出的任務就是以線程的形式運作在executor中的。如果使用yarn作為資源排程架構的話,其中一個worker上還會有executor launcher作為yarn的applicationmaster,用于向yarn申請計算資源,并啟動、監測、重新開機executor。
這裡我們從rdd到輸出結果的整個計算過程為主線,探究spark的計算過程。這個計算過程可以分為:
rdd建構:建構rdd之間的依賴關系,将rdd轉換為階段的有向無環圖。
任務排程:根據空閑計算資源情況進行任務送出,并對任務的運作狀态進行監測和處理。
任務計算:搭建任務運作環境,執行任務并傳回任務結果。
shuffle過程:兩個階段之間有寬依賴時,需要進行shuffle操作。
計算結果收集:從每個任務收集并彙總結果。
在這裡我們用一個簡潔的charcount程式為例,這個程式把含有a-z字元的清單轉化為rdd,對此rdd進行了map和reduce操作計算每個字母的頻數,最後将結果收集。其代碼如下:
charcount例子程式
rdd按照其作用可以分為兩種類型,一種是對資料源的封裝,可以把資料源轉換為rdd,這種類型的rdd包括newhadooprdd,parallelcollectionrdd,jdbcrdd等。另一種是對rdd的轉換,進而實作一種計算方法,這種類型的rdd包括mappedrdd,shuffledrdd,filteredrdd等。資料源類型的rdd不依賴于其他rdd,計算類的rdd擁有自己的rdd依賴。
rdd有三個要素:分區,依賴關系,計算邏輯。分區是保證rdd分布式的特性,分區可以對rdd的資料進行劃分,劃分後的分區可以分布到不同的executor中,大部分對rdd的計算都是在分區上進行的。依賴關系維護着rdd的計算過程,每個計算類型的rdd在計算時,會将所依賴的rdd作為資料源進行計算。根據一個分區的輸出是否被多分區使用,spark還将依賴分為窄依賴和寬依賴。rdd的計算邏輯是其功能的展現,其計算過程是以所依賴的rdd為資料源進行的。
例子中共産生了三個rdd,除了第一個rdd之外,每個rdd與上級rdd有依賴關系。
spark.parallelize(data, partitionsize)方法将産生一個資料源型的parallelcollectionrdd,這個rdd的分區是對清單資料的切分,沒有上級依賴,計算邏輯是直接傳回分區資料。
map函數将會建立一個mappedrdd,其分區與上級依賴相同,會有一個依賴于parallelcollectionrdd的窄依賴,計算邏輯是對parallelcollectionrdd的資料做map操作。
reducebykey函數将會産生一個shuffledrdd,分區數量與上面的mappedrdd相同,會有一個依賴于mappedrdd的寬依賴,計算邏輯是shuffle後在分區上的聚合操作。
rdd的依賴關系
spark在遇到動作類操作時,就會發起計算job,把rdd轉換為任務,并發送任務到executor上執行。從rdd到任務的轉換過程是在dagscheduler中進行的。其總體思路是根據rdd的依賴關系,把窄依賴合并到一個階段中,遇到寬依賴則劃分出新的階段,最終形成一個階段的有向無環圖,并根據圖的依賴關系先後送出階段。每個階段按照分區數量劃分為多個任務,最終任務被序列化并送出到executor上執行。
rdd到task的建構過程
當rdd的動作類操作被調用時,rdd将調用sparkcontext開始送出job,sparkcontext将調用dagscheduler把rdd轉化為階段的有向無環圖,然後首先将有向無環圖中沒有未完成的依賴的階段進行送出。在階段被送出時,每個階段将産生與分區數量相同的任務,這些任務稱之為一個taskset。任務的類型分為 shufflemaptask和resulttask,如果階段的輸出将用于下個階段的輸入,也就是需要進行shuffle操作,則任務類型為shufflemaptask。如果階段的輸入即為job結果,則任務類型為resulttask。任務建立完成後會交給taskschedulerimpl進行taskset級别的排程執行。
在任務排程的分工上,dagscheduler負責總體的任務排程,schedulerbackend負責與executors通信,維護計算資源資訊,并負責将任務序列化并送出到executor。tasksetmanager負責對一個階段的任務進行管理,其中會根據任務的資料本地性選擇優先送出的任務。taskschedulerimpl負責對taskset進行排程,通過排程政策确定taskset優先級。同時是一個中介者,其将dagscheduler,schedulerbackend和tasksetmanager聯結起來,對executor和task的相關事件進行轉發。
在任務送出流程上,dagscheduler送出taskset到taskschedulerimpl,使taskset在此注冊。taskschedulerimpl通知schedulerbackend有新的任務進入,schedulerbackend調用makeoffers根據注冊到自己的executors資訊,确定是否有計算資源執行任務,如有資源則通知taskschedulerimpl去配置設定這些資源。 taskschedulerimpl根據taskset排程政策優先配置設定taskset接收此資源。tasksetmanager再根據任務的資料本地性,确定送出哪些任務。最終任務的閉包被schedulerbackend序列化,并傳輸給executor進行執行。
spark的任務排程
根據以上過程,spark中的任務排程實際上分了三個層次。第一層次是基于階段的有向無環圖進行stage的排程,第二層次是根據排程政策(fifo,fair)進行taskset排程,第三層次是根據資料本地性(process,node,rack)在taskset内進行排程。
任務的計算過程是在executor上完成的,executor監聽來自schedulerbackend的指令,接收到任務時會啟動taskrunner線程進行任務執行。在taskrunner中首先将任務和相關資訊反序列化,然後根據相關資訊擷取任務所依賴的jar包和所需檔案,完成準備工作後執行任務的run方法,實際上就是執行shufflemaptask或resulttask的run方法。任務執行完畢後将結果發送給driver進行處理。
在task.run方法中可以看到shufflemaptask和resulttask有着不同的計算邏輯。shufflemaptask是将所依賴rdd的輸出寫入到shufflewriter中,為後面的shuffle過程做準備。resulttask是在所依賴rdd上應用一個函數,并傳回函數的計算結果。在這兩個task中隻能看到資料的輸出方式,而看不到應有的計算邏輯。實際上計算過程是包含在rdd中的,調用rdd. iterator方法擷取rdd的資料将觸發這個rdd的計算動作(rdd. iterator),由于此rdd的計算過程中也會使用所依賴rdd的資料。進而rdd的計算過程将遞歸向上直到一個資料源類型的rdd,再遞歸向下計算每個rdd的值。需要注意的是,以上的計算過程都是在分區上進行的,而不是整個資料集,計算完成得到的是此分區上的結果,而不是最終結果。
從rdd的計算過程可以看出,rdd的計算過程是包含在rdd的依賴關系中的,隻要rdd之間是連續窄依賴,那麼多個計算過程就可以在同一個task中進行計算,中間結果可以立即被下個操作使用,而無需在程序間、節點間、磁盤上進行交換。
rdd計算過程
shuffle是一個對資料進行分組聚合的操作過程,原資料将按照規則進行分組,然後使用一個聚合函數應用于分組上,進而産生新資料。shuffle操作的目的是把同組資料配置設定到相同分區上,進而能夠在分區上進行聚合計算。為了提高shuffle性能,還可以先在原分區對資料進行聚合(mapsidecombine),然後再配置設定部分聚合的資料到新分區,第三步在新分區上再次進行聚合。
在劃分階段時,隻有遇到寬依賴才會産生新階段,才需要shuffle操作。寬依賴與窄依賴取決于原分區被新分區的使用關系,隻要一個原分區會被多個新分區使用,則為寬依賴,需要shuffle。否則為窄依賴,不需要shuffle。
以上也就是說隻有階段與階段之間需要shuffle,最後一個階段會輸出結果,是以不需要shuffle。例子中的程式會産生兩個階段,第一個我們簡稱map階段,第二個我們簡稱reduce階段。shuffle是通過map階段的shufflemaptask與reduce階段的shuffledrdd配合完成的。其中shufflemaptask會把任務的計算結果寫入shufflewriter,shuffledrdd從shufflereader中讀取資料,shuffle過程會在寫入和讀取過程中完成。以hashshuffle為例,hashshufflewriter在寫入資料時,會決定是否在原分區做聚合,然後根據資料的hash值寫入相應新分區。hashshufflereader再根據分區号取出相應資料,然後對資料進行聚合。
spark的shuffle過程
resulttask任務計算完成後可以得到每個分區的計算結果,此時需要在driver上對結果進行彙總進而得到最終結果。
rdd在執行collect,count等動作時,會給出兩個函數,一個函數在分區上執行,一個函數在分區結果集上執行。例如collect動作在分區上(executor中)執行将iterator轉換為array的函數,并将此函數結果傳回到driver。driver 從多個分區上得到array類型的分區結果集,然後在結果集上(driver中)執行合并array的操作,進而得到最終結果。
spark對于rdd的設計是其精髓所在。用rdd操作資料的感覺就一個字:爽!。想到rdd背後是幾噸重的大資料集,而我們随手調用下map(), reduce()就可以把它轉換來轉換去,一種半兩撥千斤的感覺就會油然而生。我想是以下特性給我們帶來了這些:
rdd把不同來源,不同類型的資料進行了統一,使我們面對rdd的時候就會産生一種信心,就會認為這是某種類型的rdd,進而可以進行rdd的所有操作。
對rdd的操作可以疊加到一起計算,我們不必擔心中間結果吞吐對性能的影響。
rdd提供了更豐富的資料集操作函數,這些函數大都是在mapreduce基礎上擴充的,使用起來很友善。
rdd為提供了一個簡潔的程式設計界面,背後複雜的分布式計算過程對開發者是透明的。進而能夠讓我們把關注點更多的放在業務上。