天天看点

《循序渐进学Spark 》Spark 编程模型

本节书摘来自华章出版社《循序渐进学spark 》一书中的第1章,第3节,作者 小象学院 杨 磊,更多章节内容可以访问云栖社区“华章计算机”公众号查看。

spark机制原理

本书前面几章分别介绍了spark的生态系统、spark运行模式及spark的核心概念rdd和基本算子操作等重要基础知识。本章重点讲解spark的主要机制原理,因为这是spark程序得以高效执行的核心。本章先从application、job、stage和task等层次阐述spark的调度逻辑,并且介绍fifo、fair等经典算法,然后对spark的重要组成模块:i/o与通信控制模块、容错模块及shuffle模块做了深入的阐述。其中,在spark i/o模块中,数据以数据块的形式管理,存储在内存、磁盘或者spark集群中的其他机器上。spark集群通信机制采用了akka通信框架,在集群机器中传递命令和状态信息。另外,容错是分布式系统的一个重要特性,spark采用了lineage与checkpoint机制来保证容错性。spark shuffle模块借鉴了mapreduce的shuffle机制,但在其基础上进行了改进与创新。

3.1 spark应用执行机制分析

下面对spark application的基本概念和执行机制进行深入介绍。

3.1.1 spark应用的基本概念

spark应用(application)是用户提交的应用程序。spark运行模式分为:local、standalone、yarn、mesos等。根据spark application的driver program是否在集群中运行,spark应用的运行方式又可以分为cluster模式和client模式。

下面介绍spark应用涉及的一些基本概念:

1) sparkcontext:spark 应用程序的入口,负责调度各个运算资源,协调各个worker node 上的executor。

2) driver program:运行application的main()函数并创建sparkcontext。

3) rdd:前面已经讲过,rdd是spark的核心数据结构,可以通过一系列算子进行操作。当rdd遇到action算子时,将之前的所有算子形成一个有向无环图(dag)。再在spark中转化为job(job的概念在后面讲述),提交到集群执行。一个app中可以包含多个job。

4) worker node:集群中任何可以运行application代码的节点,运行一个或多个executor进程。

5) executor:为application运行在worker node上的一个进程,该进程负责运行task,并且负责将数据存在内存或者磁盘上。每个application都会申请各自的executor来处理任务。

下面介绍spark 应用(application)执行过程中各个组件的概念:

1) task(任务):rdd中的一个分区对应一个task,task是单个分区上最小的处理流程单元。

2) taskset(任务集): 一组关联的,但相互之间没有shuffle依赖关系的task集合。

3) stage(调度阶段):一个taskset对应的调度阶段。每个job会根据rdd的宽依赖关系被切分很多stage,每个stage都包含一个taskset。

4) job(作业): 由action算子触发生成的由一个或多个stage组成的计算作业。

5) application:用户编写的spark的应用程序,由一个或多个job组成。提交到spark之后,spark为application分配资源,将程序转换并执行。

6) dagscheduler:根据job构建基于stage的dag,并提交stage给taskscheduler。

7) taskscheduler:将taskset提交给worker node集群运行并返回结果。

以上基本概念之间的关系如图3-1所示。

3.1.2 spark应用执行机制概要

spark application从提交后到在worker node执行,期间经历了一系列变换,具体过程如图3-2所示。

图3-1 spark基本概念之间的关系

图3-2 spark 执行流程

如图3-2所示,前面讲过,当rdd遇见action算子之后,触发job提交。提交后的job在spark中形成了rdd dag有向无环图(directed acyclic graph)。rdd dag经过dag scheduler调度之后,根据rdd依赖关系被切分为一系列的stage。每个stage包含一组task集合,再经过task scheduler之后,task被分配到worker节点上的executor线程池执行。如前文所述,rdd中的每一个逻辑分区对应一个物理的数据块,同时每个分区对应一个task,因此task也有自己对应的物理数据块,使用用户定义的函数来处理。spark出于节约内存的考虑,采用了延迟执行的策略,如前文所述,只有action算子才可以触发整个操作序列的执行。另外,spark对于中间计算结果也不会重新分配内存,而是在同一个数据块上流水线操作。

spark使用blockmanager管理数据块,在内存或者磁盘进行存储,如果数据不在本节点,则还可以通过远端节点复制到本机进行计算。在计算时,spark会在具体执行计算的worker节点的executor中创建线程池,executor将需要执行的任务通过线程池来并发执行。

3.1.3 应用提交与执行

spark使用driver进程负责应用的解析、切分stage并调度task到executor执行,包含dagscheduler等重要对象。driver进程的运行地点有如下两种:

1) driver进程运行在client端,对应用进行管理监控。

2) master节点指定某个worker节点启动driver进程,负责监控整个应用的执行。

针对这两种情况,应用提交及执行过程分别如下:

1. driver运行在client

用户启动client端,在client端启动driver进程。在driver中启动或实例化dags-

cheduler等组件。

1)driver向master注册。

2)worker向master注册,master通过指令让worker启动executor。

3)worker通过创建executorrunner线程,进而executorrunner线程启动executor-backend进程。

4)executorbackend启动后,向client端driver进程内的schedulerbackend注册,因此driver进程就可以发现计算资源。

5)driver的dagscheduler解析应用中的rdd dag并生成相应的stage,每个stage包含的taskset通过taskscheduler分配给executor。在executor内部启动线程池并行化执行task。

2. driver运行在worker节点

用户启动客户端,客户端提交应用程序给master。

1)master调度应用,指定一个worker节点启动driver,即scheduler-backend。

2)worker接收到master命令后创建driverrunner线程,在driverrunner线程内创建schedulerbackend进程。driver充当整个作业的主控进程。

3)master指定其他worker节点启动exeuctor,此处流程和上面相似,worker创建executorrunner线程,启动executorbackend进程。

4)executorbackend启动后,向driver的schedulerbackend注册,这样driver获取了计算资源就可以调度和将任务分发到计算节点执行。

schedulerbackend进程中包含dagscheduler,它会根据rdd的dag切分stage,生成taskset,并调度和分发task到executor。对于每个stage的taskset,都会被存放到taskscheduler中。taskscheduler将任务分发到executor,执行多线程并行任务。

图3-3为spark应用的提交与执行示意图。

图3-3 spark应用的提交与执行

3.2 spark调度机制

spark调度机制是保证spark应用高效执行的关键。本节从application、job、stage和task的维度,从上层到底层来一步一步揭示spark的调度策略。

3.2.1 application的调度

spark中,每个application对应一个sparkcontext。sparkcontext之间的调度关系取决于spark的运行模式。对standalone模式而言,spark master节点先计算集群内的计算资源能否满足等待队列中的应用对内存和cpu资源的需求,如果可以,则master创建spark driver,启动应用的执行。宏观上来讲,这种对应用的调度类似于fifo策略。在mesos和yarn模式下,底层的资源调度系统的调度策略都是由mesos和yarn决定的。具体分类描述如下:

1. standalone模式

默认以用户提交application的顺序来调度,即fifo策略。每个应用执行时独占所有资源。如果有多个用户要共享集群资源,则可以使用参数spark.cores.max来配置应用在集群中可以使用的最大cpu核数。如果不配置,则采用默认参数spark.deploy.defaultcore的值来确定。

2. mesos模式

如果在mesos上运行spark,用户想要静态配置资源的话,可以设置spark.mesos.coarse为true,这样mesos变为粗粒度调度模式,然后可以设置spark.cores.max指定集群中可以使用的最大核数,与上面的standalone模式类似。同时,在mesos模式下,用户还可以设置参数spark.executor.memory来配置每个executor的内存使用量。如果想使mesos在细粒度模式下运行,可以通过mesos://<url-info>设置动态共享cpu core的执行模式。在这种模式下,应用不执行时的空闲cpu资源得以被其他用户使用,提升了cpu使用率。

3. yarn模式

如果在yarn上运行spark,用户可以在yarn的客户端上设置--num-executors 来控制为应用分配的executor数量,然后设置--executor-memory指定每个executor的内存大小,设置--executor-cores指定executor占用的cpu核数。

3.2.2 job的调度

前面章节提到过,spark应用程序实际上是一系列对rdd的操作,这些操作直至遇见action算子,才触发job的提交。事实上,在底层实现中,action算子最后调用了runjob函数提交job给spark。其他的操作只是生成对应的rdd关系链。如在rdd.scala程序文件中,count函数源码所示。

def count(): long = sc.runjob(this, utils.getiteratorsize _).sum

其中sc为sparkcontext的对象。可见在spark中,对job的提交都是在action算子中隐式完成的,并不需要用户显式地提交作业。在sparkcontext中job提交的实现中,最后会调用dagscheduler中的job提交接口。dagscheduler最重要的任务之一就是计算job与task的依赖关系,制定调度逻辑。

job调度的基本工作流程如图3-4所示,每个job从提交到完成,都要经历一系列步骤,拆分成以tsk为最小单位,按照一定逻辑依赖关系的执行序列。

图3-4 job的调度流程

图3-5则从job调度流程中的细节模块出发,揭示了工作流程与对应模块之间的关系。从整体上描述了各个类在job调度流程中的交互关系。

图3-5 job调度流程细节

在spark1.5.0的调度目录下的schedulingalgorithm.scala文件中,描述了spark对job的调度模式。

1. fifo模式

默认情况下,spark对job以fifo(先进先出)的模式进行调度。在schedulingalgorithm.scala文件中声明了fifo算法实现。

private[spark] class fifoschedulingalgorithm extends schedulingalgorithm {

  override def comparator(s1: schedulable, s2: schedulable): boolean = {

    //定义优先级

    val priority1 = s1.priority

    val priority2 = s2.priority

    var res = math.signum(priority1 - priority2)

    if (res == 0) {

      val stageid1 = s1.stageid

      val stageid2 = s2.stageid

      //signum是符号函数,返回0(参数等于0)、1(参数大于0)或-1(参数小于0)。

      res = math.signum(stageid1 - stageid2)

    }

    if (res < 0) {

      true

    } else {

      false

  }

}

2. fair模式

spark在fair的模式下,采用轮询的方式为多个job分配资源,调度job。所有的任务优先级大致相同,共享集群计算资源。具体实现代码在schedulingalgorithm.scala文件中,声明如下:

private[spark] class fairschedulingalgorithm extends schedulingalgorithm {

    val minshare1 = s1.minshare

    val minshare2 = s2.minshare

    val runningtasks1 = s1.runningtasks

    val runningtasks2 = s2.runningtasks

    val s1needy = runningtasks1 < minshare1

    val s2needy = runningtasks2 < minshare2

    val minshareratio1 = runningtasks1.todouble / math.max(minshare1, 1.0).todouble

    val minshareratio2 = runningtasks2.todouble / math.max(minshare2, 1.0).todouble

    val tasktoweightratio1 = runningtasks1.todouble / s1.weight.todouble

    val tasktoweightratio2 = runningtasks2.todouble / s2.weight.todouble

    var compare: int = 0

    if (s1needy && !s2needy) {

      return true

    } else if (!s1needy && s2needy) {

      return false

    } else if (s1needy && s2needy) {

      compare = minshareratio1.compareto(minshareratio2)

      compare = tasktoweightratio1.compareto(tasktoweightratio2)

    if (compare < 0) {

    } else if (compare > 0) {

      s1.name < s2.name

3. 配置调度池

dagscheduler构建了具有依赖关系的任务集。taskscheduler负责提供任务给task-setmanager作为调度的先决条件。tasksetmanager负责具体任务集内部的调度任务。调度池(pool)则用于调度每个sparkcontext运行时并存的多个互相独立无依赖关系的任务集。调度池负责管理下一级的调度池和tasksetmanager对象。

用户可以通过配置文件定义调度池的属性。一般调度池支持如下3个参数:

1)调度模式scheduling mode:用户可以设置fifo或者fair调度方式。

2)weight:调度池的权重,在获取集群资源上权重高的可以获取多个资源。

3)minishare:代表计算资源中的cpu核数。

用户可以通过conf/fairscheduler.xml配置调度池的属性,同时要在sparkconf对象中配置属性。

3.2.3 stage(调度阶段)和tasksetmanager的调度

1. stage划分

当一个job被提交后,dagscheduler会从rdd依赖链的末端触发,遍历整个rdd依赖链,划分stage(调度阶段)。划分依据主要基于shuffledependency依赖关系。换句话说,当某rdd在计算中需要将数据进行shuffle操作时,这个包含shuffle操作的rdd将会被用来作为输入信息,构成一个新的stage。以这个基准作为划分stage,可以保证存在依赖关系的数据按照正确数据得到处理和运算。在spark1.5.0的源代码中,dagscheduler.scala中的getparentstages函数的实现从一定角度揭示了stage的划分逻辑。

/**

 * 对于给定的rdd构建或获取父stage的链表。新的stage构建时会包含参数中提供的firstjobid

 */

private def getparentstages(rdd: rdd[_], firstjobid: int): list[stage] = {

   val parents = new hashset[stage]

   val visited = new hashset[rdd[_]]

   // we are manually maintaining a stack here to prevent stackoverflowerror

   // caused by recursively visiting

   val waitingforvisit = new stack[rdd[_]]

   def visit(r: rdd[_]) {

     if (!visited(r)) {

       visited += r

       // kind of ugly: need to register rdds with the cache here since

       // we can't do it in its constructor because # of partitions is unknown

       /* 遍历rdd的依赖链 */

       for (dep <- r.dependencies) {

         dep match {

           /*如果遇见shuffledependency,则依据此依赖关系划分stage,并添加该stage的父stage到哈希列表中*/

           case shufdep: shuffledependency[_, _, _] =>

             parents += getshufflemapstage(shufdep, firstjobid)

           case _ =>

             waitingforvisit.push(dep.rdd)

      }

2. stage调度

在第一步的stage划分过程中,会产生一个或者多个互相关联的stage。其中,真正执行action算子的rdd所在的stage被称为final stage。dagscheduler会从这个final stage生成作业实例。

在stage提交时,dagscheduler首先会判断该stage的父stage的执行结果是否可用。如果所有父stage的执行结果都可用,则提交该stage。如果有任意一个父stage的结果不可用,则尝试迭代提交该父stage。所有结果不可用的stage都将会被加入waiting队列,等待执行,如图3-6所示。

图3-6 stage依赖

在图3-6中,虚箭头表示依赖关系。stage序号越小,表示stage越靠近上游。

图3-6中的stage调度运行顺序如图3-7所示。

图3-7 stage执行顺序

从图3-7可以看出,上游父stage先得到执行,waiting queue中的stage随后得到执行。

3. tasksetmanager

每个stage的提交会被转化为一组task的提交。dagscheduler最终通过调用taskscheduler的接口来提交这组任务。在taskscheduler内部实现中创建了tasksetmanager实例来管理任务集taskset的生命周期。事实上可以说每个stage对应一个tasksetmanager。

至此,dagscheduler的工作基本完毕。taskscheduler在得到集群计算资源时,taskset-manager会分配task到具体worker节点上执行。在spark1.5.0的taskschedulerimpl.scala文件中,提交task的函数实现如下:

override def submittasks(taskset: taskset) {

    val tasks = taskset.tasks

    loginfo("adding task set " + taskset.id + " with " + tasks.length + " tasks")

    this.synchronized {

      /*创建tasksetmanager实例以管理stage包含的任务集*/

      val manager = createtasksetmanager(taskset, maxtaskfailures)

      val stage = taskset.stageid

      val stagetasksets =

        tasksetsbystageidandattempt.getorelseupdate(stage, new hashmap[int, tasksetmanager])

      stagetasksets(taskset.stageattemptid) = manager

      val conflictingtaskset = stagetasksets.exists { case (_, ts) =>

        ts.taskset != taskset && !ts.iszombie

      if (conflictingtaskset) {

        throw new illegalstateexception(s"more than one active taskset for stage $stage:" +

          s" ${stagetasksets.toseq.map{_._2.taskset.id}.mkstring(",")}")

      /*将tasksetmanager添加到全局的调度队列*/

      schedulablebuilder.addtasksetmanager(manager, manager.taskset.properties)

      if (!islocal && !hasreceivedtask) {

        starvationtimer.scheduleatfixedrate(new timertask() {

          override def run() {

            if (!haslaunchedtask) {

              logwarning("initial job has not accepted any resources; " +

                "check your cluster ui to ensure that workers are registered " +

                "and have sufficient resources")

            } else {

              this.cancel()

            }

          }

        }, starvation_timeout_ms, starvation_timeout_ms)

      hasreceivedtask = true

    backend.reviveoffers()

当tasksetmanager进入到调度池中时,会依据job id对tasksetmanager排序,总体上先进入的tasksetmanager先得到调度。对于同一job内的tasksetmanager而言,job id较小的先得到调度。如果有的tasksetmanager父stage还未执行完,则该taskset-manager不会被放到调度池。

3.2.4 task的调度

在dagscheduler.scala中,定义了函数submitmissingtasks,读者阅读完整实现,从中可以看到task的调度方式。限于篇幅,以下截取部分代码。

private def submitmissingtasks(stage: stage, jobid: int) {

  logdebug("submitmissingtasks(" + stage + ")")

  // get our pending tasks and remember them in our pendingtasks entry

  stage.pendingtasks.clear()

  // first figure out the indexes of partition ids to compute.

  /*过滤出计算位置,用以执行计算*/

  val (allpartitions: seq[int], partitionstocompute: seq[int]) = {

    stage match {

      /*针对shufflemap类型的stage*/

      case stage: shufflemapstage =>

        val allpartitions = 0 until stage.numpartitions

        val filteredpartitions = allpartitions.filter { id =>  stage.outputlocs(id).isempty }

        (allpartitions, filteredpartitions)

      /*针对result类型的stage*/

      case stage: resultstage =>

        val job = stage.resultofjob.get

        val allpartitions = 0 until job.numpartitions

        val filteredpartitions = allpartitions.filter { id => ! job.finished(id) }

  .....[以下代码略]

  /*获取task执行的优先节点*/

  private[spark]

  def getpreferredlocs(rdd: rdd[_], partition: int): seq[tasklocation]    = {

    getpreferredlocsinternal(rdd, partition, new hashset)

计算task执行的优先节点位置的代码实现在getpreferredlocsinternal函数中,具体如下:

/*计算位置的递归实现*/

private def getpreferredlocsinternal(

      rdd: rdd[_],

      partition: int,

      visited: hashset[(rdd[_], int)]): seq[tasklocation] = {

    // if the partition has already been visited, no need to re-visit.

    // this avoids exponential path exploration.  spark-695

    if (!visited.add((rdd, partition))) {

      // nil has already been returned for previously visited partitions.

      return nil

    // 如果调用cache缓存过,则计算缓存位置,读取缓存分区中的数据

    val cached = getcachelocs(rdd)(partition)

    if (cached.nonempty) {

      return cached

    // 如果能直接获取到执行地点,则返回作为该task的执行地点

    val rddprefs = rdd.preferredlocations(rdd.partitions(partition)).tolist

    if (rddprefs.nonempty) {

      return rddprefs.map(tasklocation(_))

    /*针对窄依赖关系的rdd, 取出第一个窄依赖的父rdd分区的执行地点*/

    rdd.dependencies.foreach {

      case n: narrowdependency[_] =>

        for (inpart <- n.getparents(partition)) {

          val locs = getpreferredlocsinternal(n.rdd, inpart, visited)

          if (locs != nil) {

            return locs

        }

      case _ =>

    /*对于shuffle依赖的rdd,选取至少含reducer_pref_locs_fraction这么多数据的位置作为优先节点*/

    if (shufflelocalityenabled && rdd.partitions.length < shuffle_pref_reduce_threshold) {

      rdd.dependencies.foreach {

        case s: shuffledependency[_, _, _] =>

          if (s.rdd.partitions.length < shuffle_pref_map_threshold) {

            // get the preferred map output locations for this reducer

            val toplocsforreducer = mapoutputtracker.getlocationswithlargestou-tputs(s.shuffleid,

              partition, rdd.partitions.length, reducer_pref_locs_fraction)

            if (toplocsforreducer.nonempty) {

              return toplocsforreducer.get.map(loc => tasklocation(loc.host, loc.executorid))

        case _ =>

    nil

3.3 spark存储与i/o

前面已经讲过,rdd是按照partition分区划分的,所以rdd可以看作由一些分布在不同节点上的分区组成。由于partition分区与数据块是一一对应的,所以rdd中保存了partitionid与物理数据块之间的映射。物理数据块并非都保存在磁盘上,也有可能保存在内存中。

3.3.1 spark存储系统概览

spark i/o机制可以分为两个层次:

1)通信层:用于master与slave之间传递控制指令、状态等信息,通信层在架构上也采用master-slave结构。

2)存储层:同于保存数据块到内存、磁盘,或远端复制数据块。

下面介绍几个spark存储方面的功能模块。

1)blockmanager:spark提供操作storage的统一接口类。

2)blockmanagermasteractor:master创建,slave利用该模块向master传递信息。

3)blockmanagerslaveactor:slave创建,master利用该模块向slave节点传递控制命令,控制slave节点对block的读写。

4)blockmanagermaster: 管理actor通信。

5)diskstore:支持以文件方式读写的方式操作block。

6)memorystore: 支持内存中的block读写。

7)blockmanagerworker: 对远端异步传输进行管理。

8)connectionmanager:支持本地节点与远端节点数据block的传输。

图3-8概要性地揭示了spark存储系统各个主要模块之间的通信。

图3-8 spark存储系统概览

3.3.2 blockmanager中的通信

存储系统的通信仍然类似master-slave架构,节点之间传递命令与状态。总体而言,master向slave传递命令,slave向master传递信息和状态。这些master与slave节点之间的信息传递通过actor对象实现(关于actor的详细功能会在下一节spark通信机制中讲述)。但在blockmanager中略有不同,下面分别讲述。

1)master节点上的blockmanagermaster包含内容如下:

①blockmanagermasteractor的actor引用。

②blockmanagerslaveactor的ref引用。

2)slave节点上的blockmanagermaster包含内容如下:

①blockmanagermasteractor的ref引用。

②blockmanagerslaveactor的actor引用。

其中,在ref与actor之间的通信由blockmanagermasteractor和blockmanagerslave-actor完成。这个部分相关的源码篇幅较多,此处省略,感兴趣的读者请自行研究。

3.4 spark通信机制

前面介绍过,spark的部署模式可以分为local、standalone、mesos、yarn等。

本节以spark部署在standalone模式下为例,介绍spark的通信机制(其他模式类似)。

3.4.1 分布式通信方式

先介绍分布式通信的几种基本方式。

1. rpc

远程过程调用协议(remote procedure call protocol,rpc)是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。rpc假定某些传输协议的存在,如tcp或udp,为通信程序之间携带信息数据。在osi网络通信模型中,rpc跨越了传输层和应用层。rpc使得开发分布式应用更加容易。rpc采用c/s架构。请求程序就是一个client,而服务提供程序就是一个server。首先,client调用进程发送一个有进程参数的调用信息到service进程,然后等待应答信息。在server端,进程保持睡眠状态直到调用信息到达为止。当一个调用信息到达时,server获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,client调用进程接收答复信息,获得进程结果,然后调用执行继续进行。

2.  rmi

远程方法调用(remote method invocation,rmi)是java的一组拥护开发分布式应用程序的api。rmi使用java语言接口定义了远程对象,它集合了java序列化和java远程方法协议(java remote method protocol)。简单地说,这样使原先的程序在同一操作系统的方法调用,变成了不同操作系统之间程序的方法调用。由于j2ee是分布式程序平台,它以rmi机制实现程序组件在不同操作系统之间的通信。比如,一个ejb可以通过rmi调用web上另一台机器上的ejb远程方法。rmi可以被看作是rpc的java版本,但是传统rpc并不能很好地应用于分布式对象系统。java rmi 则支持存储于不同地址空间的程序级对象之间彼此进行通信,实现远程对象之间的无缝远程调用。

3.  jms

java消息服务(java message service,jms)是一个与具体平台无关的api,用来访问消息收发。jms 使用户能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 jms 客户机向另一个jms客户机发送消息。消息是 jms 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。jms定义了5种消息正文格式,以及调用的消息类型,允许发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。

 streammessage:java原始值的数据流。

 mapmessage:一套名称–值对。

 textmessage:一个字符串对象。

 objectmessage:一个序列化的 java对象。

 bytesmessage:一个未解释字节的数据流。

4. ejb

javaee服务器端组件模型(enterprise javabean,ejb)的设计目标是部署分布式应用程序。简单来说就是把已经编写好的程序打包放在服务器上执行。ejb定义了一个用于开发基于组件的企业多重应用程序的标准。ejb的核心是会话bean(session bean)、实体bean(entity bean)和消息驱动bean(message driven bean)。

5. web service

web service是一个平台独立的、低耦合的、自包含的、基于可编程的web应用程序。可以使用开放的xml(标准通用标记语言下的一个子集)标准来描述、发布、发现、协调和配置这些应用程序,用于开发分布式的应用程序。web service技术能使得运行在不同机器上的不同应用无须借助第三方软硬件, 就可相互交换数据或集成。web service减少了应用接口的花费。web service为整个企业甚至多个组织之间的业务流程的集成提供了一个通用机制。

3.4.2 通信框架akka

akka是一个用scala语言编写的库,用于简化编写容错的、高可伸缩性的java和scala的actor模型应用。它分为开发库和运行环境,可以用于构建高并发、分布式、可容错、事件驱动的基于jvm的应用。akka使构建高并发的分布式应用变得更加容易。akka已经被成功运用在众多行业的众多大企业,从投资业到商业银行、从零售业到社会媒体、仿真、游戏和赌博、汽车和交通系统、数据分析等。任何需要高吞吐率和低延迟的系统都是使用akka的候选,因此spark选择akka通信框架来支持模块间的通信。

actor模型常见于并发编程,它由carl hewitt于20世纪70年代早期提出,目的是解决分布式编程中的一系列问题。其特点如下:

1) 系统中的所有事物都可以扮演一个actor。

2) actor之间完全独立。

3) 在收到消息时actor采取的所有动作都是并行的。

4) actor有标识和对当前行为的描述。

actor可以看作是一个个独立的实体,它们之间是毫无关联的。但是,它们可以通过消息来通信。当一个actor收到其他actor的信息后,它可以根据需要做出各种响应。消息的类型和内容都可以是任意的。这点与web service类似,只提供接口服务,不必了解内部实现。一个actor在处理多个actor的请求时,通常先建立一个消息队列,每次收到消息后,就放入队列。actor每次也可以从队列中取出消息体来处理,而且这个过程是可循环的,这个特点让actor可以时刻处理发送来的消息。

akka的优势如下:

1) 易于构建并行与分布式应用(simple concurrency & distribution):akka采用异步通信与分布式架构,并对上层进行抽象,如actors、futures、stm等。

2) 可靠性(resilient by design):系统具备自愈能力,在本地/远程都有监护。

3) 高性能(high performance):在单机中每秒可发送5000万个消息。内存占用小,1gb内存中可保存250万个actors。

4) 弹性,无中心(elastic — decentralized):自适应的负责均衡、路由、分区、配置。

5) 可扩展性(extensible):可以使用akka扩展包进行扩展。

3.4.3 client、master 和 worker之间的通信

client、master与worker之间的交互代码实现位于如下路径:

(spark-root)/core/src/main/scala/org/apache/spark/deploy

主要涉及的类包括client.scala、master.scala和worker.scala。这三大模块之间的通信框架如图3-9所示:

图3-9 client、master和worker之间的通信

以standalone部署模式为例,三大模块分工如下:

1)client:提交作业给master。

2)master:接收client提交的作业,管理worker,并命令worker启动driver和executor。

3)worker:负责管理本节点的资源,定期向master汇报心跳信息,接收master的命令,如启动driver和executor。

下面列出client、master与worker的实现代码,读者可以从中看到三个模块间的通信交互。

1. client端通信

private class clientendpoint(

   override val rpcenv: rpcenv,

   driverargs: clientarguments,

   masterendpoints: seq[rpcendpointref],

   conf: sparkconf)

   extends threadsaferpcendpoint with logging {

   <限于篇幅,此处代码省略……>

  override def onstart(): unit = {

  driverargs.cmd match {

    case "launch" =>

      val mainclass = "org.apache.spark.deploy.worker.driverwrapper"

      val classpathconf = "spark.driver.extraclasspath"

      val classpathentries = sys.props.get(classpathconf).toseq.flatmap { cp =>

        cp.split(java.io.file.pathseparator)

      val librarypathconf = "spark.driver.extralibrarypath"

      val librarypathentries = sys.props.get (librarypathconf).toseq.flatmap { cp =>

      val extrajavaoptsconf = "spark.driver.extrajavaoptions"

      val extrajavaopts = sys.props.get(extrajavaoptsconf)

        .map(utils.splitcommandstring).getorelse(seq.empty)

      val sparkjavaopts = utils.sparkjavaopts(conf)

      val javaopts = sparkjavaopts ++ extrajavaopts

      val command = new command(mainclass,

        seq("{{worker_url}}", "{{user_jar}}", driverargs.mainclass) ++ driverargs.driveroptions,

        sys.env, classpathentries, librarypathentries, javaopts)

      /* 创建driverdescription对象 */

      val driverdescription = new driverdescription(

        driverargs.jarurl,

        driverargs.memory,

        driverargs.cores,

        driverargs.supervise,

        command)

      /* 此处向master的actor提交driver*/

      ayncsendtomasterandforwardreply[submitdriverresponse](

        requestsubmitdriver(driverdescription))

    case "kill" =>

      val driverid = driverargs.driverid

      /* 接收停止driver是否成功的通知 */

      ayncsendtomasterandforwardreply[killdriverresponse](requestkill-driver(driverid))

 /* 向master发送消息,并异步地转发返回信息给client */

  private def ayncsendtomasterandforwardreply[t: classtag](message: any): unit = {

    for (masterendpoint <- masterendpoints) {

      masterendpoint.ask[t](message).oncomplete {

        case success(v) => self.send(v)

        case failure(e) =>

          logwarning(s"error sending messages to master $masterendpoint", e)

      }(forwardmessageexecutioncontext)

2. master端通信

private[deploy] class master(

    override val rpcenv: rpcenv,

    address: rpcaddress,

    webuiport: int,

    val securitymgr: securitymanager,

    val conf: sparkconf)

  extends threadsaferpcendpoint with logging with leaderelectable {

  ……

  override def receive: partialfunction[any, unit] = {

     /* 选举为master,当状态为recoverystate.recovering时恢复 */

     case electedleader => {

       val (storedapps, storeddrivers, storedworkers) = persistenceengine.readpersisteddata(rpcenv)

       state = if (storedapps.isempty && storeddrivers.isempty && storedworkers.isempty) {

       recoverystate.alive

       } else {

       recoverystate.recovering

       }

       loginfo("i have been elected leader! new state: " + state)

       if (state == recoverystate.recovering) {

       beginrecovery(storedapps, storeddrivers, storedworkers)

       recoverycompletiontask = forwardmessagethread.schedule(new runnable {

         override def run(): unit = utils.trylognonfatalerror {

         self.send(completerecovery)

         }

       }, worker_timeout_ms, timeunit.milliseconds)

     /* 完成恢复 */

     case completerecovery => completerecovery()

     case revokedleadership => {

        logerror("leadership has been revoked -- master shutting down.")

        system.exit(0)

     }

     /* 注册worker */

     case registerworker(

        id, workerhost, workerport, workerref, cores, memory, workeruiport, publicaddress) => {

        loginfo("registering worker %s:%d with %d cores, %s ram".format(

        workerhost, workerport, cores, utils.megabytestostring(memory)))

        /* 当状态为recoverystate.standby时,不注册 */

        if (state == recoverystate.standby) {

        // ignore, don't send response

        } else if (idtoworker.contains(id)) {

        /* 重复注册,通知注册失败 */

          workerref.send(registerworkerfailed("duplicate worker id"))

          val worker = new workerinfo(id, workerhost, workerport, cores, memory,

          workerref, workeruiport, publicaddress)

          if (registerworker(worker)) {

             /* 注册成功,通知worker节点 */

             persistenceengine.addworker(worker)

             workerref.send(registeredworker(self, masterwebuiurl))

             schedule()

          } else {

             val workeraddress = worker.endpoint.address

             logwarning("worker registration failed. attempted to re-register worker at same " +"address: " + workeraddress)

             /* 注册失败,通知worker节点 */

             workerref.send(registerworkerfailed("attempted to re-register worker at same address: "+ workeraddress))

      /* 通知executor的driver更新状态 */

      case executorstatechanged(appid, execid, state, message, exitstatus) => {

      ……

 override def receiveandreply(context: rpccallcontext): partialfunction[any, unit] = {

  case requestsubmitdriver(description) => {

     /* 当master状态不为alive的时候,通知client无法提交driver */

     if (state != recoverystate.alive) {

       val msg = s"${utils.backup_standalone_master_prefix}: $state. " +

         "can only accept driver submissions in alive state."

       context.reply(submitdriverresponse(self, false, none, msg))

     } else {

       loginfo("driver submitted " + description.command.mainclass)

       val driver = createdriver(description)

       persistenceengine.adddriver(driver)

       waitingdrivers += driver

       drivers.add(driver)

       schedule()

       /* 提交driver */

       context.reply(submitdriverresponse(self, true, some(driver.id), s"driver successfully submitted as ${driver.id}"))

   }  

   case requestkilldriver(driverid) => {

        val msg = s"${utils.backup_standalone_master_prefix}: $state. " + s"can only kill drivers in alive state."

        /* 当master不为alive时,通知无法终止driver */

        context.reply(killdriverresponse(self, driverid, success = false, msg))

        loginfo("asked to kill driver " + driverid)

        val driver = drivers.find(_.id == driverid)

        driver match {

        case some(d) =>

          if (waitingdrivers.contains(d)) {

            /* 当想kill的driver在等待队列中时,删除driver并更新状态为killed */

            waitingdrivers -= d

            self.send(driverstatechanged(driverid, driverstate.killed, none))

            /* 通知worker,driver被终止 */

            d.worker.foreach { w =>

              w.endpoint.send(killdriver(driverid))

          // todo: it would be nice for this to be a synchronous response

          val msg = s"kill request for $driverid submitted"

          loginfo(msg)

          /* 通知请求者,终止driver的请求已提交 */

          context.reply(killdriverresponse(self, driverid, success = true, msg))

      case none =>

        val msg = s"driver $driverid has already finished or does not exist"

        logwarning(msg)

        /* 通知请求者,driver已被终止或不存在 */

   }

 }

 ……

3. worker端通信逻辑

private[deploy] class worker(

   webuiport: int,

   cores: int,

   memory: int,

   masterrpcaddresses: array[rpcaddress],

   systemname: string,

   endpointname: string,

   workdirpath: string = null,

   val conf: sparkconf,

   val securitymgr: securitymanager)

 extends threadsaferpcendpoint with logging {

   ……

   override def receive: partialfunction[any, unit] = {

      /* 注册worker */

      case registeredworker(masterref, masterwebuiurl) =>

          ……

      /* 向master发送心跳 */

      case sendheartbeat =>

          if (connected) { sendtomaster(heartbeat(workerid, self)) }

      /* 清理旧应用的工作目录 */

      case workdircleanup =>

          // spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker

          // rpcendpoint.

          // copy ids so that it can be used in the cleanup thread.

          val appids = executors.values.map(_.appid).toset

          val cleanupfuture = concurrent.future {

      /* 新master选举产生时,work更新master相关信息,包括url等 */

      case masterchanged(masterref, masterwebuiurl) =>

          loginfo("master has changed, new master is at " + masterref.address.tosparkurl)

          changemaster(masterref, masterwebuiurl)

      /* worker向主节点注册失败 */

      case registerworkerfailed(message) =>

         if (!registered) {

            logerror("worker registration failed: " + message)

            system.exit(1)

      /* worker重新连接向master注册 */

      case reconnectworker(masterurl) =>

          loginfo(s"master with url $masterurl requested this worker to reconnect.")

          registerwithmaster()

      /* 启动executor */

      case launchexecutor(masterurl, appid, execid, appdesc, cores_, memory_) =>

          /* 启动executorrunner */

          val manager = new executorrunner(

      /* executor状态改变 */

      case executorstatechanged @ executorstatechanged(appid, execid, state, message, exitstatus) =>

          /* 通知master executor状态改变 */

          handleexecutorstatechanged(executorstatechanged)

      /* 终止当前节点上运行的executor */

      case killexecutor(masterurl, appid, execid) =>

          if (masterurl != activemasterurl) {

             logwarning("invalid master (" + masterurl + ") attempted to launch executor " + execid)

             val fullid = appid + "/" + execid

             executors.get(fullid) match {

                case some(executor) =>

                   loginfo("asked to kill executor " + fullid)

                   executor.kill()

                case none =>

                   loginfo("asked to kill unknown executor " + fullid)

      /* 启动driver */

      case launchdriver(driverid, driverdesc) => {

         loginfo(s"asked to launch driver $driverid")

         /* 创建driverrunner */

         val driver = new driverrunner(...)

         drivers(driverid) = driver

         /* 启动driver */

         driver.start()

         ……

       /* 终止worker节点上运行的driver */

       case killdriver(driverid) => {

         loginfo(s"asked to kill driver $driverid")

         drivers.get(driverid) match {

             case some(runner) =>

                runner.kill()

             case none =>

                logerror(s"asked to kill unknown driver $driverid")

       /* driver状态更新 */             

       case driverstatechanged @ driverstatechanged(driverid, state, exception) => {

          handledriverstatechanged(driverstatechanged)

       ……

3.5 容错机制及依赖

一般而言,对于分布式系统,数据集的容错性通常有两种方式:

1) 数据检查点(在spark中对应checkpoint机制)。

2) 记录数据的更新(在spark中对应lineage血统机制)。

对于大数据分析而言,数据检查点操作成本较高,需要通过数据中心的网络连接在机器之间复制庞大的数据集,而网络带宽往往比内存带宽低,同时会消耗大量存储资源。

spark选择记录更新的方式。但更新粒度过细时,记录更新成本也不低。因此,rdd只支持粗粒度转换,即只记录单个块上执行的单个操作,然后将创建rdd的一系列变换序列记录下来,以便恢复丢失的分区。

3.5.1 lineage(血统)机制

每个rdd除了包含分区信息外,还包含它从父辈rdd变换过来的步骤,以及如何重建某一块数据的信息,因此rdd的这种容错机制又称“血统”(lineage)容错。lineage本质上很类似于数据库中的重做日志(redo log),只不过这个重做日志粒度很大,是对全局数据做同样的重做以便恢复数据。

相比其他系统的细颗粒度的内存数据更新级别的备份或者log机制,rdd的lineage记录的是粗颗粒度的特定数据transformation操作(如filter、map、join等)。当这个rdd的部分分区数据丢失时,它可以通过lineage获取足够的信息来重新计算和恢复丢失的数据分区。但这种数据模型粒度较粗,因此限制了spark的应用场景。所以可以说spark并不适用于所有高性能要求的场景,但同时相比细颗粒度的数据模型,也带来了性能方面的提升。

rdd在lineage容错方面采用如下两种依赖来保证容错方面的性能:

窄依赖(narrow dependeny):窄依赖是指父rdd的每一个分区最多被一个子rdd的分区所用,表现为一个父rdd的分区对应于一个子rdd的分区,或多个父rdd的分区对应于一个子rdd的分区。也就是说一个父rdd的一个分区不可能对应一个子rdd的多个分区。其中,1个父rdd分区对应1个子rdd分区,可以分为如下两种情况:

子rdd分区与父rdd分区一一对应(如map、filter等算子)。

一个子rdd分区对应n个父rdd分区(如co-paritioned(协同划分)过的join)。

 宽依赖(wide dependency,源码中称为shuffle dependency):

宽依赖是指一个父rdd分区对应多个子rdd分区,可以分为如下两种情况:

一个父rdd对应所有子rdd分区(未经协同划分的join)。

一个父rdd对应多个rdd分区(非全部分区)(如groupbykey)。

窄依赖与宽依赖关系如图3-10所示。

从图3-10可以看出对依赖类型的划分:根据父rdd分区是对应一个还是多个子rdd分区来区分窄依赖(父分区对应一个子分区)和宽依赖(父分区对应多个子分区)。如果对应多个,则当容错重算分区时,对于需要重新计算的子分区而言,只需要父分区的一部分数据,因此其余数据的重算就导致了冗余计算。

图3-10 两种依赖关系

对于宽依赖,stage计算的输入和输出在不同的节点上,对于输入节点完好,而输出节点死机的情况,在通过重新计算恢复数据的情况下,这种方法容错是有效的,否则无效,因为无法重试,需要向上追溯其祖先看是否可以重试(这就是lineage,血统的意思),窄依赖对于数据的重算开销要远小于宽依赖的数据重算开销。

窄依赖和宽依赖的概念主要用在两个地方:一个是容错中相当于redo日志的功能;另一个是在调度中构建dag作为不同stage的划分点(前面调度机制中已讲过)。

依赖关系在lineage容错中的应用总结如下:

1)窄依赖可以在某个计算节点上直接通过计算父rdd的某块数据计算得到子rdd对应的某块数据;宽依赖则要等到父rdd所有数据都计算完成,并且父rdd的计算结果进行hash并传到对应节点上之后,才能计算子rdd。

2)数据丢失时,对于窄依赖,只需要重新计算丢失的那一块数据来恢复;对于宽依赖,则要将祖先rdd中的所有数据块全部重新计算来恢复。所以在长“血统”链特别是有宽依赖时,需要在适当的时机设置数据检查点(checkpoint机制在下节讲述)。可见spark在容错性方面要求对于不同依赖关系要采取不同的任务调度机制和容错恢复机制。

在spark容错机制中,如果一个节点宕机了,而且运算属于窄依赖,则只要重算丢失的父rdd分区即可,不依赖于其他节点。而宽依赖需要父rdd的所有分区都存在,重算就很昂贵了。更深入地来说:在窄依赖关系中,当子rdd的分区丢失,重算其父rdd分区时,父rdd相应分区的所有数据都是子rdd分区的数据,因此不存在冗余计算。而在宽依赖情况下,丢失一个子rdd分区重算的每个父rdd的每个分区的所有数据并不是都给丢失的子rdd分区使用,其中有一部分数据对应的是其他不需要重新计算的子rdd分区中的数据,因此在宽依赖关系下,这样计算就会产生冗余开销,这也是宽依赖开销更大的原因。为了减少这种冗余开销,通常在lineage血统链比较长,并且含有宽依赖关系的容错中使用checkpoint机制设置检查点。

3.5.2 checkpoint(检查点)机制

通过上述分析可以看出checkpoint的本质是将rdd写入disk来作为检查点。这种做法是为了通过lineage血统做容错的辅助,lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的rdd开始重做lineage,就会减少开销。

下面从代码层面介绍checkpoint的实现。

1. 设置检查点数据的存取路径[sparkcontext.scala]

/* 设置作为rdd检查点的目录,如果是集群上运行,则必须为hdfs路径 */

def setcheckpointdir(directory: string) {

    // if we are running on a cluster, log a warning if the directory is local.

    // otherwise, the driver may attempt to reconstruct the checkpointed rdd from

    // its own local file system, which is incorrect because the checkpoint files

    // are actually on the executor machines.

    if (!islocal && utils.nonlocalpaths(directory).isempty) {

       logwarning("checkpoint directory must be non-local " +

       "if spark is running on a cluster: " + directory)

    checkpointdir = option(directory).map { dir =>

        val path = new path(dir, uuid.randomuuid().tostring)

        val fs = path.getfilesystem(hadoopconfiguration)

        fs.mkdirs(path)

        fs.getfilestatus(path).getpath.tostring

2. 设置检查点的具体实现

[rdd.scala]

/* 设置检查点入口 */

private[spark] def docheckpoint(): unit = {

    rddoperationscope.withscope(sc, "checkpoint", allownesting = false, ignoreparent = true) {

      if (!docheckpointcalled) {

          docheckpointcalled = true

      if (checkpointdata.isdefined) {

          checkpointdata.get.checkpoint()

      } else {

          /*  */              

          dependencies.foreach(_.rdd.docheckpoint())

[rddcheckpointdata.scala]

/* 设置检查点,在子类中会覆盖此函数以实现具体功能 */

protected def docheckpoint(): checkpointrdd[t]

[reliablerddcheckpointdata.scala]

/* 设置检查点,将rdd内容写入可靠的分布式文件系统中 */

protected override def docheckpoint(): checkpointrdd[t] = {

    /* 为检查点创建输出目录 */

    val path = new path(cpdir)

    val fs = path.getfilesystem(rdd.context.hadoopconfiguration)

    if (!fs.mkdirs(path)) {

        throw new sparkexception(s"failed to create checkpoint path $cpdir")

    /* 保存为文件,加载时作为一个rdd加载 */

    val broadcastedconf = rdd.context.broadcast(

       new serializableconfiguration(rdd.context.hadoopconfiguration))

    /* 重新计算rdd */

    rdd.context.runjob(rdd, reliablecheckpointrdd.writecheckpointfile[t](cpdir, broadcastedconf) _)

    val newrdd = new reliablecheckpointrdd[t](rdd.context, cpdir)

    if (newrdd.partitions.length != rdd.partitions.length) {

    throw new sparkexception(

        s"checkpoint rdd $newrdd(${newrdd.partitions.length}) has different " +

        s"number of partitions from original rdd $rdd(${rdd.partitions.length})")

    /* 当引用不在此范围时,清除检查点文件 */

    if (rdd.conf.getboolean("spark.cleaner.referencetracking.cleancheckpoints", false)) {

         rdd.context.cleaner.foreach { cleaner =>

         cleaner.registerrddcheckpointdataforcleanup(newrdd, rdd.id)

   loginfo(s"done checkpointing rdd ${rdd.id} to $cpdir, new parent is rdd ${newrdd.id}")

   newrdd

3.6 shuffle机制

在mapreduce框架中,shuffle是连接map和reduce之间的桥梁,map的输出要用到reduce中必须经过shuffle这个环节,shuffle的性能高低直接影响了整个程序的性能和吞吐量。spark作为mapreduce框架的一种实现,自然也实现了shuffle的逻辑。对于大数据计算框架而言,shuffle阶段的效率是决定性能好坏的关键因素之一。

3.6.1 什么是shuffle

shuffle是mapreduce框架中的一个特定的阶段,介于map阶段和reduce阶段之间,当map的输出结果要被reduce使用时,输出结果需要按关键字值(key)哈希,并且分发到每一个reducer上,这个过程就是shuffle。直观来讲,spark shuffle机制是将一组无规则的数据转换为一组具有一定规则数据的过程。由于shuffle涉及了磁盘的读写和网络的传输,因此shuffle性能的高低直接影响整个程序的运行效率。

在mapreduce计算框架中,shuffle连接了map阶段和reduce阶段,即每个reduce task从每个map task产生的数据中读取一片数据,极限情况下可能触发m*r个数据拷贝通道(m是map task数目,r是reduce task数目)。通常shuffle分为两部分:map阶段的数据准备和reduce阶段的数据拷贝。首先,map阶段需根据reduce阶段的task数量决定每个map task输出的数据分片数目,有多种方式存放这些数据分片:

1) 保存在内存中或者磁盘上(spark和mapreduce都存放在磁盘上)。

2) 每个分片对应一个文件(现在spark采用的方式,以及以前mapreduce采用的方式),或者所有分片放到一个数据文件中,外加一个索引文件记录每个分片在数据文件中的偏移量(现在mapreduce采用的方式)。

因此可以认为spark shuffle与mapreduce shuffle的设计思想相同,但在实现细节和优化方式上不同。

在spark中,任务通常分为两种,shuffle maptask和reducetask,具体逻辑如图3-11所示:

图3-11 spark shuffle

图3-11中的主要逻辑如下:

1)首先每一个maptask会根据reducetask的数量创建出相应的bucket,bucket的数量是m×r,其中m是map的个数,r是reduce的个数。

2)其次maptask产生的结果会根据设置的partition算法填充到每个bucket中。这里的partition算法是可以自定义的,当然默认的算法是根据key哈希到不同的bucket中。

当reducetask启动时,它会根据自己task的id和所依赖的mapper的id从远端或本地的block manager中取得相应的bucket作为reducer的输入进行处理。

这里的bucket是一个抽象概念,在实现中每个bucket可以对应一个文件,可以对应文件的一部分或是其他等。spark shuffle可以分为两部分:

1) 将数据分成bucket,并将其写入磁盘的过程称为shuffle write。

2) 在存储shuffle数据的节点fetch数据,并执行用户定义的聚集操作,这个过程称为shuffle fetch。

3.6.2 shuffle历史及细节

下面介绍shuffle write与fetch。

1. shuffle write

在spark的早期版本实现中,spark在每一个maptask中为每个reducetask创建一个bucket,并将rdd计算结果放进bucket中。

但早期的shuffle write有两个比较大的问题。

1)map的输出必须先全部存储到内存中,然后写入磁盘。这对内存是非常大的开销,当内存不足以存储所有的map输出时就会出现oom(out of memory)。

2)每个maptask会产生与reducetask数量一致的shuffle文件,如果maptask个数是1k,reducetask个数也是1k,就会产生1m个shuffle文件。这对于文件系统是比较大的压力,同时在shuffle数据量不大而shuffle文件又非常多的情况下,随机写也会严重降低io的性能。

后来到了spark 0.8版实现时,显著减少了shuffle的内存压力,现在map输出不需要先全部存储在内存中,再flush到硬盘,而是record-by-record写入磁盘中。对于shuffle文件的管理也独立出新的shuffleblockmanager进行管理,而不是与rdd cache文件在一起了。

但是spark 0.8版的shuffle write仍然有两个大的问题没有解决。

1)shuffle文件过多的问题。这会导致文件系统的压力过大并降低io的吞吐量。

2)虽然map输出数据不再需要预先存储在内存中然后写入磁盘,从而显著减少了内存压力。但是新引入的diskobjectwriter所带来的buffer开销也是不容小视的内存开销。假定有1k个maptask和1k个reducetask,就会有1m个bucket,相应地就会有1m个write handler,而每一个write handler默认需要100kb内存,那么总共需要100gb内存。这样仅仅是buffer就需要这么多的内存。因此当reducetask数量很多时,内存开销会很大。

为了解决shuffle文件过多的情况,spark后来引入了新的shuffle consolidation,以期显著减少shuffle文件的数量。

shuffle consolidation的原理如图3-12所示:

在图3-12中,假定该job有4个mapper和4个reducer,有2个core能并行运行两个task。可以算出spark的shuffle write共需要16个bucket,也就有了16个write handler。在之前的spark版本中,每个bucket对应一个文件,因此在这里会产生16个shuffle文件。

图3-12 shuffle consolidation

而在shuffle consolidation中,每个bucket并非对应一个文件,而是对应文件中的一个segment。同时shuffle consolidation产生的shuffle文件数量与spark core的个数也有关系。在图3-12中,job中的4个mapper分为两批运行,在第一批2个mapper运行时会申请8个bucket,产生8个shuffle文件;而在第二批mapper运行时,申请的8个bucket并不会再产生8个新的文件,而是追加写到之前的8个文件后面,这样一共就只有8个shuffle文件,而在文件内部共有16个不同的segment。因此从理论上讲shuffle consolidation产生的shuffle文件数量为c×r,其中c是spark集群的core number,r是reducer的个数。

很显然,当m=c时,shuffle consolidation产生的文件数和之前的实现相同。

shuffle consolidation显著减少了shuffle文件的数量,解决了spark之前实现中一个比较严重的问题。但是writer handler的buffer开销过大依然没有减少,若要减少writer handler的buffer开销,只能减少reducer的数量,但是这又会引入新的问题。

2. shuffle fetch与aggregator

shuffle write写出去的数据要被reducer使用,就需要shuffle fetch将所需的数据fetch过来。这里的fetch操作包括本地和远端,因为shuffle数据有可能一部分是存储在本地的。在早期版本中,spark对shuffle fetcher实现了两套不同的框架:nio通过socket连接fetch数据;oio通过netty server去fetch数据。分别对应的类是basic-blockfetcheriterator和nettyblockfetcheriterator。

目前在spark1.5.0中做了优化。新版本定义了类shuffleblockfetcheriterator来完成数据的fetch。对于local的数据,shuffleblockfetcheriterator会通过local的blockman-ager来fetch。对于远端的数据块,它通过blocktransferservice类来完成。具体实现参见如下代码:

[shuffleblockfetcheriterator.scala]

/* fetch local数据块 */

private[this] def fetchlocalblocks() {

    val iter = localblocks.iterator

    while (iter.hasnext) {

    val blockid = iter.next()

    try {

       /* 通过blockmanager来fetch数据 */

       val buf = blockmanager.getblockdata(blockid)

       shufflemetrics.inclocalblocksfetched(1)

       shufflemetrics.inclocalbytesread(buf.size)

       buf.retain()

       results.put(new successfetchresult(blockid, blockmanager.blockmanagerid, 0, buf))

    } catch {

       case e: exception =>

         // if we see an exception, stop immediately.

         logerror(s"error occurred while fetching local blocks", e)

         results.put(new failurefetchresult(blockid, blockmanager.blockmanagerid, e))

         return

/* 发送请求获取远端数据 */

private[this] def sendrequest(req: fetchrequest) {

    /* 请求格式 */

    logdebug("sending request for %d blocks (%s) from %s".format(

    req.blocks.size, utils.bytestostring(req.size), req.address.hostport))

    bytesinflight += req.size

    // so we can look up the size of each blockid

    val sizemap = req.blocks.map { case (blockid, size) => (blockid.tostring, size) }.tomap

    val blockids = req.blocks.map(_._1.tostring)

    val address = req.address

    /* fetch数据 */

    shuffleclient.fetchblocks(address.host, address.port, address.executorid, blockids.toarray,

    new blockfetchinglistener {

       override def onblockfetchsuccess(blockid: string, buf: managedbuffer): unit = {

         // only add the buffer to results queue if the iterator is not zombie,

         // i.e. cleanup() has not been called yet.

         if (!iszombie) {

           // increment the ref count because we need to pass this to a different thread.

           // this needs to be released after use.

           buf.retain()

           /* fetch请求成功 */

           results.put(new successfetchresult(blockid(blockid), address, sizemap(blockid), buf))

           shufflemetrics.incremotebytesread(buf.size)

           shufflemetrics.incremoteblocksfetched(1)

    override def onblockfetchfailure(blockid: string, e: throwable):

    /* fetch 失败*/

    ……

在mapreduce的shuffle过程中,shuffle fetch过来的数据会进行归并排序(merge sort),使得相同key下的不同value按序归并到一起供reducer使用,这个过程如图3-13所示:

这些归并排序都是在磁盘上进行的,这样做虽然有效地控制了内存使用,但磁盘io却大幅增加了。虽然spark属于mapreduce体系,但是对传统的mapreduce算法进行了一定的改变。spark假定在大多数应用场景下,shuffle数据的排序不是必须的,如word count。强制进行排序只会使性能变差,因此spark并不在reducer端做归并排序。既然没有归并排序,那spark是如何进行reduce的呢?这就涉及下面要讲的shuffle aggregator了。

图3-13 fetch merge

aggregator本质上是一个hashmap,它是以map output的key为key,以任意所要combine的类型为value的hashmap。

在做word count reduce计算count值时,它会将shuffle fetch到的每一个key-value对更新或是插入hashmap中(若在hashmap中没有查找到,则插入其中;若查找到,则更新value值)。这样就不需要预先把所有的key-value进行merge sort,而是来一个处理一个,省去了外部排序这一步骤。但同时需要注意的是,reducer的内存必须足以存放这个partition的所有key和count值,因此对内存有一定的要求。

在上面word count的例子中,因为value会不断地更新,而不需要将其全部记录在内存中,因此内存的使用还是比较少的。考虑一下如果是groupbykey这样的操作,reducer需要得到key对应的所有value。在hadoop mapreduce中,由于有了归并排序,因此给予reducer的数据已经是group by key了,而spark没有这一步,因此需要将key和对应的value全部存放在hashmap中,并将value合并成一个array。可以想象为了能够存放所有数据,用户必须确保每一个partition小到内存能够容纳,这对于内存是非常严峻的考验。因此在spark文档中,建议用户涉及这类操作时尽量增加partition,也就是增加mapper和reducer的数量。

增加mapper和reducer的数量固然可以减小partition的大小,使内存可以容纳这个partition。但是在shuffle write中提到,bucket和对应于bucket的write handler是由mapper和reducer的数量决定的,task越多,bucket就会增加得更多,由此带来write handler所需的buffer也会更多。在一方面我们为了减少内存的使用采取了增加task数量的策略,另一方面task数量增多又会带来buffer开销更大的问题,因此陷入了内存使用的两难境地。

为了减少内存的使用,只能将aggregator的操作从内存移到磁盘上进行,因此spark新版本中提供了外部排序的实现,以解决这个问题。

spark将需要聚集的数据分为两类:不需要归并排序和需要归并排序的数据。对于前者,在内存中的appendonlymap中对数据聚集。对于需要归并排序的数据,现在内存中进行聚集,当内存数据达到阈值时,将数据排序后写入磁盘。事实上,磁盘上的数据只是全部数据的一部分,最后将磁盘数据全部进行归并排序和聚集。具体aggregator的逻辑可以参见aggregator类的实现。

@developerapi

case class aggregator[k, v, c] (

   createcombiner: v => c,

   mergevalue: (c, v) => c,

   mergecombiners: (c, c) => c) {

  // 是否外部排序

  private val isspillenabled = sparkenv.get.conf.getboolean("spark.shuffle.spill", true)

  @deprecated("use combinevaluesbykey with taskcontext argument", "0.9.0")

  def combinevaluesbykey(iter: iterator[_ <: product2[k, v]]): iterator[(k, c)] =

  combinevaluesbykey(iter, null)

  def combinevaluesbykey(iter: iterator[_ <: product2[k, v]],

                     context: taskcontext): iterator[(k, c)] = {

  if (!isspillenabled) {

     /* 创建appendonlymap对象存储了combine集合,每个combine是一个key及对应key的元素seq */

     val combiners = new appendonlymap[k, c]

     var kv: product2[k, v] = null

     val update = (hadvalue: boolean, oldvalue: c) => {

     /* 检查是否处理的是第一个元素,如果是则先创建集合结构,如果不是则直接插入 */

     if (hadvalue) mergevalue(oldvalue, kv._2) else createcombiner(kv._2)

  while (iter.hasnext) {

     kv = iter.next()

     /* 当不采用外排时,利用appendonlymap结构存储数据 */

     combiners.changevalue(kv._1, update)

  combiners.iterator

  } else {

     val combiners = new externalappendonlymap[k, v, c](createcombiner, mergevalue, mergecombiners)

     /* 如果采用外排时,使用externalappendonlymap结构存储聚集数据 */

     combiners.insertall(iter)

     updatemetrics(context, combiners)

     combiners.iterator

……

本节就shuffle的概念与原理先介绍到这里。在下一章讲解spark源码时,会对shuffle的核心机制——shuffle存储做代码层面的讲解。相信学习完本章和第4章的shuffle存储机制后,读者会对shuffle机制掌握得更加深入。

3.7 本章小结

本章主要讲述了spark的工作机制与原理。首先剖析了spark的提交和执行时的具体机制,重点强调了spark程序的宏观执行过程: 提交后的job在spark中形成了rdd dag(有向无环图),然后进入一系列切分调度的过程。在剖析过程中,结合spark的源码呈现了这些调度过程的代码细节。本章后半部分接着剖析了spark的存储及io、spark通信机制,最后讲述了spark的容错机制及shuffle机制。 本章内容比较多,希望读者仔细体会。

继续阅读