天天看点

《深入理解Spark:核心思想与源码分析》——3.2节创建执行环境SparkEnv

本节书摘来自华章社区《深入理解spark:核心思想与源码分析》一书中的第3章,第3.2节创建执行环境sparkenv,作者耿嘉安,更多章节内容可以访问云栖社区“华章社区”公众号查看

3.2 创建执行环境sparkenv

sparkenv是spark的执行环境对象,其中包括众多与executor执行相关的对象。由于在local模式下driver会创建executor,local-cluster部署模式或者standalone部署模式下worker另起的coarsegrainedexecutorbackend进程中也会创建executor,所以sparkenv存在于driver或者coarsegrainedexecutorbackend进程中。创建sparkenv 主要使用sparkenv的createdriverenv,sparkenv.createdriverenv方法有三个参数:conf、islocal和 listenerbus。

上面代码中的conf是对sparkconf的复制,islocal标识是否是单机模式,listenerbus采用监听器模式维护各类事件的处理,在3.4.1节会详细介绍。

sparkenv的方法createdriverenv最终调用create创建sparkenv。sparkenv的构造步骤如下:

1)创建安全管理器securitymanager;

2)创建基于akka的分布式消息系统actorsystem;

3)创建map任务输出跟踪器mapoutputtracker;

4)实例化shufflemanager;

5)创建shufflememorymanager;

6)创建块传输服务blocktransferservice;

7)创建blockmanagermaster;

8)创建块管理器blockmanager;

9)创建广播管理器broadcastmanager;

10)创建缓存管理器cachemanager;

11)创建http文件服务器httpfileserver;

12)创建测量系统metricssystem;

13)创建sparkenv。

3.2.1 安全管理器securitymanager

securitymanager主要对权限、账号进行设置,如果使用hadoop yarn作为集群管理器,则需要使用证书生成 secret key登录,最后给当前系统设置默认的口令认证实例,此实例采用匿名内部类实现,参见代码清单3-2。

代码清单3-2 securitymanager的实现

private val secretkey = generatesecretkey()

3.2.2 基于akka的分布式消息系统actorsystem

actorsystem是spark中最基础的设施,spark既使用它发送分布式消息,又用它实现并发编程。消息系统可以实现并发?要解释清楚这个问题,首先应该简单介绍下scala语言的actor并发编程模型:scala认为java线程通过共享数据以及通过锁来维护共享数据的一致性是糟糕的做法,容易引起锁的争用,降低并发程序的性能,甚至会引入死锁的问题。在scala中只需要自定义类型继承actor,并且提供act方法,就如同java里实现runnable接口,需要实现run方法一样。但是不能直接调用act方法,而是通过发送消息的方式(scala发送消息是异步的)传递数据。如:

actor ! message

akka是actor编程模型的高级类库,类似于jdk 1.5之后越来越丰富的并发工具包,简化了程序员并发编程的难度。actorsystem便是akka提供的用于创建分布式消息通信系统的基础类。akka的具体信息见附录b。

正是因为actor轻量级的并发编程、消息发送以及actorsystem支持分布式消息发送等特点,spark选择了actorsystem。

sparkenv中创建actorsystem时用到了akkautils工具类,见代码清单3-3。akkautils.createactorsystem方法用于启动actorsystem,见代码清单3-4。akkautils使用了utils的静态方法startserviceonport, startserviceonport最终会回调方法startservice: int => (t, int),此处的startservice实际是方法docreateactorsystem。真正启动actorsystem是由docreate-actorsystem方法完成的,docreateactorsystem的具体实现细节请见附录b。spark的driver中akka的默认访问地址是akka://sparkdriver,spark的executor中akka的默认访问地址是akka:// sparkexecutor。如果不指定actorsystem的端口,那么所有节点的actorsystem端口在每次启动时随机产生。关于startserviceonport的实现,请见附录a。

代码清单3-3 akkautils工具类创建和启动actorsystem

3.2.3 map任务输出跟踪器mapoutputtracker

mapoutputtracker用于跟踪map阶段任务的输出状态,此状态便于reduce阶段任务获取地址及中间输出结果。每个map任务或者reduce任务都会有其唯一标识,分别为mapid和reduceid。每个reduce任务的输入可能是多个map任务的输出,reduce会到各个map任务的所在节点上拉取block,这一过程叫做shuffle。每批shuffle过程都有唯一的标识shuffleid。

这里先介绍下mapoutputtrackermaster。mapoutputtrackermaster内部使用mapstatuses:timestampedhashmap[int, array[mapstatus]]来维护跟踪各个map任务的输出状态。其中key对应shuffleid,array存储各个map任务对应的状态信息mapstatus。由于mapstatus维护了map输出block的地址blockmanagerid,所以reduce任务知道从何处获取map任务的中间输出。mapoutputtrackermaster还使用cachedserializedstatuses:timestampedhashmap[int, array[byte]]维护序列化后的各个map任务的输出状态。其中key对应shuffleid,array存储各个序列化mapstatus生成的字节数组。

driver和executor处理mapoutputtrackermaster的方式有所不同。

如果当前应用程序是driver,则创建mapoutputtrackermaster,然后创建mapoutputtrackermasteractor,并且注册到actorsystem中。

如果当前应用程序是executor,则创建mapoutputtrackerworker,并从actorsystem中找到mapoutputtrackermasteractor。

无论是driver还是executor,最后都由mapoutputtracker的属性trackeractor持有mapoutputtrackermasteractor的引用,参见代码清单3-5。

代码清单3-5 registerorlookup方法用于查找或者注册actor的实现

在后面章节大家会知道map任务的状态正是由executor向持有的mapoutputtracker-masteractor发送消息,将map任务状态同步到mapoutputtracker的mapstatuses和cached-serializedstatuses的。executor究竟是如何找到mapoutputtrackermasteractor的?registerorlookup方法通过调用akkautils.makedriverref找到mapoutputtrackermasteractor,实际正是利用actorsystem提供的分布式消息机制实现的,具体细节参见附录b。这里第一次使用到了akka提供的功能,以后大家会渐渐感觉到使用akka的便捷。

3.2.4 实例化shufflemanager

shufflemanager负责管理本地及远程的block数据的shuffle操作。shufflemanager默认为通过反射方式生成的sortshufflemanager的实例,可以修改属性spark.shuffle.manager为hash来显式控制使用hashshufflemanager。sortshufflemanager通过持有的indexshuffleblockmanager间接操作blockmanager中的diskblockmanager将map结果写入本地,并根据shuffleid、mapid写入索引文件,也能通过mapoutputtrackermaster中维护的mapstatuses从本地或者其他远程节点读取文件。有读者可能会问,为什么需要shuffle?spark作为并行计算框架,同一个作业会被划分为多个任务在多个节点上并行执行,reduce的输入可能存在于多个节点上,因此需要通过“洗牌”将所有reduce的输入汇总起来,这个过程就是shuffle。这个问题以及对shufflemanager的具体使用会在第5章和第6章详述。shufflemanager的实例化见代码清单3-6。代码清单3-6最后创建的shufflememorymanager将在3.2.5节介绍。

代码清单3-6 shufflemanager的实例化及shufflememorymanager的创建

3.2.5 shuffle线程内存管理器shufflememorymanager

shufflememorymanager负责管理shuffle线程占有内存的分配与释放,并通过thread-memory:mutable.hashmap[long, long]缓存每个线程的内存字节数,见代码清单3-7。

代码清单3-7 shufflememorymanager的数据结构

从上面代码可以看出,shuffle所有线程占用的最大内存的计算公式为:

java运行时最大内存 spark的shuffle最大内存占比 spark的安全内存占比

可以配置属性spark.shuffle.memoryfraction修改spark的shuffle最大内存占比,配置属性spark.shuffle.safetyfraction修改spark的安全内存占比。

shufflememorymanager通常运行在executor中,driver中的shufflememorymanager 只有在local模式下才起作用。

3.2.6 块传输服务blocktransferservice

blocktransferservice默认为nettyblocktransferservice(可以配置属性spark.shuffle.blocktransferservice使用nioblocktransferservice),它使用netty提供的异步事件驱动的网络应用框架,提供web服务及客户端,获取远程节点上block的集合。

nettyblocktransferservice的具体实现将在第4章详细介绍。这里大家可能觉得奇怪,这样的网络应用为何也要放在存储体系?大家不妨先带着疑问,直到你真正了解了存储体系。

3.2.7 blockmanagermaster介绍

val blockmanager = new blockmanager(executorid, actorsystem, blockmanagermaster,

val broadcastmanager = new broadcastmanager(isdriver, conf, securitymanager)

private def initialize() {

}

val cachemanager = new cachemanager(blockmanager)

val httpfileserver =

httpfileserver的初始化过程见代码清单3-10,主要包括以下步骤:

1)使用utils工具类创建文件服务器的根目录及临时目录(临时目录在运行时环境关闭时会删除)。utils工具的详细介绍见附录a。

2)创建存放jar包及其他文件的文件目录。

3)创建并启动http服务。

代码清单3-10 httpfileserver的初始化

def initialize() {

httpserver的构造和start方法的实现中,再次使用了utils的静态方法startserviceonport,因此会回调dostart方法,见代码清单3-11。有关jetty的api使用参见附录c。

代码清单3-11 httpserver的启动

def start() {

dostart方法中启动内嵌的jetty所提供的http服务,见代码清单3-12。

代码清单3-12 httpserver的启动功能实现

private def dostart(startport: int): (server, int) = {

val metricssystem = if (isdriver) {

上面调用的createmetricssystem方法实际创建了metricssystem,代码如下。

def createmetricssystem(

构造metricssystem的过程最重要的是调用了metricsconfig的initialize方法,见代码清单3-13。

代码清单3-13 metricsconfig的初始化

从以上实现可以看出,metricsconfig的initialize方法主要负责加载metrics.properties文件中的属性配置,并对属性进行初始化转换。

例如,将属性

{.sink.servlet.path=/metrics/json, applications.sink.servlet.path=/metrics/applications/json, .sink.servlet.class=org.apache.spark.metrics.sink.metricsservlet, master.sink.servlet.path=/metrics/master/json}

转换为

map(applications -> {sink.servlet.class=org.apache.spark.metrics.sink.metricsservlet, sink.servlet.path=/metrics/applications/json}, master -> {sink.servlet.class=org.apache.spark.metrics.sink.metricsservlet, sink.servlet.path=/metrics/master/json}, * -> {sink.servlet.class=org.apache.spark.metrics.sink.metricsservlet, sink.servlet.path=/metrics/json})

new sparkenv(executorid, actorsystem, serializer, closureserializer, cachemanager,

metricssystem, shufflememorymanager, conf)