天天看點

從源碼角度看Spark on yarn client & cluster模式的本質差別

<a href="http://s4.51cto.com/wyfs02/M00/A6/14/wKioL1nIwW2hLNraAAG9DgGZxQk485.jpg-wh_651x-s_3797747829.jpg" target="_blank"></a>

首先區分下AppMaster和Driver,任何一個yarn上運作的任務都必須有一個AppMaster,而任何一個Spark任務都會有一個Driver,Driver就是運作SparkContext(它會建構TaskScheduler和DAGScheduler)的程序,當然在Driver上你也可以做很多非Spark的事情,這些事情隻會在Driver上面執行,而由SparkContext上牽引出來的代碼則會由DAGScheduler分析,并形成Job和Stage交由TaskScheduler,再由TaskScheduler交由各Executor分布式執行。

是以Driver和AppMaster是兩個完全不同的東西,Driver是控制Spark計算和任務資源的,而AppMaster是控制yarn app運作和任務資源的,隻不過在Spark on Yarn上,這兩者就出現了交叉,而在standalone模式下,資源則由Driver管理。在Spark on Yarn上,Driver會和AppMaster通信,資源的申請由AppMaster來完成,而任務的排程和執行則由Driver完成,Driver會通過與AppMaster通信來讓Executor的執行具體的任務。

client與cluster的差別

對于yarn-client和yarn-cluster的唯一差別在于,yarn-client的Driver運作在本地,而AppMaster運作在yarn的一個節點上,他們之間進行遠端通信,AppMaster隻負責資源申請和釋放(當然還有DelegationToken的重新整理),然後等待Driver的完成;而yarn-cluster的Driver則運作在AppMaster所在的container裡,Driver和AppMaster是同一個程序的兩個不同線程,它們之間也會進行通信,AppMaster同樣等待Driver的完成,進而釋放資源。

Spark裡AppMaster的實作:org.apache.spark.deploy.yarn.ApplicationMaster Yarn裡MapReduce的AppMaster實作:org.apache.hadoop.mapreduce.v2.app.MRAppMaster

在yarn-client模式裡,優先運作的是Driver(我們寫的應用代碼就是入口),然後在初始化SparkContext的時候,會作為client端向yarn申請AppMaster資源,當AppMaster運作後,它會向yarn注冊自己并申請Executor資源,之後由本地Driver與其通信控制任務運作,而AppMaster則時刻監控Driver的運作情況,如果Driver完成或意外退出,AppMaster會釋放資源并登出自己。是以在該模式下,如果運作spark-submit的程式退出了,整個任務也就退出了

在yarn-cluster模式裡,本地程序則僅僅隻是一個client,它會優先向yarn申請AppMaster資源運作AppMaster,在運作AppMaster的時候通過反射啟動Driver(我們的應用代碼),在SparkContext初始化成功後,再向yarn注冊自己并申請Executor資源,此時Driver與AppMaster運作在同一個container裡,是兩個不同的線程,當Driver運作完畢,AppMaster會釋放資源并登出自己。是以在該模式下,本地程序僅僅是一個client,如果結束了該程序,整個Spark任務也不會退出,因為Driver是在遠端運作的

下面從源碼的角度看看SparkSubmit的代碼調用(基于Spark2.0.0):

代碼公共部分

SparkSubmit#main =&gt;

val appArgs = new SparkSubmitArguments(args) 

appArgs.action match { 

  // normal spark-submit 

  case SparkSubmitAction.SUBMIT =&gt; submit(appArgs) 

  // use --kill specified 

  case SparkSubmitAction.KILL =&gt; kill(appArgs) 

  // use --status specified 

  case SparkSubmitAction.REQUEST_STATUS =&gt; requestStatus(appArgs) 

SparkSubmit的main方法是在使用者使用spark-submit腳本送出Spark app的時候調用的,可以看到正常情況下,它會調用SparkSubmit#submit方法

SparkSubmit#submit =&gt;

val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args) 

// 此處省略掉代理賬戶,異常處理,送出失敗的重送出邏輯,隻看主幹代碼 

runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose) 

在submit方法内部,會先進行送出環境相關的處理,調用的是SparkSubmit#prepareSubmitEnvironment方法,之後利用拿到的mainClass等資訊,再調用SparkSubmit#runMain方法來執行對于主函數

SparkSubmit#prepareSubmitEnvironment =&gt;

主幹相關的代碼如下:

// yarn client mode 

if (deployMode == CLIENT) { 

  // client 模式下,運作的是 --class 後指定的mainClass,也即我們的代碼 

  childMainClass = args.mainClass 

  if (isUserJar(args.primaryResource)) { 

    childClasspath += args.primaryResource 

  } 

  if (args.jars != null) { childClasspath ++= args.jars.split(",") } 

  if (args.childArgs != null) { childArgs ++= args.childArgs } 

// yarn cluster mode 

val isYarnCluster = clusterManager == YARN &amp;&amp; deployMode == CLUSTER 

if (isYarnCluster) { 

  // cluster 模式下,運作的是Client類 

  childMainClass = "org.apache.spark.deploy.yarn.Client" 

  if (args.isPython) { 

    childArgs += ("--primary-py-file", args.primaryResource) 

    childArgs += ("--class", "org.apache.spark.deploy.PythonRunner") 

  } else if (args.isR) { 

    val mainFile = new Path(args.primaryResource).getName 

    childArgs += ("--primary-r-file", mainFile) 

    childArgs += ("--class", "org.apache.spark.deploy.RRunner") 

  } else { 

    if (args.primaryResource != SparkLauncher.NO_RESOURCE) { 

      childArgs += ("--jar", args.primaryResource) 

    } 

    // 這裡 --class 指定的是AppMaster裡啟動的Driver,也即我們的代碼 

    childArgs += ("--class", args.mainClass) 

  if (args.childArgs != null) { 

    args.childArgs.foreach { arg =&gt; childArgs += ("--arg", arg) } 

在 prepareSubmitEnvironment 裡,主要負責解析使用者參數,設定環境變量env,處理python/R等依賴,然後針對不同的部署模式,比對不同的運作主類,比如: yarn-client&gt;args.mainClass,yarn-cluster&gt;o.a.s.deploy.yarn.Client

SparkSubmit#runMain =&gt;

骨幹代碼如下

try { 

  mainClass = Utils.classForName(childMainClass) 

} catch { 

  // ... 

val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass) 

  // childArgs就是使用者自己傳給Spark應用代碼的參數 

  mainMethod.invoke(null, childArgs.toArray) 

在runMain方法裡,會設定ClassLoader,根據使用者代碼優先的設定(spark.driver.userClassPathFirst)來加載對應的類,然後反射調用prepareSubmitEnvironment方法傳回的主類,并調用其main方法

從所反射的不同主類,我們來看看具體調用方式的不同:

對于yarn-cluster

o.a.s.deploy.yarn.Client#main =&gt;

val sparkConf = new SparkConf  

val args = new ClientArguments(argStrings) 

new Client(args, sparkConf).run() 

在Client伴生對象裡建構了Client類的對象,然後調用了Client#run方法

o.a.s.deploy.yarn.Client#run =&gt;

this.appId = submitApplication() 

// report application ... 

run方法核心的就是送出任務到yarn,其調用了Client#submitApplication方法,拿到送出完的appID後,監控app的狀态

o.a.s.deploy.yarn.Client#submitApplication =&gt;

  // 擷取送出使用者的Credentials,用于後面擷取delegationToken 

  setupCredentials() 

  yarnClient.init(yarnConf) 

  yarnClient.start() 

  // Get a new application from our RM 

  val newApp = yarnClient.createApplication() 

  val newAppResponse = newApp.getNewApplicationResponse() 

  // 拿到appID 

  appId = newAppResponse.getApplicationId() 

  // 報告狀态 

  reportLauncherState(SparkAppHandle.State.SUBMITTED) 

  launcherBackend.setAppId(appId.toString) 

  // Verify whether the cluster has enough resources for our AM 

  verifyClusterResources(newAppResponse) 

  // 建立AppMaster運作的context,為其準備運作環境,java options,以及需要運作的java指令,AppMaster通過該指令在yarn節點上啟動 

  val containerContext = createContainerLaunchContext(newAppResponse) 

  val appContext = createApplicationSubmissionContext(newApp, containerContext) 

  // Finally, submit and monitor the application 

  logInfo(s"Submitting application $appId to ResourceManager") 

  yarnClient.submitApplication(appContext) 

  appId 

  case e: Throwable =&gt; 

    if (appId != null) { 

      cleanupStagingDir(appId) 

    throw e 

在 submitApplication 裡完成了app的申請,AppMaster context的建立,最後完成了任務的送出,對于cluster模式而言,任務送出後本地程序就隻是一個client而已,Driver就運作在與AppMaster同一container裡,對于client模式而言,執行 submitApplication 方法時,Driver已經在本地運作,這一步就隻是送出任務到yarn而已

o.a.s.deploy.yarn.Client#createContainerLaunchContext

val appStagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId)) 

// 非pySpark時,pySparkArchives為Nil 

val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives) 

// 這一步會進行delegationtoken的擷取,存于Credentials,在AppMasterContainer建構完的最後将其存入到context裡 

val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives) 

val amContainer = Records.newRecord(classOf[ContainerLaunchContext]) 

// 設定AppMaster container運作的資源和環境 

amContainer.setLocalResources(localResources.asJava) 

amContainer.setEnvironment(launchEnv.asJava) 

// 設定JVM參數 

val javaOpts = ListBuffer[String]() 

javaOpts += "-Djava.io.tmpdir=" + tmpDir 

// other java opts setting... 

// 對于cluster模式,通過 --class 指定AppMaster運作我們的Driver端,對于client模式則純作為資源申請和配置設定的工具 

val userClass = 

  if (isClusterMode) { 

    Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass)) 

    Nil 

// 設定AppMaster運作的主類 

val amClass = 

    Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName 

    // ExecutorLauncher隻是ApplicationMaster的一個warpper 

    Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName 

val amArgs = 

  Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ 

    userArgs ++ Seq( 

      "--properties-file", buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), 

        LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) 

// Command for the ApplicationMaster 

val commands = prefixEnv ++ Seq( 

    YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server" 

  ) ++ 

  javaOpts ++ amArgs ++ 

  Seq( 

    "1&gt;", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout", 

    "2&gt;", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr") 

val printableCommands = commands.map(s =&gt; if (s == null) "null" else s).toList 

// 設定需運作的指令 

amContainer.setCommands(printableCommands.asJava) 

val securityManager = new SecurityManager(sparkConf) 

// 設定應用權限 

amContainer.setApplicationACLs( 

      YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager).asJava) 

// 設定delegationToken 

setupSecurityToken(amContainer) 

對于yarn-client

args.mainClass =&gt;

在我們的Spark代碼裡,需要建立一個SparkContext來執行Spark任務,而在其構造器裡建立TaskScheduler的時候,對于client模式就會向yarn申請資源送出任務,如下

// 調用createTaskScheduler方法,對于yarn模式,master=="yarn" 

val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode) 

_schedulerBackend = sched 

_taskScheduler = ts 

// 建立DAGScheduler 

_dagScheduler = new DAGScheduler(this) 

SparkContext#createTaskScheduler =&gt;

這裡會根據master比對不同模式,比如local/standalone/yarn,在yarn模式下會利用ServiceLoader裝載YarnClusterManager,然後由它建立TaskScheduler和SchedulerBackend,如下:

// 當為yarn模式的時候 

case masterUrl =&gt; 

  // 利用目前loader裝載YarnClusterManager,masterUrl為"yarn" 

  val cm = getClusterManager(masterUrl) match { 

    case Some(clusterMgr) =&gt; clusterMgr 

    case None =&gt; throw new SparkException("Could not parse Master URL: '" + master + "'") 

  try { 

    // 建立TaskScheduler,這裡masterUrl并沒有用到 

    val scheduler = cm.createTaskScheduler(sc, masterUrl) 

    // 建立SchedulerBackend,對于client模式,這一步會向yarn申請AppMaster,送出任務 

    val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler) 

    cm.initialize(scheduler, backend) 

    (backend, scheduler) 

  } catch { 

    case se: SparkException =&gt; throw se 

    case NonFatal(e) =&gt; 

      throw new SparkException("External scheduler cannot be instantiated", e) 

YarnClusterManager#createSchedulerBackend

sc.deployMode match { 

  case "cluster" =&gt; 

    new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc) 

  case "client" =&gt; 

    new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc) 

  case  _ =&gt; 

    throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn") 

可以看到yarn下的SchedulerBackend實作對于client和cluster模式是不同的,yarn-client模式為YarnClientSchedulerBackend,yarn-cluster模式為 YarnClusterSchedulerBackend,之是以不同,是因為在client模式下,YarnClientSchedulerBackend 相當于 yarn application 的client,它會調用o.a.s.deploy.yarn.Client#submitApplication 來準備環境,申請資源并送出yarn任務,如下:

val driverHost = conf.get("spark.driver.host") 

val driverPort = conf.get("spark.driver.port") 

val hostport = driverHost + ":" + driverPort 

sc.ui.foreach { ui =&gt; conf.set("spark.driver.appUIAddress", ui.appUIAddress) } 

val argsArrayBuf = new ArrayBuffer[String]() 

argsArrayBuf += ("--arg", hostport) 

val args = new ClientArguments(argsArrayBuf.toArray) 

totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(conf) 

// 建立o.a.s.deploy.yarn.Client對象 

client = new Client(args, conf) 

// 調用submitApplication準備環境,申請資源,送出任務,并把appID儲存下來 

// 對于submitApplication,前文有詳細的分析,這裡與前面是一緻的 

bindToYarn(client.submitApplication(), None) 

而在 YarnClusterSchedulerBackend 裡,由于 AppMaster 已經運作起來了,是以它并不需要再做申請資源等等工作,隻需要儲存appID和attemptID并啟動SchedulerBackend即可.

本文作者:佚名

來源:51CTO