Spark 2.2 Source Reading Order
1. Spark 2.2 Source Analysis: Submitting a Job with spark-submit
2. Spark 2.2 Source Analysis: Registering and Launching the Driver
After the client submits a job with the spark-submit command, a series of operations takes place inside the spark-submit process (step 0 in the accompanying diagram).
What the Spark cluster does after startup, in outline:
1. Run the spark-submit script, prepare the arguments, and choose a cluster manager.
2. Launch the driver, register the application, launch executors, split the job into tasks, and distribute them.
3. Return (or persist) the computation results; the Spark job is then complete.
1. The user submits a Spark job with a command like the following:
./bin/spark-submit \
--class cn.face.cdp.run.WordCount \
--master spark://192.168.1.11:7077 \
--deploy-mode cluster \
--executor-memory 4G \
--total-executor-cores 20 \
--supervise \
./face.jar \
city
Internally, this shell script executes the main method of a Java class:
export PYTHONHASHSEED=0
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
Clearly, finding that class's main method lets us see what happens:
org.apache.spark.deploy.SparkSubmit
override def main(args: Array[String]): Unit = {
//validate the arguments and return them wrapped in SparkSubmitArguments
val appArgs = new SparkSubmitArguments(args)
...
//match on the action passed in; a normal submission takes the SUBMIT case
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
}
}
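Where does appArgs.action come from? If neither --kill nor --status was passed on the command line, SparkSubmitArguments falls back to SUBMIT; paraphrased from its argument-loading logic:
// inside SparkSubmitArguments (paraphrased): with no --kill or --status flag,
// the action defaults to SUBMIT
action = Option(action).getOrElse(SparkSubmitAction.SUBMIT)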
private def submit(args: SparkSubmitArguments): Unit = {
//prepareSubmitEnvironment is quite involved: it sets up environment variables and system properties, picks the cluster manager, and so on
//the childMainClass it returns here defaults to "org.apache.spark.deploy.Client", which serves as the entry point for submitting the Driver
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
def doRunMain(): Unit = {
if (args.proxyUser != null) {
val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
UserGroupInformation.getCurrentUser())
try {
proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
})
} catch {
...
}
} else {
//runMain invokes the Client class's main method directly via reflection
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
}
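As the comment above notes, runMain loads childMainClass and calls its static main method through reflection. A minimal self-contained sketch of that pattern (an illustration only; Spark's actual runMain also manages class loaders, jar downloads, and system properties):
import java.lang.reflect.Modifier

object ReflectiveLaunch {
  // load a class by name and invoke its static main(String[]) method,
  // essentially what runMain does with childMainClass
  def launch(mainClassName: String, args: Array[String]): Unit = {
    val mainClass = Thread.currentThread().getContextClassLoader.loadClass(mainClassName)
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    require(Modifier.isStatic(mainMethod.getModifiers), s"main of $mainClassName must be static")
    mainMethod.invoke(null, args) // null receiver because main is static
  }
}

// e.g. ReflectiveLaunch.launch("org.apache.spark.deploy.Client", Array("--some-arg"))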
org.apache.spark.deploy.Client
def main(args: Array[String]) {
...
val conf = new SparkConf()
val driverArgs = new ClientArguments(args)
if (!conf.contains("spark.rpc.askTimeout")) {
conf.set("spark.rpc.askTimeout", "10s")
}
val rpcEnv =
RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
//obtain references to the master endpoints and register this client's own RPC endpoint (ClientEndpoint)
val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))
rpcEnv.awaitTermination()
}
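rpcEnv.setupEndpoint registers ClientEndpoint with the RpcEnv, and from then on the environment drives the endpoint's life cycle (constructor -> onStart -> receive* -> onStop). A toy model of that registration pattern, with hypothetical names rather than Spark's private RpcEnv API:
// the environment, not user code, calls onStart once an endpoint is registered;
// this is why ClientEndpoint.onStart below runs "automatically"
trait MiniEndpoint {
  def onStart(): Unit = {} // lifecycle hook driven by the environment
  def receive(msg: Any): Unit // message handler
}

class MiniRpcEnv {
  def setupEndpoint(name: String, endpoint: MiniEndpoint): MiniEndpoint = {
    endpoint.onStart()
    endpoint
  }
}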
ClientEndpoint extends ThreadSafeRpcEndpoint, so its overridden onStart is invoked automatically once the endpoint is registered:
override def onStart(): Unit = {
driverArgs.cmd match {
case "launch" =>
//in Standalone cluster mode, this is the main class of the process that will be launched as the driver
val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
...
val command = new Command(mainClass,
Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
sys.env, classPathEntries, libraryPathEntries, javaOpts)
//build the driver-description object and send a message to the master to register the driver
val driverDescription = new DriverDescription(
driverArgs.jarUrl,
driverArgs.memory,
driverArgs.cores,
driverArgs.supervise,
command)
ayncSendToMasterAndForwardReply[SubmitDriverResponse](
//the case class sent here is RequestSubmitDriver, so we just need to find the matching case in Master
RequestSubmitDriver(driverDescription))
case "kill" =>
val driverId = driverArgs.driverId
ayncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
}
}
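Note that {{WORKER_URL}} and {{USER_JAR}} are placeholder tokens: the worker that eventually runs the driver substitutes the real values before building the launch command. A paraphrased sketch of that substitution (simplified from the worker-side DriverRunner):
// replace placeholder tokens in the Command's arguments with values that are
// only known on the worker: its own URL and the jar's local path after download
def substituteVariables(workerUrl: String, localJarFilename: String)(argument: String): String =
  argument match {
    case "{{WORKER_URL}}" => workerUrl
    case "{{USER_JAR}}" => localJarFilename
    case other => other
  }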
org.apache.spark.deploy.master.Master
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
//this case handles the driver-submission request
case RequestSubmitDriver(description) =>
//if this master is not in the ALIVE state, reply false to the client
if (state != RecoveryState.ALIVE) {
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
"Can only accept driver submissions in ALIVE state."
context.reply(SubmitDriverResponse(self, false, None, msg))
} else {
logInfo("Driver submitted " + description.command.mainClass)
//create the driver info, which this master process will maintain
val driver = createDriver(description)
//persist the driver info so it can be re-read after a master failover or restart
persistenceEngine.addDriver(driver)
//add it to the list of drivers waiting to be scheduled
waitingDrivers += driver
//add it to the master's in-memory list of managed drivers
drivers.add(driver)
//a new driver needs to run, so kick off resource scheduling
schedule()
//reply to the Client, which then shuts down its own process
context.reply(SubmitDriverResponse(self, true, Some(driver.id),
s"Driver successfully submitted as ${driver.id}"))
}
case ...
}
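createDriver itself is small: it wraps the DriverDescription in a DriverInfo carrying a timestamped, sequentially numbered id. A self-contained sketch of the idea (type names simplified; the real DriverInfo also tracks runtime state such as the worker it is assigned to):
import java.text.SimpleDateFormat
import java.util.Date

// simplified stand-ins for Spark's internal DriverDescription / DriverInfo
case class MiniDriverDesc(jarUrl: String, memory: Int, cores: Int, supervise: Boolean)
case class MiniDriverInfo(startTime: Long, id: String, desc: MiniDriverDesc, submitDate: Date)

object MiniMasterState {
  private val createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
  private var nextDriverNumber = 0

  // mirrors the spirit of Master.createDriver/newDriverId: "driver-<timestamp>-<seq>"
  def createDriver(desc: MiniDriverDesc): MiniDriverInfo = {
    val now = System.currentTimeMillis()
    val date = new Date(now)
    nextDriverNumber += 1
    val id = "driver-%s-%04d".format(createDateFormat.format(date), nextDriverNumber)
    MiniDriverInfo(now, id, desc, date)
  }
}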
org.apache.spark.deploy.Client
override def receive: PartialFunction[Any, Unit] = {
case SubmitDriverResponse(master, success, driverId, message) =>
logInfo(message)
if (success) {
activeMasterEndpoint = master
//wait 5 seconds, query the master again for the driver's current state, log it, and finally exit the current process
pollAndReportStatus(driverId.get)
//if the submission failed and the reply did not come from a standby master, exit the process directly
} else if (!Utils.responseFromBackup(message)) {
System.exit(-1)
}
case ...
}
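Putting the two endpoints together, the whole submission is one ask/reply round trip. A self-contained sketch of the protocol (plain method calls stand in for Spark's Netty-based RPC; the Mini-prefixed names are hypothetical):
// the two messages exchanged between Client and Master
case class RequestSubmitDriver(mainClass: String)
case class SubmitDriverResponse(success: Boolean, driverId: Option[String], message: String)

object MiniMaster {
  private var nextId = 0
  // stand-in for Master.receiveAndReply; persistence, queueing and schedule() elided
  def receiveAndReply(msg: Any): Any = msg match {
    case RequestSubmitDriver(_) =>
      nextId += 1
      val id = f"driver-$nextId%04d"
      SubmitDriverResponse(success = true, Some(id), s"Driver successfully submitted as $id")
  }
}

object MiniClient extends App {
  // stand-in for ayncSendToMasterAndForwardReply plus the SubmitDriverResponse case above
  MiniMaster.receiveAndReply(RequestSubmitDriver("cn.face.cdp.run.WordCount")) match {
    case SubmitDriverResponse(true, Some(id), message) => println(message) // then poll status and exit
    case SubmitDriverResponse(_, _, message) => sys.error(message)
  }
}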
At this point the spark-submit flow is finished (the Client process exits and the client disconnects from the cluster); next up is the driver's registration and launch.
2. Spark 2.2 Source Analysis: Registering and Launching the Driver