Master
源碼版本2.4.7
start-master.sh
啟動主類:
org.apache.spark.deploy.master.Master
Master是一個伴生類
首先找到main方法
def main(argStrings: Array[String]) {
Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
exitOnUncaughtException = false))
Utils.initDaemon(log)
val conf = new SparkConf
val args = new MasterArguments(argStrings, conf)
//建構rpc通信環境,核心代碼
val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
rpcEnv.awaitTermination()
}
RpcEnv是關鍵:
def startRpcEnvAndEndpoint(
host: String,
port: Int,
webUiPort: Int,
conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
val securityMgr = new SecurityManager(conf)
//建立nettyrpcenv
val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
//啟動rpc通信,并且異步調起Master OnStart方法
val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
(rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}
RpcEnv.create傳回一個是一個NettyRpcEnv,可知spark2.4.7通信是基于netty, 後續網絡互動都是通過RpcEnv(NettyRpcEnv).
rpcEnv.setupEndpoint 最終會調用Master的onStart方法–異步
rpcEnv裡面啟動了Transportserver 通信元件
注冊了rpcendpoint->dispatcher分發器->endpointdata->inbox->process
master的消息處理 《偏函數PartialFunction》
worker注冊、銷毀
Application
Driver
launchExecutor