天天看點

《循序漸進學Spark 》Spark 程式設計模型

本節書摘來自華章出版社《循序漸進學spark 》一書中的第1章,第3節,作者 小象學院 楊 磊,更多章節内容可以通路雲栖社群“華章計算機”公衆号檢視。

spark機制原理

本書前面幾章分别介紹了spark的生态系統、spark運作模式及spark的核心概念rdd和基本算子操作等重要基礎知識。本章重點講解spark的主要機制原理,因為這是spark程式得以高效執行的核心。本章先從application、job、stage和task等層次闡述spark的排程邏輯,并且介紹fifo、fair等經典算法,然後對spark的重要組成子產品:i/o與通信控制子產品、容錯子產品及shuffle子產品做了深入的闡述。其中,在spark i/o子產品中,資料以資料塊的形式管理,存儲在記憶體、磁盤或者spark叢集中的其他機器上。spark叢集通信機制采用了akka通信架構,在叢集機器中傳遞指令和狀态資訊。另外,容錯是分布式系統的一個重要特性,spark采用了lineage與checkpoint機制來保證容錯性。spark shuffle子產品借鑒了mapreduce的shuffle機制,但在其基礎上進行了改進與創新。

3.1 spark應用執行機制分析

下面對spark application的基本概念和執行機制進行深入介紹。

3.1.1 spark應用的基本概念

spark應用(application)是使用者送出的應用程式。spark運作模式分為:local、standalone、yarn、mesos等。根據spark application的driver program是否在叢集中運作,spark應用的運作方式又可以分為cluster模式和client模式。

下面介紹spark應用涉及的一些基本概念:

1) sparkcontext:spark 應用程式的入口,負責排程各個運算資源,協調各個worker node 上的executor。

2) driver program:運作application的main()函數并建立sparkcontext。

3) rdd:前面已經講過,rdd是spark的核心資料結構,可以通過一系列算子進行操作。當rdd遇到action算子時,将之前的所有算子形成一個有向無環圖(dag)。再在spark中轉化為job(job的概念在後面講述),送出到叢集執行。一個app中可以包含多個job。

4) worker node:叢集中任何可以運作application代碼的節點,運作一個或多個executor程序。

5) executor:為application運作在worker node上的一個程序,該程序負責運作task,并且負責将資料存在記憶體或者磁盤上。每個application都會申請各自的executor來處理任務。

下面介紹spark 應用(application)執行過程中各個元件的概念:

1) task(任務):rdd中的一個分區對應一個task,task是單個分區上最小的處理流程單元。

2) taskset(任務集): 一組關聯的,但互相之間沒有shuffle依賴關系的task集合。

3) stage(排程階段):一個taskset對應的排程階段。每個job會根據rdd的寬依賴關系被切分很多stage,每個stage都包含一個taskset。

4) job(作業): 由action算子觸發生成的由一個或多個stage組成的計算作業。

5) application:使用者編寫的spark的應用程式,由一個或多個job組成。送出到spark之後,spark為application配置設定資源,将程式轉換并執行。

6) dagscheduler:根據job建構基于stage的dag,并送出stage給taskscheduler。

7) taskscheduler:将taskset送出給worker node叢集運作并傳回結果。

以上基本概念之間的關系如圖3-1所示。

3.1.2 spark應用執行機制概要

spark application從送出後到在worker node執行,期間經曆了一系列變換,具體過程如圖3-2所示。

圖3-1 spark基本概念之間的關系

圖3-2 spark 執行流程

如圖3-2所示,前面講過,當rdd遇見action算子之後,觸發job送出。送出後的job在spark中形成了rdd dag有向無環圖(directed acyclic graph)。rdd dag經過dag scheduler排程之後,根據rdd依賴關系被切分為一系列的stage。每個stage包含一組task集合,再經過task scheduler之後,task被配置設定到worker節點上的executor線程池執行。如前文所述,rdd中的每一個邏輯分區對應一個實體的資料塊,同時每個分區對應一個task,是以task也有自己對應的實體資料塊,使用使用者定義的函數來處理。spark出于節約記憶體的考慮,采用了延遲執行的政策,如前文所述,隻有action算子才可以觸發整個操作序列的執行。另外,spark對于中間計算結果也不會重新配置設定記憶體,而是在同一個資料塊上流水線操作。

spark使用blockmanager管理資料塊,在記憶體或者磁盤進行存儲,如果資料不在本節點,則還可以通過遠端節點複制到本機進行計算。在計算時,spark會在具體執行計算的worker節點的executor中建立線程池,executor将需要執行的任務通過線程池來并發執行。

3.1.3 應用送出與執行

spark使用driver程序負責應用的解析、切分stage并排程task到executor執行,包含dagscheduler等重要對象。driver程序的運作地點有如下兩種:

1) driver程序運作在client端,對應用進行管理監控。

2) master節點指定某個worker節點啟動driver程序,負責監控整個應用的執行。

針對這兩種情況,應用送出及執行過程分别如下:

1. driver運作在client

使用者啟動client端,在client端啟動driver程序。在driver中啟動或執行個體化dags-

cheduler等元件。

1)driver向master注冊。

2)worker向master注冊,master通過指令讓worker啟動executor。

3)worker通過建立executorrunner線程,進而executorrunner線程啟動executor-backend程序。

4)executorbackend啟動後,向client端driver程序内的schedulerbackend注冊,是以driver程序就可以發現計算資源。

5)driver的dagscheduler解析應用中的rdd dag并生成相應的stage,每個stage包含的taskset通過taskscheduler配置設定給executor。在executor内部啟動線程池并行化執行task。

2. driver運作在worker節點

使用者啟動用戶端,用戶端送出應用程式給master。

1)master排程應用,指定一個worker節點啟動driver,即scheduler-backend。

2)worker接收到master指令後建立driverrunner線程,在driverrunner線程内建立schedulerbackend程序。driver充當整個作業的主要程序。

3)master指定其他worker節點啟動exeuctor,此處流程和上面相似,worker建立executorrunner線程,啟動executorbackend程序。

4)executorbackend啟動後,向driver的schedulerbackend注冊,這樣driver擷取了計算資源就可以排程和将任務分發到計算節點執行。

schedulerbackend程序中包含dagscheduler,它會根據rdd的dag切分stage,生成taskset,并排程和分發task到executor。對于每個stage的taskset,都會被存放到taskscheduler中。taskscheduler将任務分發到executor,執行多線程并行任務。

圖3-3為spark應用的送出與執行示意圖。

圖3-3 spark應用的送出與執行

3.2 spark排程機制

spark排程機制是保證spark應用高效執行的關鍵。本節從application、job、stage和task的次元,從上層到底層來一步一步揭示spark的排程政策。

3.2.1 application的排程

spark中,每個application對應一個sparkcontext。sparkcontext之間的排程關系取決于spark的運作模式。對standalone模式而言,spark master節點先計算叢集内的計算資源能否滿足等待隊列中的應用對記憶體和cpu資源的需求,如果可以,則master建立spark driver,啟動應用的執行。宏觀上來講,這種對應用的排程類似于fifo政策。在mesos和yarn模式下,底層的資源排程系統的排程政策都是由mesos和yarn決定的。具體分類描述如下:

1. standalone模式

預設以使用者送出application的順序來排程,即fifo政策。每個應用執行時獨占所有資源。如果有多個使用者要共享叢集資源,則可以使用參數spark.cores.max來配置應用在叢集中可以使用的最大cpu核數。如果不配置,則采用預設參數spark.deploy.defaultcore的值來确定。

2. mesos模式

如果在mesos上運作spark,使用者想要靜态配置資源的話,可以設定spark.mesos.coarse為true,這樣mesos變為粗粒度排程模式,然後可以設定spark.cores.max指定叢集中可以使用的最大核數,與上面的standalone模式類似。同時,在mesos模式下,使用者還可以設定參數spark.executor.memory來配置每個executor的記憶體使用量。如果想使mesos在細粒度模式下運作,可以通過mesos://<url-info>設定動态共享cpu core的執行模式。在這種模式下,應用不執行時的空閑cpu資源得以被其他使用者使用,提升了cpu使用率。

3. yarn模式

如果在yarn上運作spark,使用者可以在yarn的用戶端上設定--num-executors 來控制為應用配置設定的executor數量,然後設定--executor-memory指定每個executor的記憶體大小,設定--executor-cores指定executor占用的cpu核數。

3.2.2 job的排程

前面章節提到過,spark應用程式實際上是一系列對rdd的操作,這些操作直至遇見action算子,才觸發job的送出。事實上,在底層實作中,action算子最後調用了runjob函數送出job給spark。其他的操作隻是生成對應的rdd關系鍊。如在rdd.scala程式檔案中,count函數源碼所示。

def count(): long = sc.runjob(this, utils.getiteratorsize _).sum

其中sc為sparkcontext的對象。可見在spark中,對job的送出都是在action算子中隐式完成的,并不需要使用者顯式地送出作業。在sparkcontext中job送出的實作中,最後會調用dagscheduler中的job送出接口。dagscheduler最重要的任務之一就是計算job與task的依賴關系,制定排程邏輯。

job排程的基本工作流程如圖3-4所示,每個job從送出到完成,都要經曆一系列步驟,拆分成以tsk為最小機關,按照一定邏輯依賴關系的執行序列。

圖3-4 job的排程流程

圖3-5則從job排程流程中的細節子產品出發,揭示了工作流程與對應子產品之間的關系。從整體上描述了各個類在job排程流程中的互動關系。

圖3-5 job排程流程細節

在spark1.5.0的排程目錄下的schedulingalgorithm.scala檔案中,描述了spark對job的排程模式。

1. fifo模式

預設情況下,spark對job以fifo(先進先出)的模式進行排程。在schedulingalgorithm.scala檔案中聲明了fifo算法實作。

private[spark] class fifoschedulingalgorithm extends schedulingalgorithm {

  override def comparator(s1: schedulable, s2: schedulable): boolean = {

    //定義優先級

    val priority1 = s1.priority

    val priority2 = s2.priority

    var res = math.signum(priority1 - priority2)

    if (res == 0) {

      val stageid1 = s1.stageid

      val stageid2 = s2.stageid

      //signum是符号函數,傳回0(參數等于0)、1(參數大于0)或-1(參數小于0)。

      res = math.signum(stageid1 - stageid2)

    }

    if (res < 0) {

      true

    } else {

      false

  }

}

2. fair模式

spark在fair的模式下,采用輪詢的方式為多個job配置設定資源,排程job。所有的任務優先級大緻相同,共享叢集計算資源。具體實作代碼在schedulingalgorithm.scala檔案中,聲明如下:

private[spark] class fairschedulingalgorithm extends schedulingalgorithm {

    val minshare1 = s1.minshare

    val minshare2 = s2.minshare

    val runningtasks1 = s1.runningtasks

    val runningtasks2 = s2.runningtasks

    val s1needy = runningtasks1 < minshare1

    val s2needy = runningtasks2 < minshare2

    val minshareratio1 = runningtasks1.todouble / math.max(minshare1, 1.0).todouble

    val minshareratio2 = runningtasks2.todouble / math.max(minshare2, 1.0).todouble

    val tasktoweightratio1 = runningtasks1.todouble / s1.weight.todouble

    val tasktoweightratio2 = runningtasks2.todouble / s2.weight.todouble

    var compare: int = 0

    if (s1needy && !s2needy) {

      return true

    } else if (!s1needy && s2needy) {

      return false

    } else if (s1needy && s2needy) {

      compare = minshareratio1.compareto(minshareratio2)

      compare = tasktoweightratio1.compareto(tasktoweightratio2)

    if (compare < 0) {

    } else if (compare > 0) {

      s1.name < s2.name

3. 配置排程池

dagscheduler建構了具有依賴關系的任務集。taskscheduler負責提供任務給task-setmanager作為排程的先決條件。tasksetmanager負責具體任務集内部的排程任務。排程池(pool)則用于排程每個sparkcontext運作時并存的多個互相獨立無依賴關系的任務集。排程池負責管理下一級的排程池和tasksetmanager對象。

使用者可以通過配置檔案定義排程池的屬性。一般排程池支援如下3個參數:

1)排程模式scheduling mode:使用者可以設定fifo或者fair排程方式。

2)weight:排程池的權重,在擷取叢集資源上權重高的可以擷取多個資源。

3)minishare:代表計算資源中的cpu核數。

使用者可以通過conf/fairscheduler.xml配置排程池的屬性,同時要在sparkconf對象中配置屬性。

3.2.3 stage(排程階段)和tasksetmanager的排程

1. stage劃分

當一個job被送出後,dagscheduler會從rdd依賴鍊的末端觸發,周遊整個rdd依賴鍊,劃分stage(排程階段)。劃分依據主要基于shuffledependency依賴關系。換句話說,當某rdd在計算中需要将資料進行shuffle操作時,這個包含shuffle操作的rdd将會被用來作為輸入資訊,構成一個新的stage。以這個基準作為劃分stage,可以保證存在依賴關系的資料按照正确資料得到處理和運算。在spark1.5.0的源代碼中,dagscheduler.scala中的getparentstages函數的實作從一定角度揭示了stage的劃分邏輯。

/**

 * 對于給定的rdd建構或擷取父stage的連結清單。新的stage建構時會包含參數中提供的firstjobid

 */

private def getparentstages(rdd: rdd[_], firstjobid: int): list[stage] = {

   val parents = new hashset[stage]

   val visited = new hashset[rdd[_]]

   // we are manually maintaining a stack here to prevent stackoverflowerror

   // caused by recursively visiting

   val waitingforvisit = new stack[rdd[_]]

   def visit(r: rdd[_]) {

     if (!visited(r)) {

       visited += r

       // kind of ugly: need to register rdds with the cache here since

       // we can't do it in its constructor because # of partitions is unknown

       /* 周遊rdd的依賴鍊 */

       for (dep <- r.dependencies) {

         dep match {

           /*如果遇見shuffledependency,則依據此依賴關系劃分stage,并添加該stage的父stage到哈希清單中*/

           case shufdep: shuffledependency[_, _, _] =>

             parents += getshufflemapstage(shufdep, firstjobid)

           case _ =>

             waitingforvisit.push(dep.rdd)

      }

2. stage排程

在第一步的stage劃分過程中,會産生一個或者多個互相關聯的stage。其中,真正執行action算子的rdd所在的stage被稱為final stage。dagscheduler會從這個final stage生成作業執行個體。

在stage送出時,dagscheduler首先會判斷該stage的父stage的執行結果是否可用。如果所有父stage的執行結果都可用,則送出該stage。如果有任意一個父stage的結果不可用,則嘗試疊代送出該父stage。所有結果不可用的stage都将會被加入waiting隊列,等待執行,如圖3-6所示。

圖3-6 stage依賴

在圖3-6中,虛箭頭表示依賴關系。stage序号越小,表示stage越靠近上遊。

圖3-6中的stage排程運作順序如圖3-7所示。

圖3-7 stage執行順序

從圖3-7可以看出,上遊父stage先得到執行,waiting queue中的stage随後得到執行。

3. tasksetmanager

每個stage的送出會被轉化為一組task的送出。dagscheduler最終通過調用taskscheduler的接口來送出這組任務。在taskscheduler内部實作中建立了tasksetmanager執行個體來管理任務集taskset的生命周期。事實上可以說每個stage對應一個tasksetmanager。

至此,dagscheduler的工作基本完畢。taskscheduler在得到叢集計算資源時,taskset-manager會配置設定task到具體worker節點上執行。在spark1.5.0的taskschedulerimpl.scala檔案中,送出task的函數實作如下:

override def submittasks(taskset: taskset) {

    val tasks = taskset.tasks

    loginfo("adding task set " + taskset.id + " with " + tasks.length + " tasks")

    this.synchronized {

      /*建立tasksetmanager執行個體以管理stage包含的任務集*/

      val manager = createtasksetmanager(taskset, maxtaskfailures)

      val stage = taskset.stageid

      val stagetasksets =

        tasksetsbystageidandattempt.getorelseupdate(stage, new hashmap[int, tasksetmanager])

      stagetasksets(taskset.stageattemptid) = manager

      val conflictingtaskset = stagetasksets.exists { case (_, ts) =>

        ts.taskset != taskset && !ts.iszombie

      if (conflictingtaskset) {

        throw new illegalstateexception(s"more than one active taskset for stage $stage:" +

          s" ${stagetasksets.toseq.map{_._2.taskset.id}.mkstring(",")}")

      /*将tasksetmanager添加到全局的排程隊列*/

      schedulablebuilder.addtasksetmanager(manager, manager.taskset.properties)

      if (!islocal && !hasreceivedtask) {

        starvationtimer.scheduleatfixedrate(new timertask() {

          override def run() {

            if (!haslaunchedtask) {

              logwarning("initial job has not accepted any resources; " +

                "check your cluster ui to ensure that workers are registered " +

                "and have sufficient resources")

            } else {

              this.cancel()

            }

          }

        }, starvation_timeout_ms, starvation_timeout_ms)

      hasreceivedtask = true

    backend.reviveoffers()

當tasksetmanager進入到排程池中時,會依據job id對tasksetmanager排序,總體上先進入的tasksetmanager先得到排程。對于同一job内的tasksetmanager而言,job id較小的先得到排程。如果有的tasksetmanager父stage還未執行完,則該taskset-manager不會被放到排程池。

3.2.4 task的排程

在dagscheduler.scala中,定義了函數submitmissingtasks,讀者閱讀完整實作,從中可以看到task的排程方式。限于篇幅,以下截取部分代碼。

private def submitmissingtasks(stage: stage, jobid: int) {

  logdebug("submitmissingtasks(" + stage + ")")

  // get our pending tasks and remember them in our pendingtasks entry

  stage.pendingtasks.clear()

  // first figure out the indexes of partition ids to compute.

  /*過濾出計算位置,用以執行計算*/

  val (allpartitions: seq[int], partitionstocompute: seq[int]) = {

    stage match {

      /*針對shufflemap類型的stage*/

      case stage: shufflemapstage =>

        val allpartitions = 0 until stage.numpartitions

        val filteredpartitions = allpartitions.filter { id =>  stage.outputlocs(id).isempty }

        (allpartitions, filteredpartitions)

      /*針對result類型的stage*/

      case stage: resultstage =>

        val job = stage.resultofjob.get

        val allpartitions = 0 until job.numpartitions

        val filteredpartitions = allpartitions.filter { id => ! job.finished(id) }

  .....[以下代碼略]

  /*擷取task執行的優先節點*/

  private[spark]

  def getpreferredlocs(rdd: rdd[_], partition: int): seq[tasklocation]    = {

    getpreferredlocsinternal(rdd, partition, new hashset)

計算task執行的優先節點位置的代碼實作在getpreferredlocsinternal函數中,具體如下:

/*計算位置的遞歸實作*/

private def getpreferredlocsinternal(

      rdd: rdd[_],

      partition: int,

      visited: hashset[(rdd[_], int)]): seq[tasklocation] = {

    // if the partition has already been visited, no need to re-visit.

    // this avoids exponential path exploration.  spark-695

    if (!visited.add((rdd, partition))) {

      // nil has already been returned for previously visited partitions.

      return nil

    // 如果調用cache緩存過,則計算緩存位置,讀取緩存分區中的資料

    val cached = getcachelocs(rdd)(partition)

    if (cached.nonempty) {

      return cached

    // 如果能直接擷取到執行地點,則傳回作為該task的執行地點

    val rddprefs = rdd.preferredlocations(rdd.partitions(partition)).tolist

    if (rddprefs.nonempty) {

      return rddprefs.map(tasklocation(_))

    /*針對窄依賴關系的rdd, 取出第一個窄依賴的父rdd分區的執行地點*/

    rdd.dependencies.foreach {

      case n: narrowdependency[_] =>

        for (inpart <- n.getparents(partition)) {

          val locs = getpreferredlocsinternal(n.rdd, inpart, visited)

          if (locs != nil) {

            return locs

        }

      case _ =>

    /*對于shuffle依賴的rdd,選取至少含reducer_pref_locs_fraction這麼多資料的位置作為優先節點*/

    if (shufflelocalityenabled && rdd.partitions.length < shuffle_pref_reduce_threshold) {

      rdd.dependencies.foreach {

        case s: shuffledependency[_, _, _] =>

          if (s.rdd.partitions.length < shuffle_pref_map_threshold) {

            // get the preferred map output locations for this reducer

            val toplocsforreducer = mapoutputtracker.getlocationswithlargestou-tputs(s.shuffleid,

              partition, rdd.partitions.length, reducer_pref_locs_fraction)

            if (toplocsforreducer.nonempty) {

              return toplocsforreducer.get.map(loc => tasklocation(loc.host, loc.executorid))

        case _ =>

    nil

3.3 spark存儲與i/o

前面已經講過,rdd是按照partition分區劃分的,是以rdd可以看作由一些分布在不同節點上的分區組成。由于partition分區與資料塊是一一對應的,是以rdd中儲存了partitionid與實體資料塊之間的映射。實體資料塊并非都儲存在磁盤上,也有可能儲存在記憶體中。

3.3.1 spark存儲系統概覽

spark i/o機制可以分為兩個層次:

1)通信層:用于master與slave之間傳遞控制指令、狀态等資訊,通信層在架構上也采用master-slave結構。

2)存儲層:同于儲存資料塊到記憶體、磁盤,或遠端複制資料塊。

下面介紹幾個spark存儲方面的功能子產品。

1)blockmanager:spark提供操作storage的統一接口類。

2)blockmanagermasteractor:master建立,slave利用該子產品向master傳遞資訊。

3)blockmanagerslaveactor:slave建立,master利用該子產品向slave節點傳遞控制指令,控制slave節點對block的讀寫。

4)blockmanagermaster: 管理actor通信。

5)diskstore:支援以檔案方式讀寫的方式操作block。

6)memorystore: 支援記憶體中的block讀寫。

7)blockmanagerworker: 對遠端異步傳輸進行管理。

8)connectionmanager:支援本地節點與遠端節點資料block的傳輸。

圖3-8概要性地揭示了spark存儲系統各個主要子產品之間的通信。

圖3-8 spark存儲系統概覽

3.3.2 blockmanager中的通信

存儲系統的通信仍然類似master-slave架構,節點之間傳遞指令與狀态。總體而言,master向slave傳遞指令,slave向master傳遞資訊和狀态。這些master與slave節點之間的資訊傳遞通過actor對象實作(關于actor的詳細功能會在下一節spark通信機制中講述)。但在blockmanager中略有不同,下面分别講述。

1)master節點上的blockmanagermaster包含内容如下:

①blockmanagermasteractor的actor引用。

②blockmanagerslaveactor的ref引用。

2)slave節點上的blockmanagermaster包含内容如下:

①blockmanagermasteractor的ref引用。

②blockmanagerslaveactor的actor引用。

其中,在ref與actor之間的通信由blockmanagermasteractor和blockmanagerslave-actor完成。這個部分相關的源碼篇幅較多,此處省略,感興趣的讀者請自行研究。

3.4 spark通信機制

前面介紹過,spark的部署模式可以分為local、standalone、mesos、yarn等。

本節以spark部署在standalone模式下為例,介紹spark的通信機制(其他模式類似)。

3.4.1 分布式通信方式

先介紹分布式通信的幾種基本方式。

1. rpc

遠端過程調用協定(remote procedure call protocol,rpc)是一種通過網絡從遠端計算機程式上請求服務,而不需要了解底層網絡技術的協定。rpc假定某些傳輸協定的存在,如tcp或udp,為通信程式之間攜帶資訊資料。在osi網絡通信模型中,rpc跨越了傳輸層和應用層。rpc使得開發分布式應用更加容易。rpc采用c/s架構。請求程式就是一個client,而服務提供程式就是一個server。首先,client調用程序發送一個有程序參數的調用資訊到service程序,然後等待應答資訊。在server端,程序保持睡眠狀态直到調用資訊到達為止。當一個調用資訊到達時,server獲得程序參數,計算結果,發送答複資訊,然後等待下一個調用資訊,最後,client調用程序接收答複資訊,獲得程序結果,然後調用執行繼續進行。

2.  rmi

遠端方法調用(remote method invocation,rmi)是java的一組擁護開發分布式應用程式的api。rmi使用java語言接口定義了遠端對象,它集合了java序列化和java遠端方法協定(java remote method protocol)。簡單地說,這樣使原先的程式在同一作業系統的方法調用,變成了不同作業系統之間程式的方法調用。由于j2ee是分布式程式平台,它以rmi機制實作程式元件在不同作業系統之間的通信。比如,一個ejb可以通過rmi調用web上另一台機器上的ejb遠端方法。rmi可以被看作是rpc的java版本,但是傳統rpc并不能很好地應用于分布式對象系統。java rmi 則支援存儲于不同位址空間的程式級對象之間彼此進行通信,實作遠端對象之間的無縫遠端調用。

3.  jms

java消息服務(java message service,jms)是一個與具體平台無關的api,用來通路消息收發。jms 使使用者能夠通過消息收發服務(有時稱為消息中介程式或路由器)從一個 jms 客戶機向另一個jms客戶機發送消息。消息是 jms 中的一種類型對象,由兩部分組成:報頭和消息主體。報頭由路由資訊以及有關該消息的中繼資料組成。消息主體則攜帶着應用程式的資料或有效負載。jms定義了5種消息正文格式,以及調用的消息類型,允許發送并接收以一些不同形式的資料,提供現有消息格式的一些級别的相容性。

 streammessage:java原始值的資料流。

 mapmessage:一套名稱–值對。

 textmessage:一個字元串對象。

 objectmessage:一個序列化的 java對象。

 bytesmessage:一個未解釋位元組的資料流。

4. ejb

javaee伺服器端元件模型(enterprise javabean,ejb)的設計目标是部署分布式應用程式。簡單來說就是把已經編寫好的程式打包放在伺服器上執行。ejb定義了一個用于開發基于元件的企業多重應用程式的标準。ejb的核心是會話bean(session bean)、實體bean(entity bean)和消息驅動bean(message driven bean)。

5. web service

web service是一個平台獨立的、低耦合的、自包含的、基于可程式設計的web應用程式。可以使用開放的xml(标準通用标記語言下的一個子集)标準來描述、釋出、發現、協調和配置這些應用程式,用于開發分布式的應用程式。web service技術能使得運作在不同機器上的不同應用無須借助第三方軟硬體, 就可互相交換資料或內建。web service減少了應用接口的花費。web service為整個企業甚至多個組織之間的業務流程的內建提供了一個通用機制。

3.4.2 通信架構akka

akka是一個用scala語言編寫的庫,用于簡化編寫容錯的、高可伸縮性的java和scala的actor模型應用。它分為開發庫和運作環境,可以用于建構高并發、分布式、可容錯、事件驅動的基于jvm的應用。akka使建構高并發的分布式應用變得更加容易。akka已經被成功運用在衆多行業的衆多大企業,從投資業到商業銀行、從零售業到社會媒體、仿真、遊戲和賭博、汽車和交通系統、資料分析等。任何需要高吞吐率和低延遲的系統都是使用akka的候選,是以spark選擇akka通信架構來支援子產品間的通信。

actor模型常見于并發程式設計,它由carl hewitt于20世紀70年代早期提出,目的是解決分布式程式設計中的一系列問題。其特點如下:

1) 系統中的所有事物都可以扮演一個actor。

2) actor之間完全獨立。

3) 在收到消息時actor采取的所有動作都是并行的。

4) actor有辨別和對目前行為的描述。

actor可以看作是一個個獨立的實體,它們之間是毫無關聯的。但是,它們可以通過消息來通信。當一個actor收到其他actor的資訊後,它可以根據需要做出各種響應。消息的類型和内容都可以是任意的。這點與web service類似,隻提供接口服務,不必了解内部實作。一個actor在處理多個actor的請求時,通常先建立一個消息隊列,每次收到消息後,就放入隊列。actor每次也可以從隊列中取出消息體來處理,而且這個過程是可循環的,這個特點讓actor可以時刻處理發送來的消息。

akka的優勢如下:

1) 易于建構并行與分布式應用(simple concurrency & distribution):akka采用異步通信與分布式架構,并對上層進行抽象,如actors、futures、stm等。

2) 可靠性(resilient by design):系統具備自愈能力,在本地/遠端都有監護。

3) 高性能(high performance):在單機中每秒可發送5000萬個消息。記憶體占用小,1gb記憶體中可儲存250萬個actors。

4) 彈性,無中心(elastic — decentralized):自适應的負責均衡、路由、分區、配置。

5) 可擴充性(extensible):可以使用akka擴充包進行擴充。

3.4.3 client、master 和 worker之間的通信

client、master與worker之間的互動代碼實作位于如下路徑:

(spark-root)/core/src/main/scala/org/apache/spark/deploy

主要涉及的類包括client.scala、master.scala和worker.scala。這三大子產品之間的通信架構如圖3-9所示:

圖3-9 client、master和worker之間的通信

以standalone部署模式為例,三大子產品分工如下:

1)client:送出作業給master。

2)master:接收client送出的作業,管理worker,并指令worker啟動driver和executor。

3)worker:負責管理本節點的資源,定期向master彙報心跳資訊,接收master的指令,如啟動driver和executor。

下面列出client、master與worker的實作代碼,讀者可以從中看到三個子產品間的通信互動。

1. client端通信

private class clientendpoint(

   override val rpcenv: rpcenv,

   driverargs: clientarguments,

   masterendpoints: seq[rpcendpointref],

   conf: sparkconf)

   extends threadsaferpcendpoint with logging {

   <限于篇幅,此處代碼省略……>

  override def onstart(): unit = {

  driverargs.cmd match {

    case "launch" =>

      val mainclass = "org.apache.spark.deploy.worker.driverwrapper"

      val classpathconf = "spark.driver.extraclasspath"

      val classpathentries = sys.props.get(classpathconf).toseq.flatmap { cp =>

        cp.split(java.io.file.pathseparator)

      val librarypathconf = "spark.driver.extralibrarypath"

      val librarypathentries = sys.props.get (librarypathconf).toseq.flatmap { cp =>

      val extrajavaoptsconf = "spark.driver.extrajavaoptions"

      val extrajavaopts = sys.props.get(extrajavaoptsconf)

        .map(utils.splitcommandstring).getorelse(seq.empty)

      val sparkjavaopts = utils.sparkjavaopts(conf)

      val javaopts = sparkjavaopts ++ extrajavaopts

      val command = new command(mainclass,

        seq("{{worker_url}}", "{{user_jar}}", driverargs.mainclass) ++ driverargs.driveroptions,

        sys.env, classpathentries, librarypathentries, javaopts)

      /* 建立driverdescription對象 */

      val driverdescription = new driverdescription(

        driverargs.jarurl,

        driverargs.memory,

        driverargs.cores,

        driverargs.supervise,

        command)

      /* 此處向master的actor送出driver*/

      ayncsendtomasterandforwardreply[submitdriverresponse](

        requestsubmitdriver(driverdescription))

    case "kill" =>

      val driverid = driverargs.driverid

      /* 接收停止driver是否成功的通知 */

      ayncsendtomasterandforwardreply[killdriverresponse](requestkill-driver(driverid))

 /* 向master發送消息,并異步地轉發傳回資訊給client */

  private def ayncsendtomasterandforwardreply[t: classtag](message: any): unit = {

    for (masterendpoint <- masterendpoints) {

      masterendpoint.ask[t](message).oncomplete {

        case success(v) => self.send(v)

        case failure(e) =>

          logwarning(s"error sending messages to master $masterendpoint", e)

      }(forwardmessageexecutioncontext)

2. master端通信

private[deploy] class master(

    override val rpcenv: rpcenv,

    address: rpcaddress,

    webuiport: int,

    val securitymgr: securitymanager,

    val conf: sparkconf)

  extends threadsaferpcendpoint with logging with leaderelectable {

  ……

  override def receive: partialfunction[any, unit] = {

     /* 選舉為master,當狀态為recoverystate.recovering時恢複 */

     case electedleader => {

       val (storedapps, storeddrivers, storedworkers) = persistenceengine.readpersisteddata(rpcenv)

       state = if (storedapps.isempty && storeddrivers.isempty && storedworkers.isempty) {

       recoverystate.alive

       } else {

       recoverystate.recovering

       }

       loginfo("i have been elected leader! new state: " + state)

       if (state == recoverystate.recovering) {

       beginrecovery(storedapps, storeddrivers, storedworkers)

       recoverycompletiontask = forwardmessagethread.schedule(new runnable {

         override def run(): unit = utils.trylognonfatalerror {

         self.send(completerecovery)

         }

       }, worker_timeout_ms, timeunit.milliseconds)

     /* 完成恢複 */

     case completerecovery => completerecovery()

     case revokedleadership => {

        logerror("leadership has been revoked -- master shutting down.")

        system.exit(0)

     }

     /* 注冊worker */

     case registerworker(

        id, workerhost, workerport, workerref, cores, memory, workeruiport, publicaddress) => {

        loginfo("registering worker %s:%d with %d cores, %s ram".format(

        workerhost, workerport, cores, utils.megabytestostring(memory)))

        /* 當狀态為recoverystate.standby時,不注冊 */

        if (state == recoverystate.standby) {

        // ignore, don't send response

        } else if (idtoworker.contains(id)) {

        /* 重複注冊,通知注冊失敗 */

          workerref.send(registerworkerfailed("duplicate worker id"))

          val worker = new workerinfo(id, workerhost, workerport, cores, memory,

          workerref, workeruiport, publicaddress)

          if (registerworker(worker)) {

             /* 注冊成功,通知worker節點 */

             persistenceengine.addworker(worker)

             workerref.send(registeredworker(self, masterwebuiurl))

             schedule()

          } else {

             val workeraddress = worker.endpoint.address

             logwarning("worker registration failed. attempted to re-register worker at same " +"address: " + workeraddress)

             /* 注冊失敗,通知worker節點 */

             workerref.send(registerworkerfailed("attempted to re-register worker at same address: "+ workeraddress))

      /* 通知executor的driver更新狀态 */

      case executorstatechanged(appid, execid, state, message, exitstatus) => {

      ……

 override def receiveandreply(context: rpccallcontext): partialfunction[any, unit] = {

  case requestsubmitdriver(description) => {

     /* 當master狀态不為alive的時候,通知client無法送出driver */

     if (state != recoverystate.alive) {

       val msg = s"${utils.backup_standalone_master_prefix}: $state. " +

         "can only accept driver submissions in alive state."

       context.reply(submitdriverresponse(self, false, none, msg))

     } else {

       loginfo("driver submitted " + description.command.mainclass)

       val driver = createdriver(description)

       persistenceengine.adddriver(driver)

       waitingdrivers += driver

       drivers.add(driver)

       schedule()

       /* 送出driver */

       context.reply(submitdriverresponse(self, true, some(driver.id), s"driver successfully submitted as ${driver.id}"))

   }  

   case requestkilldriver(driverid) => {

        val msg = s"${utils.backup_standalone_master_prefix}: $state. " + s"can only kill drivers in alive state."

        /* 當master不為alive時,通知無法終止driver */

        context.reply(killdriverresponse(self, driverid, success = false, msg))

        loginfo("asked to kill driver " + driverid)

        val driver = drivers.find(_.id == driverid)

        driver match {

        case some(d) =>

          if (waitingdrivers.contains(d)) {

            /* 當想kill的driver在等待隊列中時,删除driver并更新狀态為killed */

            waitingdrivers -= d

            self.send(driverstatechanged(driverid, driverstate.killed, none))

            /* 通知worker,driver被終止 */

            d.worker.foreach { w =>

              w.endpoint.send(killdriver(driverid))

          // todo: it would be nice for this to be a synchronous response

          val msg = s"kill request for $driverid submitted"

          loginfo(msg)

          /* 通知請求者,終止driver的請求已送出 */

          context.reply(killdriverresponse(self, driverid, success = true, msg))

      case none =>

        val msg = s"driver $driverid has already finished or does not exist"

        logwarning(msg)

        /* 通知請求者,driver已被終止或不存在 */

   }

 }

 ……

3. worker端通信邏輯

private[deploy] class worker(

   webuiport: int,

   cores: int,

   memory: int,

   masterrpcaddresses: array[rpcaddress],

   systemname: string,

   endpointname: string,

   workdirpath: string = null,

   val conf: sparkconf,

   val securitymgr: securitymanager)

 extends threadsaferpcendpoint with logging {

   ……

   override def receive: partialfunction[any, unit] = {

      /* 注冊worker */

      case registeredworker(masterref, masterwebuiurl) =>

          ……

      /* 向master發送心跳 */

      case sendheartbeat =>

          if (connected) { sendtomaster(heartbeat(workerid, self)) }

      /* 清理舊應用的工作目錄 */

      case workdircleanup =>

          // spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker

          // rpcendpoint.

          // copy ids so that it can be used in the cleanup thread.

          val appids = executors.values.map(_.appid).toset

          val cleanupfuture = concurrent.future {

      /* 新master選舉産生時,work更新master相關資訊,包括url等 */

      case masterchanged(masterref, masterwebuiurl) =>

          loginfo("master has changed, new master is at " + masterref.address.tosparkurl)

          changemaster(masterref, masterwebuiurl)

      /* worker向主節點注冊失敗 */

      case registerworkerfailed(message) =>

         if (!registered) {

            logerror("worker registration failed: " + message)

            system.exit(1)

      /* worker重新連接配接向master注冊 */

      case reconnectworker(masterurl) =>

          loginfo(s"master with url $masterurl requested this worker to reconnect.")

          registerwithmaster()

      /* 啟動executor */

      case launchexecutor(masterurl, appid, execid, appdesc, cores_, memory_) =>

          /* 啟動executorrunner */

          val manager = new executorrunner(

      /* executor狀态改變 */

      case executorstatechanged @ executorstatechanged(appid, execid, state, message, exitstatus) =>

          /* 通知master executor狀态改變 */

          handleexecutorstatechanged(executorstatechanged)

      /* 終止目前節點上運作的executor */

      case killexecutor(masterurl, appid, execid) =>

          if (masterurl != activemasterurl) {

             logwarning("invalid master (" + masterurl + ") attempted to launch executor " + execid)

             val fullid = appid + "/" + execid

             executors.get(fullid) match {

                case some(executor) =>

                   loginfo("asked to kill executor " + fullid)

                   executor.kill()

                case none =>

                   loginfo("asked to kill unknown executor " + fullid)

      /* 啟動driver */

      case launchdriver(driverid, driverdesc) => {

         loginfo(s"asked to launch driver $driverid")

         /* 建立driverrunner */

         val driver = new driverrunner(...)

         drivers(driverid) = driver

         /* 啟動driver */

         driver.start()

         ……

       /* 終止worker節點上運作的driver */

       case killdriver(driverid) => {

         loginfo(s"asked to kill driver $driverid")

         drivers.get(driverid) match {

             case some(runner) =>

                runner.kill()

             case none =>

                logerror(s"asked to kill unknown driver $driverid")

       /* driver狀态更新 */             

       case driverstatechanged @ driverstatechanged(driverid, state, exception) => {

          handledriverstatechanged(driverstatechanged)

       ……

3.5 容錯機制及依賴

一般而言,對于分布式系統,資料集的容錯性通常有兩種方式:

1) 資料檢查點(在spark中對應checkpoint機制)。

2) 記錄資料的更新(在spark中對應lineage血統機制)。

對于大資料分析而言,資料檢查點操作成本較高,需要通過資料中心的網絡連接配接在機器之間複制龐大的資料集,而網絡帶寬往往比記憶體帶寬低,同時會消耗大量存儲資源。

spark選擇記錄更新的方式。但更新粒度過細時,記錄更新成本也不低。是以,rdd隻支援粗粒度轉換,即隻記錄單個塊上執行的單個操作,然後将建立rdd的一系列變換序列記錄下來,以便恢複丢失的分區。

3.5.1 lineage(血統)機制

每個rdd除了包含分區資訊外,還包含它從父輩rdd變換過來的步驟,以及如何重建某一塊資料的資訊,是以rdd的這種容錯機制又稱“血統”(lineage)容錯。lineage本質上很類似于資料庫中的重做日志(redo log),隻不過這個重做日志粒度很大,是對全局資料做同樣的重做以便恢複資料。

相比其他系統的細顆粒度的記憶體資料更新級别的備份或者log機制,rdd的lineage記錄的是粗顆粒度的特定資料transformation操作(如filter、map、join等)。當這個rdd的部分分區資料丢失時,它可以通過lineage擷取足夠的資訊來重新計算和恢複丢失的資料分區。但這種資料模型粒度較粗,是以限制了spark的應用場景。是以可以說spark并不适用于所有高性能要求的場景,但同時相比細顆粒度的資料模型,也帶來了性能方面的提升。

rdd在lineage容錯方面采用如下兩種依賴來保證容錯方面的性能:

窄依賴(narrow dependeny):窄依賴是指父rdd的每一個分區最多被一個子rdd的分區所用,表現為一個父rdd的分區對應于一個子rdd的分區,或多個父rdd的分區對應于一個子rdd的分區。也就是說一個父rdd的一個分區不可能對應一個子rdd的多個分區。其中,1個父rdd分區對應1個子rdd分區,可以分為如下兩種情況:

子rdd分區與父rdd分區一一對應(如map、filter等算子)。

一個子rdd分區對應n個父rdd分區(如co-paritioned(協同劃分)過的join)。

 寬依賴(wide dependency,源碼中稱為shuffle dependency):

寬依賴是指一個父rdd分區對應多個子rdd分區,可以分為如下兩種情況:

一個父rdd對應所有子rdd分區(未經協同劃分的join)。

一個父rdd對應多個rdd分區(非全部分區)(如groupbykey)。

窄依賴與寬依賴關系如圖3-10所示。

從圖3-10可以看出對依賴類型的劃分:根據父rdd分區是對應一個還是多個子rdd分區來區分窄依賴(父分區對應一個子分區)和寬依賴(父分區對應多個子分區)。如果對應多個,則當容錯重算分區時,對于需要重新計算的子分區而言,隻需要父分區的一部分資料,是以其餘資料的重算就導緻了備援計算。

圖3-10 兩種依賴關系

對于寬依賴,stage計算的輸入和輸出在不同的節點上,對于輸入節點完好,而輸出節點當機的情況,在通過重新計算恢複資料的情況下,這種方法容錯是有效的,否則無效,因為無法重試,需要向上追溯其祖先看是否可以重試(這就是lineage,血統的意思),窄依賴對于資料的重算開銷要遠小于寬依賴的資料重算開銷。

窄依賴和寬依賴的概念主要用在兩個地方:一個是容錯中相當于redo日志的功能;另一個是在排程中建構dag作為不同stage的劃分點(前面排程機制中已講過)。

依賴關系在lineage容錯中的應用總結如下:

1)窄依賴可以在某個計算節點上直接通過計算父rdd的某塊資料計算得到子rdd對應的某塊資料;寬依賴則要等到父rdd所有資料都計算完成,并且父rdd的計算結果進行hash并傳到對應節點上之後,才能計算子rdd。

2)資料丢失時,對于窄依賴,隻需要重新計算丢失的那一塊資料來恢複;對于寬依賴,則要将祖先rdd中的所有資料塊全部重新計算來恢複。是以在長“血統”鍊特别是有寬依賴時,需要在适當的時機設定資料檢查點(checkpoint機制在下節講述)。可見spark在容錯性方面要求對于不同依賴關系要采取不同的任務排程機制和容錯恢複機制。

在spark容錯機制中,如果一個節點當機了,而且運算屬于窄依賴,則隻要重算丢失的父rdd分區即可,不依賴于其他節點。而寬依賴需要父rdd的所有分區都存在,重算就很昂貴了。更深入地來說:在窄依賴關系中,當子rdd的分區丢失,重算其父rdd分區時,父rdd相應分區的所有資料都是子rdd分區的資料,是以不存在備援計算。而在寬依賴情況下,丢失一個子rdd分區重算的每個父rdd的每個分區的所有資料并不是都給丢失的子rdd分區使用,其中有一部分資料對應的是其他不需要重新計算的子rdd分區中的資料,是以在寬依賴關系下,這樣計算就會産生備援開銷,這也是寬依賴開銷更大的原因。為了減少這種備援開銷,通常在lineage血統鍊比較長,并且含有寬依賴關系的容錯中使用checkpoint機制設定檢查點。

3.5.2 checkpoint(檢查點)機制

通過上述分析可以看出checkpoint的本質是将rdd寫入disk來作為檢查點。這種做法是為了通過lineage血統做容錯的輔助,lineage過長會造成容錯成本過高,這樣就不如在中間階段做檢查點容錯,如果之後有節點出現問題而丢失分區,從做檢查點的rdd開始重做lineage,就會減少開銷。

下面從代碼層面介紹checkpoint的實作。

1. 設定檢查點資料的存取路徑[sparkcontext.scala]

/* 設定作為rdd檢查點的目錄,如果是叢集上運作,則必須為hdfs路徑 */

def setcheckpointdir(directory: string) {

    // if we are running on a cluster, log a warning if the directory is local.

    // otherwise, the driver may attempt to reconstruct the checkpointed rdd from

    // its own local file system, which is incorrect because the checkpoint files

    // are actually on the executor machines.

    if (!islocal && utils.nonlocalpaths(directory).isempty) {

       logwarning("checkpoint directory must be non-local " +

       "if spark is running on a cluster: " + directory)

    checkpointdir = option(directory).map { dir =>

        val path = new path(dir, uuid.randomuuid().tostring)

        val fs = path.getfilesystem(hadoopconfiguration)

        fs.mkdirs(path)

        fs.getfilestatus(path).getpath.tostring

2. 設定檢查點的具體實作

[rdd.scala]

/* 設定檢查點入口 */

private[spark] def docheckpoint(): unit = {

    rddoperationscope.withscope(sc, "checkpoint", allownesting = false, ignoreparent = true) {

      if (!docheckpointcalled) {

          docheckpointcalled = true

      if (checkpointdata.isdefined) {

          checkpointdata.get.checkpoint()

      } else {

          /*  */              

          dependencies.foreach(_.rdd.docheckpoint())

[rddcheckpointdata.scala]

/* 設定檢查點,在子類中會覆寫此函數以實作具體功能 */

protected def docheckpoint(): checkpointrdd[t]

[reliablerddcheckpointdata.scala]

/* 設定檢查點,将rdd内容寫入可靠的分布式檔案系統中 */

protected override def docheckpoint(): checkpointrdd[t] = {

    /* 為檢查點建立輸出目錄 */

    val path = new path(cpdir)

    val fs = path.getfilesystem(rdd.context.hadoopconfiguration)

    if (!fs.mkdirs(path)) {

        throw new sparkexception(s"failed to create checkpoint path $cpdir")

    /* 儲存為檔案,加載時作為一個rdd加載 */

    val broadcastedconf = rdd.context.broadcast(

       new serializableconfiguration(rdd.context.hadoopconfiguration))

    /* 重新計算rdd */

    rdd.context.runjob(rdd, reliablecheckpointrdd.writecheckpointfile[t](cpdir, broadcastedconf) _)

    val newrdd = new reliablecheckpointrdd[t](rdd.context, cpdir)

    if (newrdd.partitions.length != rdd.partitions.length) {

    throw new sparkexception(

        s"checkpoint rdd $newrdd(${newrdd.partitions.length}) has different " +

        s"number of partitions from original rdd $rdd(${rdd.partitions.length})")

    /* 當引用不在此範圍時,清除檢查點檔案 */

    if (rdd.conf.getboolean("spark.cleaner.referencetracking.cleancheckpoints", false)) {

         rdd.context.cleaner.foreach { cleaner =>

         cleaner.registerrddcheckpointdataforcleanup(newrdd, rdd.id)

   loginfo(s"done checkpointing rdd ${rdd.id} to $cpdir, new parent is rdd ${newrdd.id}")

   newrdd

3.6 shuffle機制

在mapreduce架構中,shuffle是連接配接map和reduce之間的橋梁,map的輸出要用到reduce中必須經過shuffle這個環節,shuffle的性能高低直接影響了整個程式的性能和吞吐量。spark作為mapreduce架構的一種實作,自然也實作了shuffle的邏輯。對于大資料計算架構而言,shuffle階段的效率是決定性能好壞的關鍵因素之一。

3.6.1 什麼是shuffle

shuffle是mapreduce架構中的一個特定的階段,介于map階段和reduce階段之間,當map的輸出結果要被reduce使用時,輸出結果需要按關鍵字值(key)哈希,并且分發到每一個reducer上,這個過程就是shuffle。直覺來講,spark shuffle機制是将一組無規則的資料轉換為一組具有一定規則資料的過程。由于shuffle涉及了磁盤的讀寫和網絡的傳輸,是以shuffle性能的高低直接影響整個程式的運作效率。

在mapreduce計算架構中,shuffle連接配接了map階段和reduce階段,即每個reduce task從每個map task産生的資料中讀取一片資料,極限情況下可能觸發m*r個資料拷貝通道(m是map task數目,r是reduce task數目)。通常shuffle分為兩部分:map階段的資料準備和reduce階段的資料拷貝。首先,map階段需根據reduce階段的task數量決定每個map task輸出的資料分片數目,有多種方式存放這些資料分片:

1) 儲存在記憶體中或者磁盤上(spark和mapreduce都存放在磁盤上)。

2) 每個分片對應一個檔案(現在spark采用的方式,以及以前mapreduce采用的方式),或者所有分片放到一個資料檔案中,外加一個索引檔案記錄每個分片在資料檔案中的偏移量(現在mapreduce采用的方式)。

是以可以認為spark shuffle與mapreduce shuffle的設計思想相同,但在實作細節和優化方式上不同。

在spark中,任務通常分為兩種,shuffle maptask和reducetask,具體邏輯如圖3-11所示:

圖3-11 spark shuffle

圖3-11中的主要邏輯如下:

1)首先每一個maptask會根據reducetask的數量建立出相應的bucket,bucket的數量是m×r,其中m是map的個數,r是reduce的個數。

2)其次maptask産生的結果會根據設定的partition算法填充到每個bucket中。這裡的partition算法是可以自定義的,當然預設的算法是根據key哈希到不同的bucket中。

當reducetask啟動時,它會根據自己task的id和所依賴的mapper的id從遠端或本地的block manager中取得相應的bucket作為reducer的輸入進行處理。

這裡的bucket是一個抽象概念,在實作中每個bucket可以對應一個檔案,可以對應檔案的一部分或是其他等。spark shuffle可以分為兩部分:

1) 将資料分成bucket,并将其寫入磁盤的過程稱為shuffle write。

2) 在存儲shuffle資料的節點fetch資料,并執行使用者定義的聚集操作,這個過程稱為shuffle fetch。

3.6.2 shuffle曆史及細節

下面介紹shuffle write與fetch。

1. shuffle write

在spark的早期版本實作中,spark在每一個maptask中為每個reducetask建立一個bucket,并将rdd計算結果放進bucket中。

但早期的shuffle write有兩個比較大的問題。

1)map的輸出必須先全部存儲到記憶體中,然後寫入磁盤。這對記憶體是非常大的開銷,當記憶體不足以存儲所有的map輸出時就會出現oom(out of memory)。

2)每個maptask會産生與reducetask數量一緻的shuffle檔案,如果maptask個數是1k,reducetask個數也是1k,就會産生1m個shuffle檔案。這對于檔案系統是比較大的壓力,同時在shuffle資料量不大而shuffle檔案又非常多的情況下,随機寫也會嚴重降低io的性能。

後來到了spark 0.8版實作時,顯著減少了shuffle的記憶體壓力,現在map輸出不需要先全部存儲在記憶體中,再flush到硬碟,而是record-by-record寫入磁盤中。對于shuffle檔案的管理也獨立出新的shuffleblockmanager進行管理,而不是與rdd cache檔案在一起了。

但是spark 0.8版的shuffle write仍然有兩個大的問題沒有解決。

1)shuffle檔案過多的問題。這會導緻檔案系統的壓力過大并降低io的吞吐量。

2)雖然map輸出資料不再需要預先存儲在記憶體中然後寫入磁盤,進而顯著減少了記憶體壓力。但是新引入的diskobjectwriter所帶來的buffer開銷也是不容小視的記憶體開銷。假定有1k個maptask和1k個reducetask,就會有1m個bucket,相應地就會有1m個write handler,而每一個write handler預設需要100kb記憶體,那麼總共需要100gb記憶體。這樣僅僅是buffer就需要這麼多的記憶體。是以當reducetask數量很多時,記憶體開銷會很大。

為了解決shuffle檔案過多的情況,spark後來引入了新的shuffle consolidation,以期顯著減少shuffle檔案的數量。

shuffle consolidation的原理如圖3-12所示:

在圖3-12中,假定該job有4個mapper和4個reducer,有2個core能并行運作兩個task。可以算出spark的shuffle write共需要16個bucket,也就有了16個write handler。在之前的spark版本中,每個bucket對應一個檔案,是以在這裡會産生16個shuffle檔案。

圖3-12 shuffle consolidation

而在shuffle consolidation中,每個bucket并非對應一個檔案,而是對應檔案中的一個segment。同時shuffle consolidation産生的shuffle檔案數量與spark core的個數也有關系。在圖3-12中,job中的4個mapper分為兩批運作,在第一批2個mapper運作時會申請8個bucket,産生8個shuffle檔案;而在第二批mapper運作時,申請的8個bucket并不會再産生8個新的檔案,而是追加寫到之前的8個檔案後面,這樣一共就隻有8個shuffle檔案,而在檔案内部共有16個不同的segment。是以從理論上講shuffle consolidation産生的shuffle檔案數量為c×r,其中c是spark叢集的core number,r是reducer的個數。

很顯然,當m=c時,shuffle consolidation産生的檔案數和之前的實作相同。

shuffle consolidation顯著減少了shuffle檔案的數量,解決了spark之前實作中一個比較嚴重的問題。但是writer handler的buffer開銷過大依然沒有減少,若要減少writer handler的buffer開銷,隻能減少reducer的數量,但是這又會引入新的問題。

2. shuffle fetch與aggregator

shuffle write寫出去的資料要被reducer使用,就需要shuffle fetch将所需的資料fetch過來。這裡的fetch操作包括本地和遠端,因為shuffle資料有可能一部分是存儲在本地的。在早期版本中,spark對shuffle fetcher實作了兩套不同的架構:nio通過socket連接配接fetch資料;oio通過netty server去fetch資料。分别對應的類是basic-blockfetcheriterator和nettyblockfetcheriterator。

目前在spark1.5.0中做了優化。新版本定義了類shuffleblockfetcheriterator來完成資料的fetch。對于local的資料,shuffleblockfetcheriterator會通過local的blockman-ager來fetch。對于遠端的資料塊,它通過blocktransferservice類來完成。具體實作參見如下代碼:

[shuffleblockfetcheriterator.scala]

/* fetch local資料塊 */

private[this] def fetchlocalblocks() {

    val iter = localblocks.iterator

    while (iter.hasnext) {

    val blockid = iter.next()

    try {

       /* 通過blockmanager來fetch資料 */

       val buf = blockmanager.getblockdata(blockid)

       shufflemetrics.inclocalblocksfetched(1)

       shufflemetrics.inclocalbytesread(buf.size)

       buf.retain()

       results.put(new successfetchresult(blockid, blockmanager.blockmanagerid, 0, buf))

    } catch {

       case e: exception =>

         // if we see an exception, stop immediately.

         logerror(s"error occurred while fetching local blocks", e)

         results.put(new failurefetchresult(blockid, blockmanager.blockmanagerid, e))

         return

/* 發送請求擷取遠端資料 */

private[this] def sendrequest(req: fetchrequest) {

    /* 請求格式 */

    logdebug("sending request for %d blocks (%s) from %s".format(

    req.blocks.size, utils.bytestostring(req.size), req.address.hostport))

    bytesinflight += req.size

    // so we can look up the size of each blockid

    val sizemap = req.blocks.map { case (blockid, size) => (blockid.tostring, size) }.tomap

    val blockids = req.blocks.map(_._1.tostring)

    val address = req.address

    /* fetch資料 */

    shuffleclient.fetchblocks(address.host, address.port, address.executorid, blockids.toarray,

    new blockfetchinglistener {

       override def onblockfetchsuccess(blockid: string, buf: managedbuffer): unit = {

         // only add the buffer to results queue if the iterator is not zombie,

         // i.e. cleanup() has not been called yet.

         if (!iszombie) {

           // increment the ref count because we need to pass this to a different thread.

           // this needs to be released after use.

           buf.retain()

           /* fetch請求成功 */

           results.put(new successfetchresult(blockid(blockid), address, sizemap(blockid), buf))

           shufflemetrics.incremotebytesread(buf.size)

           shufflemetrics.incremoteblocksfetched(1)

    override def onblockfetchfailure(blockid: string, e: throwable):

    /* fetch 失敗*/

    ……

在mapreduce的shuffle過程中,shuffle fetch過來的資料會進行歸并排序(merge sort),使得相同key下的不同value按序歸并到一起供reducer使用,這個過程如圖3-13所示:

這些歸并排序都是在磁盤上進行的,這樣做雖然有效地控制了記憶體使用,但磁盤io卻大幅增加了。雖然spark屬于mapreduce體系,但是對傳統的mapreduce算法進行了一定的改變。spark假定在大多數應用場景下,shuffle資料的排序不是必須的,如word count。強制進行排序隻會使性能變差,是以spark并不在reducer端做歸并排序。既然沒有歸并排序,那spark是如何進行reduce的呢?這就涉及下面要講的shuffle aggregator了。

圖3-13 fetch merge

aggregator本質上是一個hashmap,它是以map output的key為key,以任意所要combine的類型為value的hashmap。

在做word count reduce計算count值時,它會将shuffle fetch到的每一個key-value對更新或是插入hashmap中(若在hashmap中沒有查找到,則插入其中;若查找到,則更新value值)。這樣就不需要預先把所有的key-value進行merge sort,而是來一個處理一個,省去了外部排序這一步驟。但同時需要注意的是,reducer的記憶體必須足以存放這個partition的所有key和count值,是以對記憶體有一定的要求。

在上面word count的例子中,因為value會不斷地更新,而不需要将其全部記錄在記憶體中,是以記憶體的使用還是比較少的。考慮一下如果是groupbykey這樣的操作,reducer需要得到key對應的所有value。在hadoop mapreduce中,由于有了歸并排序,是以給予reducer的資料已經是group by key了,而spark沒有這一步,是以需要将key和對應的value全部存放在hashmap中,并将value合并成一個array。可以想象為了能夠存放所有資料,使用者必須確定每一個partition小到記憶體能夠容納,這對于記憶體是非常嚴峻的考驗。是以在spark文檔中,建議使用者涉及這類操作時盡量增加partition,也就是增加mapper和reducer的數量。

增加mapper和reducer的數量固然可以減小partition的大小,使記憶體可以容納這個partition。但是在shuffle write中提到,bucket和對應于bucket的write handler是由mapper和reducer的數量決定的,task越多,bucket就會增加得更多,由此帶來write handler所需的buffer也會更多。在一方面我們為了減少記憶體的使用采取了增加task數量的政策,另一方面task數量增多又會帶來buffer開銷更大的問題,是以陷入了記憶體使用的兩難境地。

為了減少記憶體的使用,隻能将aggregator的操作從記憶體移到磁盤上進行,是以spark新版本中提供了外部排序的實作,以解決這個問題。

spark将需要聚集的資料分為兩類:不需要歸并排序和需要歸并排序的資料。對于前者,在記憶體中的appendonlymap中對資料聚集。對于需要歸并排序的資料,現在記憶體中進行聚集,當記憶體資料達到門檻值時,将資料排序後寫入磁盤。事實上,磁盤上的資料隻是全部資料的一部分,最後将磁盤資料全部進行歸并排序和聚集。具體aggregator的邏輯可以參見aggregator類的實作。

@developerapi

case class aggregator[k, v, c] (

   createcombiner: v => c,

   mergevalue: (c, v) => c,

   mergecombiners: (c, c) => c) {

  // 是否外部排序

  private val isspillenabled = sparkenv.get.conf.getboolean("spark.shuffle.spill", true)

  @deprecated("use combinevaluesbykey with taskcontext argument", "0.9.0")

  def combinevaluesbykey(iter: iterator[_ <: product2[k, v]]): iterator[(k, c)] =

  combinevaluesbykey(iter, null)

  def combinevaluesbykey(iter: iterator[_ <: product2[k, v]],

                     context: taskcontext): iterator[(k, c)] = {

  if (!isspillenabled) {

     /* 建立appendonlymap對象存儲了combine集合,每個combine是一個key及對應key的元素seq */

     val combiners = new appendonlymap[k, c]

     var kv: product2[k, v] = null

     val update = (hadvalue: boolean, oldvalue: c) => {

     /* 檢查是否處理的是第一個元素,如果是則先建立集合結構,如果不是則直接插入 */

     if (hadvalue) mergevalue(oldvalue, kv._2) else createcombiner(kv._2)

  while (iter.hasnext) {

     kv = iter.next()

     /* 當不采用外排時,利用appendonlymap結構存儲資料 */

     combiners.changevalue(kv._1, update)

  combiners.iterator

  } else {

     val combiners = new externalappendonlymap[k, v, c](createcombiner, mergevalue, mergecombiners)

     /* 如果采用外排時,使用externalappendonlymap結構存儲聚集資料 */

     combiners.insertall(iter)

     updatemetrics(context, combiners)

     combiners.iterator

……

本節就shuffle的概念與原理先介紹到這裡。在下一章講解spark源碼時,會對shuffle的核心機制——shuffle存儲做代碼層面的講解。相信學習完本章和第4章的shuffle存儲機制後,讀者會對shuffle機制掌握得更加深入。

3.7 本章小結

本章主要講述了spark的工作機制與原理。首先剖析了spark的送出和執行時的具體機制,重點強調了spark程式的宏觀執行過程: 送出後的job在spark中形成了rdd dag(有向無環圖),然後進入一系列切分排程的過程。在剖析過程中,結合spark的源碼呈現了這些排程過程的代碼細節。本章後半部分接着剖析了spark的存儲及io、spark通信機制,最後講述了spark的容錯機制及shuffle機制。 本章内容比較多,希望讀者仔細體會。

繼續閱讀