天天看點

spark源碼分析一(Master)Master

Master

源碼版本2.4.7

start-master.sh

啟動主類:

org.apache.spark.deploy.master.Master

spark源碼分析一(Master)Master

Master是一個伴生類

首先找到main方法

def main(argStrings: Array[String]) {
    Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
      exitOnUncaughtException = false))
    Utils.initDaemon(log)
    val conf = new SparkConf
    val args = new MasterArguments(argStrings, conf)
    //建構rpc通信環境,核心代碼
    val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
    rpcEnv.awaitTermination()
  }
           

RpcEnv是關鍵:

def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
    val securityMgr = new SecurityManager(conf)
    //建立nettyrpcenv
    val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
    //啟動rpc通信,并且異步調起Master OnStart方法
    val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
      new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
    val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
    (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
  }
           

RpcEnv.create傳回一個是一個NettyRpcEnv,可知spark2.4.7通信是基于netty, 後續網絡互動都是通過RpcEnv(NettyRpcEnv).

rpcEnv.setupEndpoint 最終會調用Master的onStart方法–異步

rpcEnv裡面啟動了Transportserver  通信元件
注冊了rpcendpoint->dispatcher分發器->endpointdata->inbox->process
           

master的消息處理 《偏函數PartialFunction》

worker注冊、銷毀
Application
Driver
launchExecutor
           
spark源碼分析一(Master)Master

繼續閱讀