天天看点

深入理解Spark:核心思想与源码分析. 3.1 SparkContext概述

<b>3.1 sparkcontext概述</b>

spark driver用于提交用户应用程序,实际可以看作spark的客户端。了解spark

driver的初始化,有助于读者理解用户应用程序在客户端的处理过程。

spark driver的初始化始终围绕着sparkcontext的初始化。sparkcontext可以算得上是所有spark应用程序的发动机引擎,轿车要想跑起来,发动机首先要启动。sparkcontext初始化完毕,才能向spark集群提交任务。在平坦的公路上,发动机只需以较低的转速、较低的功率就可以游刃有余;在山区,你可能需要一台能够提供大功率的发动机才能满足你的需求。这些参数都是通过驾驶员操作油门、档位等传送给发动机的,而sparkcontext的配置参数则由sparkconf负责,sparkconf就是你的操作面板。

sparkconf的构造很简单,主要是通过concurrenthashmap来维护各种spark的配置属性。sparkconf代码结构见代码清单3-1。spark的配置属性都是以“spark.”开头的字符串。

代码清单3-1 sparkconf代码结构

class sparkconf(loaddefaults: boolean)

extends cloneable with logging {

import sparkconf._

def this() = this(true)

private val settings = new concurrenthashmap[string, string]()

if (loaddefaults) {

// 加载任何以spark.开头的系统属性

for ((key, value) &lt;- utils.getsystemproperties if

key.startswith("spark.")) {

set(key, value)

}

    }

//其余代码省略

现在开始介绍sparkcontext。sparkcontext的初始化步骤如下:

1)创建spark执行环境sparkenv;

2)创建rdd清理器metadatacleaner;

3)创建并初始化spark ui;

4)hadoop相关配置及executor环境变量的设置;

5)创建任务调度taskscheduler;

6)创建和启动dagscheduler;

7)taskscheduler的启动;

8)初始化块管理器blockmanager(blockmanager是存储体系的主要组件之一,将在第4章介绍);

9)启动测量系统metricssystem;

10)创建和启动executor分配管理器executorallocationmanager;

11)contextcleaner的创建与启动;

12)spark环境更新;

13)创建dagschedulersource和blockmanagersource;

14)将sparkcontext标记为激活。

sparkcontext的主构造器参数为sparkconf,其实现如下。

class sparkcontext(config: sparkconf)

extends logging with executorallocationclient {

private val creationsite: callsite =

utils.getcallsite()

private val allowmultiplecontexts: boolean =

config.getboolean("spark.driver.allowmultiplecontexts", false)

sparkcontext.markpartiallyconstructed(this, allowmultiplecontexts)

上面代码中的callsite存储了线程栈中最靠近栈顶的用户类及最靠近栈底的scala或者spark核心类信息。utils.getcallsite的详细信息见附录a。sparkcontext默认只有一个实例(由属性spark.driver.allowmultiplecontexts来控制,用户需要多个sparkcontext实例时,可以将其设置为true),方法markpartiallyconstructed用来确保实例的唯一性,并将当前sparkcontext标记为正在构建中。

接下来会对sparkconf进行复制,然后对各种配置信息进行校验,代码如下。

private[spark] val conf = config.clone()

conf.validatesettings()

if

(!conf.contains("spark.master")) {

throw new sparkexception("a master url must be set in your

configuration")

(!conf.contains("spark.app.name")) {

throw new sparkexception("an application name must be set in your

从上面校验的代码看到必须指定属性spark.master 和spark.app.name,否则会抛出异常,结束初始化过程。spark.master用于设置部署模式,spark.app.name用于指定应用程序名称。

继续阅读