
Spark 2.2 Source Code Analysis: Submitting a Job with Spark-Submit

Spark 2.2 source code reading order:

1. Spark 2.2 Source Code Analysis: Submitting a Job with Spark-Submit

2. Spark 2.2 Source Code Analysis: Driver Registration and Startup

After the client submits a job with the spark-submit command, a series of operations runs inside the spark-submit process (part 0 in the diagram).

What the Spark cluster does after it starts is sketched roughly in the diagram below:

[Figure: overall flow of job submission after the Spark cluster starts]

Overview of the main steps

1. Run the spark-submit script, prepare the arguments, and choose the cluster manager

2. Start the driver, register the application, launch the executors, split the job into tasks, and distribute them

3. Return (or persist) the computation results; the Spark job is then complete

1. The user submits a Spark job with a command like the following

./bin/spark-submit \
  --class cn.face.cdp.run.WordCount \
  --master spark://192.168.1.11:7077 \
  --deploy-mode cluster \
  --executor-memory 4G \
  --total-executor-cores 20 \
  --supervise \
  ./face.jar \
  city
           
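The --class option points at the application's entry class. The article never shows cn.face.cdp.run.WordCount itself; purely as a hypothetical sketch of what such an entry class might look like (the input and output paths below are assumptions, not from the article):

package cn.face.cdp.run

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical sketch only -- the real WordCount class is not shown in this article.
object WordCount {
  def main(args: Array[String]): Unit = {
    // "city" from the spark-submit command above arrives here as args(0)
    val city = args(0)
    val sc = new SparkContext(new SparkConf().setAppName("WordCount"))

    sc.textFile(s"hdfs:///input/$city")           // assumed input path
      .flatMap(_.split("\\s+"))
      .map((_, 1))
      .reduceByKey(_ + _)
      .saveAsTextFile(s"hdfs:///output/$city")    // assumed output path

    sc.stop()
  }
}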

Internally, this shell script ends up running the main method of a Java class:

export PYTHONHASHSEED=0
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
           

Clearly, then, finding that class's main method lets us see exactly what happens.

org.apache.spark.deploy.SparkSubmit

override def main(args: Array[String]): Unit = {
    // parse and validate the command-line arguments, and wrap them for later use
    val appArgs = new SparkSubmitArguments(args) 
   ...
    // match on the requested action; here we take the SUBMIT case
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    }
  }
           
private def submit(args: SparkSubmitArguments): Unit = {
    // prepareSubmitEnvironment is fairly involved: it sets environment variables and system properties and selects the cluster manager, among other things
    // the returned childMainClass here defaults to "org.apache.spark.deploy.Client", and that class acts as the entry point for submitting the Driver
    val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

    def doRunMain(): Unit = {
      if (args.proxyUser != null) {
        val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
          UserGroupInformation.getCurrentUser())
        try {
          proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
            override def run(): Unit = {
              runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
            }
          })
        } catch {
	        ...
        }
      } else {
        // runMain then simply invokes the main method of the Client class via reflection
        runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
      }
    }
           
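runMain itself is not shown above; stripped of classpath handling and error checking, the reflective call it ends up making is roughly the following (a simplified sketch, not the actual Spark code):

// Simplified sketch of the reflective dispatch done by runMain (not the real implementation).
def invokeChildMain(childMainClass: String, childArgs: Seq[String]): Unit = {
  val mainClass = Class.forName(childMainClass, true, Thread.currentThread().getContextClassLoader)
  val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
  // main is static, so the receiver passed to invoke is null
  mainMethod.invoke(null, childArgs.toArray)
}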

org.apache.spark.deploy.Client

def main(args: Array[String]) {
	...
    val conf = new SparkConf()
    val driverArgs = new ClientArguments(args)

    if (!conf.contains("spark.rpc.askTimeout")) {
      conf.set("spark.rpc.askTimeout", "10s")
    }
    val rpcEnv =
      RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
    // set up this client's own RPC endpoint and obtain references to the master endpoints
    val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
      map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
    rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))

    rpcEnv.awaitTermination()
  }
           
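RpcAddress.fromSparkURL just turns a spark://host:port URL into a host/port pair, and driverArgs.masters may hold several such URLs when the standalone cluster runs with multiple masters. A rough standalone illustration of that parsing (not the Spark implementation itself):

import java.net.URI

// Rough illustration of what RpcAddress.fromSparkURL does with a master URL.
def parseSparkUrl(sparkUrl: String): (String, Int) = {
  val uri = new URI(sparkUrl)
  require(uri.getScheme == "spark", s"Invalid master URL: $sparkUrl")
  require(uri.getHost != null && uri.getPort != -1, s"Invalid master URL: $sparkUrl")
  (uri.getHost, uri.getPort)
}

// parseSparkUrl("spark://192.168.1.11:7077") == ("192.168.1.11", 7077)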

The ClientEndpoint class extends ThreadSafeRpcEndpoint, so it overrides onStart, which is invoked automatically once the endpoint is registered:

override def onStart(): Unit = {
    driverArgs.cmd match {
      case "launch" =>
        // in standalone cluster mode, this is the process that gets launched on a worker (it wraps the user's driver class)
        val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
		...
        val command = new Command(mainClass,
          Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
          sys.env, classPathEntries, libraryPathEntries, javaOpts)
        // build the driver description object, then send a message to the master to register the driver
        val driverDescription = new DriverDescription(
          driverArgs.jarUrl,
          driverArgs.memory,
          driverArgs.cores,
          driverArgs.supervise,
          command)
        ayncSendToMasterAndForwardReply[SubmitDriverResponse](
        // the case class sent here is RequestSubmitDriver, so look for the matching case in the Master's receive logic
          RequestSubmitDriver(driverDescription))
      case "kill" =>
        val driverId = driverArgs.driverId
        ayncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
    }
  }
           
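ayncSendToMasterAndForwardReply (spelled this way in the Spark source) asks every configured master asynchronously and forwards whichever reply arrives back to this endpoint, so the same code works with a multi-master HA setup. Reduced to plain Futures, the pattern looks roughly like this conceptual sketch; ask and forwardToSelf stand in for the RPC machinery and are not Spark APIs:

import scala.concurrent.{ExecutionContext, Future}

// Conceptual sketch of "ask every master, forward the reply to ourselves";
// not the actual implementation of ayncSendToMasterAndForwardReply.
def askAllMastersAndForward[T](asks: Seq[Any => Future[T]],
                               message: Any,
                               forwardToSelf: T => Unit)
                              (implicit ec: ExecutionContext): Unit = {
  for (ask <- asks) {
    // whichever master answers, its reply is handed back to our own receive loop
    ask(message).foreach(forwardToSelf)
  }
}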

org.apache.spark.deploy.master.Master

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    // this case handles the driver submission request
    case RequestSubmitDriver(description) =>
      // if this master is not in the ALIVE state, reply to the client with success = false
      if (state != RecoveryState.ALIVE) {
        val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
          "Can only accept driver submissions in ALIVE state."
        context.reply(SubmitDriverResponse(self, false, None, msg))
      } else {
        logInfo("Driver submitted " + description.command.mainClass)
        // create the driver info, which this master process will track
        val driver = createDriver(description)
        // persist the driver info so it can be re-read after a master failover or restart
        persistenceEngine.addDriver(driver)
        // add it to the list of drivers waiting to be scheduled
        waitingDrivers += driver
        // add it to the set of drivers tracked in the master's memory
        drivers.add(driver)
        // a new driver needs to run, so kick off resource scheduling
        schedule()
        // reply to the Client, which will then shut down its process
        context.reply(SubmitDriverResponse(self, true, Some(driver.id),
          s"Driver successfully submitted as ${driver.id}"))
      }
      case ...
}
           
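createDriver wraps the DriverDescription in a DriverInfo and assigns it a fresh id of the form driver-<timestamp>-<counter>, which is the id echoed back in the reply above. Roughly speaking, the id generation looks like this (a sketch from memory, not a verbatim copy of Master.createDriver):

import java.text.SimpleDateFormat
import java.util.{Date, Locale}

// Sketch, from memory, of how the master generates driver ids such as
// "driver-20190101120000-0000"; not a verbatim copy of the Master code.
object DriverIdSketch {
  private val createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
  private var nextDriverNumber = 0

  def newDriverId(submitDate: Date): String = {
    val id = "driver-%s-%04d".format(createDateFormat.format(submitDate), nextDriverNumber)
    nextDriverNumber += 1
    id
  }
}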

org.apache.spark.deploy.Client

override def receive: PartialFunction[Any, Unit] = {

    case SubmitDriverResponse(master, success, driverId, message) =>
      logInfo(message)
      if (success) {
        activeMasterEndpoint = master
        // this method waits 5 seconds, queries the master again for the driver's current state, logs it, and finally exits the process
        pollAndReportStatus(driverId.get)
      // if the submission failed and the reply did not come from a standby master, exit the process immediately
      } else if (!Utils.responseFromBackup(message)) {
        System.exit(-1)
      }
    case ...
}
           
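pollAndReportStatus is not reproduced here; based on the description in the comment above, its behaviour boils down to the following simplified, self-contained sketch (askDriverState stands in for the real RPC round-trip to the master and is an assumption, not a Spark API):

// Simplified sketch of the behaviour described above: wait five seconds,
// query the driver's state once more, report it, then terminate the Client process.
def pollAndReportStatusSketch(driverId: String,
                              askDriverState: String => Option[String]): Unit = {
  Thread.sleep(5000)
  askDriverState(driverId) match {
    case Some(state) => println(s"State of $driverId is $state")
    case None        => println(s"ERROR: Cluster master did not recognize $driverId")
  }
  System.exit(0)
}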

This completes the flow executed by spark-submit (the Client process exits and the client disconnects from the cluster); next up is driver registration and startup.

2. Spark 2.2 Source Code Analysis: Driver Registration and Startup