When the startup script's Master launch flow reaches its end, spark-class resolves everything into a command and runs it with exec ${CMD[@]}; this is where the Master is actually initialized.
The command is:
java -cp ../jar/* org.apache.spark.deploy.master.Master --port 7077 --webui-port 8080
org.apache.spark.deploy.master.Master
Let's start with the companion object, object Master.
private[deploy] object Master extends Logging {
  val SYSTEM_NAME = "sparkMaster"
  val ENDPOINT_NAME = "Master"

  // Arguments passed in from spark-class: --port 7077 --webui-port 8080
  def main(argStrings: Array[String]) {
    Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
      exitOnUncaughtException = false))
    // Initialize daemon logging
    Utils.initDaemon(log)
    // Creating the conf object loads Spark's default configuration
    val conf = new SparkConf
    // Parse the configuration and initialize the RPC environment and endpoint;
    // MasterArguments resolves the parameters RPC needs: host, port, webui-port
    val args = new MasterArguments(argStrings, conf)
    val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
    rpcEnv.awaitTermination()
  }

  /**
   * Start the Master and return a three tuple of:
   *   (1) The Master RpcEnv
   *   (2) The web UI bound port
   *   (3) The REST server bound port, if any
   */
  // As the scaladoc says, startRpcEnvAndEndpoint returns a tuple of the RpcEnv plus the
  // bound web UI and REST ports; its job is to initialize the SecurityManager, the RPC
  // environment, and the REST binding
  def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
    // Initialize the SecurityManager
    val securityMgr = new SecurityManager(conf)
    // Initialize the RPC environment
    val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
    // Register the Master endpoint with the RPC environment;
    // new Master here is where the Master instance is actually constructed
    val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
      new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
    // Ask the endpoint for the bound ports (web UI and, if enabled, REST)
    val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
    (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
  }
}
SparkConf
As you can see, defaults are loaded by matching on the spark. prefix: every JVM system property whose key starts with spark. is picked up. These are the Spark settings exported earlier when spark-config.sh imported the conf directory into the environment, chiefly whatever is set in spark-defaults.conf; a rough sample of that file is listed below.
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {
  import SparkConf._

  /** Create a SparkConf that loads defaults from system properties and the classpath */
  def this() = this(true)

  private val settings = new ConcurrentHashMap[String, String]()

  @transient private lazy val reader: ConfigReader = {
    val _reader = new ConfigReader(new SparkConfigProvider(settings))
    _reader.bindEnv(new ConfigProvider {
      override def get(key: String): Option[String] = Option(getenv(key))
    })
    _reader
  }

  if (loadDefaults) {
    loadFromSystemProperties(false)
  }

  // Load what spark-config.sh imported earlier: every system property
  // whose key starts with spark. is set on the conf
  private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
    // Load any spark.* system properties
    for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
      set(key, value, silent)
    }
    this
  }
}
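To see the prefix filter in action, here is a minimal sketch, assuming the Spark jars are on the classpath; ConfDemo and spark.demo.flag are invented for illustration.
import org.apache.spark.SparkConf

object ConfDemo {
  def main(args: Array[String]): Unit = {
    System.setProperty("spark.demo.flag", "true")  // as if launched with -Dspark.demo.flag=true
    val conf = new SparkConf                       // loadDefaults = true picks up spark.* properties
    println(conf.get("spark.demo.flag"))           // true
    println(conf.contains("demo.flag"))            // false: keys without the spark. prefix are ignored
  }
}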
spark-defaults.conf
In an HA setup, the related settings from spark-env.sh are imported into the conf as well.
# Example:
# spark.master spark://master:7077
# spark.eventLog.enabled true
# spark.eventLog.dir hdfs://namenode:8021/directory
# spark.serializer org.apache.spark.serializer.KryoSerializer
# spark.driver.memory 5g
# spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
spark.eventLog.enabled true
spark.eventLog.dir hdfs://spark/sparklog
MasterArguments
Back in Master: the MasterArguments class is constructed with the arguments passed in from spark-class plus the default conf, and merges the two.
private[master] class MasterArguments(args: Array[String], conf: SparkConf) extends Logging {
  // Defaults: the local hostname, port 7077, web UI port 8080
  var host = Utils.localHostName()
  var port = 7077
  var webUiPort = 8080
  var propertiesFile: String = null

  // Check for settings in environment variables
  // These were exported by load-spark-env.sh from spark-env.sh and are read here;
  // if SPARK_MASTER_IP is set, a warning suggests using a hostname instead
  if (System.getenv("SPARK_MASTER_IP") != null) {
    logWarning("SPARK_MASTER_IP is deprecated, please use SPARK_MASTER_HOST")
    host = System.getenv("SPARK_MASTER_IP")
  }
  if (System.getenv("SPARK_MASTER_HOST") != null) {
    host = System.getenv("SPARK_MASTER_HOST")
  }
  if (System.getenv("SPARK_MASTER_PORT") != null) {
    port = System.getenv("SPARK_MASTER_PORT").toInt
  }
  if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
    webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
  }

  // Parse the arguments from spark-class (port, webui-port) on top of the defaults above
  parse(args.toList)

  // This mutates the SparkConf, so all accesses to it must be made after this line
  // Load the remaining defaults into conf, including any --properties-file
  propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)

  if (conf.contains("spark.master.ui.port")) {
    webUiPort = conf.get("spark.master.ui.port").toInt
  }

  // Parses the passed-in arguments, i.e. --port 7077 --webui-port 8080,
  // using Scala list pattern matching (see the sketch after this listing)
  @tailrec
  private def parse(args: List[String]): Unit = args match {
    case ("--ip" | "-i") :: value :: tail =>
      Utils.checkHost(value)
      host = value
      parse(tail)
    case ("--host" | "-h") :: value :: tail =>
      Utils.checkHost(value)
      host = value
      parse(tail)
    case ("--port" | "-p") :: IntParam(value) :: tail =>
      port = value
      parse(tail)
    case "--webui-port" :: IntParam(value) :: tail =>
      webUiPort = value
      parse(tail)
    case ("--properties-file") :: value :: tail =>
      propertiesFile = value
      parse(tail)
    case ("--help") :: tail =>
      printUsageAndExit(0)
    case Nil => // No-op
    case _ =>
      printUsageAndExit(1)
  }

  /**
   * Print usage and exit JVM with the given exit code.
   */
  private def printUsageAndExit(exitCode: Int) {
    // scalastyle:off println
    System.err.println(
      "Usage: Master [options]\n" +
      "\n" +
      "Options:\n" +
      "  -i HOST, --ip HOST     Hostname to listen on (deprecated, please use --host or -h) \n" +
      "  -h HOST, --host HOST   Hostname to listen on\n" +
      "  -p PORT, --port PORT   Port to listen on (default: 7077)\n" +
      "  --webui-port PORT      Port for web UI (default: 8080)\n" +
      "  --properties-file FILE Path to a custom Spark properties file.\n" +
      "                         Default is conf/spark-defaults.conf.")
    // scalastyle:on println
    System.exit(exitCode)
  }
}
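The parse method is a classic tail-recursive parser over a Scala List: each case splits off the head option (and its value, if any) with :: and recurses on the tail. A standalone sketch of the same pattern; ArgParseDemo is a made-up name and only two options are handled.
import scala.annotation.tailrec

object ArgParseDemo {
  var port = 7077
  var webUiPort = 8080

  @tailrec
  def parse(args: List[String]): Unit = args match {
    case "--port" :: value :: tail =>       // :: peels the option and its value off the list
      port = value.toInt
      parse(tail)
    case "--webui-port" :: value :: tail =>
      webUiPort = value.toInt
      parse(tail)
    case Nil =>                             // nothing left to parse
    case other :: _ =>
      sys.error(s"Unrecognized option: $other")
  }

  def main(args: Array[String]): Unit = {
    parse(List("--port", "7070", "--webui-port", "8081"))
    println(s"port=$port webUiPort=$webUiPort")  // port=7070 webUiPort=8081
  }
}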
loadDefaultSparkProperties
Loads the default configuration. If no file was specified via --properties-file, filePath is null here and the default spark-defaults.conf path is used.
It then iterates over every property whose key starts with spark., sets any that are missing on the conf and in the JVM system properties, and returns the path of the file it used.
/**
 * Load default Spark properties from the given file. If no file is provided,
 * use the common defaults file. This mutates state in the given SparkConf and
 * in this JVM's system properties if the config specified in the file is not
 * already set. Return the path of the properties file used.
 */
def loadDefaultSparkProperties(conf: SparkConf, filePath: String = null): String = {
  // getDefaultPropertiesFile resolves the path to spark-defaults.conf under the conf directory
  val path = Option(filePath).getOrElse(getDefaultPropertiesFile())
  // Iterate over the file's properties, keeping only those that start with spark.
  Option(path).foreach { confFile =>
    getPropertiesFromFile(confFile).filter { case (k, v) =>
      k.startsWith("spark.")
    }.foreach { case (k, v) =>
      // setIfMissing: settings already present (e.g. imported via spark-env.sh) take precedence
      conf.setIfMissing(k, v)
      sys.props.getOrElseUpdate(k, v)
    }
  }
  path
}
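The setIfMissing call is what gives earlier sources precedence over the properties file. A minimal sketch; PrecedenceDemo is a made-up name.
import org.apache.spark.SparkConf

object PrecedenceDemo {
  def main(args: Array[String]): Unit = {
    // pretend "false" was already loaded from a spark.* system property
    val conf = new SparkConf(false).set("spark.eventLog.enabled", "false")
    conf.setIfMissing("spark.eventLog.enabled", "true")      // ignored: the key is already set
    conf.setIfMissing("spark.eventLog.dir", "/tmp/sparklog") // applied: the key was missing
    println(conf.get("spark.eventLog.enabled"))              // false
  }
}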
getDefaultPropertiesFile
Resolves the path of spark-defaults.conf under the conf directory, trying SPARK_CONF_DIR first and falling back to SPARK_HOME/conf.
/** Return the path of the default Spark properties file. */
def getDefaultPropertiesFile(env: Map[String, String] = sys.env): String = {
  env.get("SPARK_CONF_DIR")
    .orElse(env.get("SPARK_HOME").map { t => s"$t${File.separator}conf" })
    .map { t => new File(s"$t${File.separator}spark-defaults.conf")}
    .filter(_.isFile)
    .map(_.getAbsolutePath)
    .orNull
}
getPropertiesFromFile
Parses the loaded file into key/value pairs.
/** Load properties present in the given file. */
def getPropertiesFromFile(filename: String): Map[String, String] = {
  val file = new File(filename)
  require(file.exists(), s"Properties file $file does not exist")
  require(file.isFile(), s"Properties file $file is not a normal file")
  val inReader = new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8)
  try {
    val properties = new Properties()
    properties.load(inReader)
    properties.stringPropertyNames().asScala.map(
      k => (k, properties.getProperty(k).trim)).toMap
  } catch {
    case e: IOException =>
      throw new SparkException(s"Failed when loading Spark properties from $filename", e)
  } finally {
    inReader.close()
  }
}
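The whitespace-separated spark-defaults.conf format is ordinary java.util.Properties syntax, which is all getPropertiesFromFile relies on. A quick sketch; PropsDemo is a made-up name and it loads from a string instead of a file.
import java.io.StringReader
import java.util.Properties
import scala.collection.JavaConverters._

object PropsDemo {
  def main(args: Array[String]): Unit = {
    val text = "spark.eventLog.enabled true\nspark.eventLog.dir hdfs://spark/sparklog\n"
    val props = new Properties()
    props.load(new StringReader(text))  // the key ends at the first whitespace, the rest is the value
    val map = props.stringPropertyNames().asScala
      .map(k => k -> props.getProperty(k).trim).toMap
    println(map("spark.eventLog.dir"))  // hdfs://spark/sparklog
  }
}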
With MasterArguments loaded, we continue in main with startRpcEnvAndEndpoint(), which initializes the SecurityManager, the RPC environment, and the REST binding.
The SecurityManager and the RPC machinery deserve their own write-ups and are not covered here.
def startRpcEnvAndEndpoint(
    host: String,
    port: Int,
    webUiPort: Int,
    conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
  // Initialize the SecurityManager, which handles accounts and permissions
  val securityMgr = new SecurityManager(conf)
  // Initialize the RPC environment
  val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
  val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
    new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
  // Ask for the bound ports: RPC endpoint 7077, web UI 8080, REST 6066
  val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
  (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}
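The askSync call is a blocking request/response over RPC: the Master side answers BoundPortsRequest with a BoundPortsResponse carrying the bound ports. A standalone sketch of that shape follows; it is not Spark's RpcEnv (which is private[spark]), and AskDemo plus the port values are illustrative only.
case object BoundPortsRequest
case class BoundPortsResponse(rpcEndpointPort: Int, webUIPort: Int, restPort: Option[Int])

object AskDemo {
  // stand-in for masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
  def askSync(msg: Any): BoundPortsResponse = msg match {
    case BoundPortsRequest => BoundPortsResponse(7077, 8080, Some(6066))
  }

  def main(args: Array[String]): Unit =
    println(askSync(BoundPortsRequest))  // BoundPortsResponse(7077,8080,Some(6066))
}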
Continuing on, the RPC environment starts, and registering the endpoint instantiates Master via new Master. Let's go back and see what class Master actually initializes.
class Master
It initializes the Master's parameters, registers the RPC endpoint and metrics systems, and sets up the environment Workers will connect to.
private[deploy] class Master(
    override val rpcEnv: RpcEnv,
    address: RpcAddress,
    webUiPort: Int,
    val securityMgr: SecurityManager,
    val conf: SparkConf)
  extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {

  // Single daemon thread for forwarding scheduled messages
  private val forwardMessageThread =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")

  // Hadoop configuration derived from the Spark conf
  private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)

  // For application IDs
  // Date format used when generating application IDs
  private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)

  // Worker timeout, default 60 seconds
  private val WORKER_TIMEOUT_MS = conf.getLong("spark.worker.timeout", 60) * 1000
  // Number of completed applications kept in the web UI, default 200
  private val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
  // Number of completed drivers kept, default 200
  private val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200)
  // Number of timeout checks a dead worker's info is kept around for, default 15
  private val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15)
  // Recovery mode, default NONE
  private val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
  // Maximum executor retries before an application is removed, default 10
  private val MAX_EXECUTOR_RETRIES = conf.getInt("spark.deploy.maxExecutorRetries", 10)

  // Registered worker nodes
  val workers = new HashSet[WorkerInfo]
  // Application info by application ID
  val idToApp = new HashMap[String, ApplicationInfo]
  // Applications queued and waiting to be scheduled
  private val waitingApps = new ArrayBuffer[ApplicationInfo]
  // All applications
  val apps = new HashSet[ApplicationInfo]
  // Worker info by worker ID
  private val idToWorker = new HashMap[String, WorkerInfo]
  // Worker info by the RPC address it registered from
  private val addressToWorker = new HashMap[RpcAddress, WorkerInfo]
  // Application info by its registered RPC endpoint
  private val endpointToApp = new HashMap[RpcEndpointRef, ApplicationInfo]
  // Application info by the RPC address it registered from
  private val addressToApp = new HashMap[RpcAddress, ApplicationInfo]
  // Completed applications
  private val completedApps = new ArrayBuffer[ApplicationInfo]
  private var nextAppNumber = 0
  // Driver info
  private val drivers = new HashSet[DriverInfo]
  private val completedDrivers = new ArrayBuffer[DriverInfo]
  // Drivers currently spooled for scheduling
  private val waitingDrivers = new ArrayBuffer[DriverInfo]
  private var nextDriverNumber = 0

  // Validate the host
  Utils.checkHost(address.host)
  // assert(host != null && host.indexOf(':') == -1, s"Expected hostname (not IP) but got $host")

  // Metrics system for the master itself
  private val masterMetricsSystem = MetricsSystem.createMetricsSystem("master", conf, securityMgr)
  // Metrics system for applications
  private val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications", conf,
    securityMgr)
  // MasterSource exposes all the state above as metrics
  private val masterSource = new MasterSource(this)

  // After onStart, webUi will be set
  private var webUi: MasterWebUI = null

  // The master's public address: SPARK_PUBLIC_DNS if set, otherwise the bind host
  // (e.g. 192.168.2.1 when launched locally)
  private val masterPublicAddress = {
    val envVar = conf.getenv("SPARK_PUBLIC_DNS")
    if (envVar != null) envVar else address.host
  }

  // The master URL, e.g. spark://192.168.2.1:7077
  private val masterUrl = address.toSparkURL
  private var masterWebUiUrl: String = _

  // Recovery state, initially STANDBY
  private var state = RecoveryState.STANDBY
  // Persistence engine
  private var persistenceEngine: PersistenceEngine = _
  // Leader election agent
  private var leaderElectionAgent: LeaderElectionAgent = _
  // Scheduled task that completes recovery
  private var recoveryCompletionTask: ScheduledFuture[_] = _
  // Scheduled task that checks for timed-out workers
  private var checkForWorkerTimeOutTask: ScheduledFuture[_] = _

  // As a temporary workaround before better ways of configuring memory, we allow users to set
  // a flag that will perform round-robin scheduling across the nodes (spreading out each app
  // among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
  private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)

  // Default maxCores for applications that don't specify it (i.e. pass Int.MaxValue)
  private val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
  // Whether to run the web UI behind a reverse proxy, default off
  val reverseProxy = conf.getBoolean("spark.ui.reverseProxy", false)
  if (defaultCores < 1) {
    throw new SparkException("spark.deploy.defaultCores must be positive")
  }

  // Alternative application submission gateway that is stable across Spark versions
  // The REST server, enabled by default
  private val restServerEnabled = conf.getBoolean("spark.master.rest.enabled", true)
  private var restServer: Option[StandaloneRestServer] = None
  private var restServerBoundPort: Option[Int] = None
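All of the tunables above are ordinary conf entries, so they can be overridden like any other setting. A short sketch; MasterTuningDemo is a made-up name and the values are illustrative, not recommendations.
import org.apache.spark.SparkConf

object MasterTuningDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.worker.timeout", "120")              // worker timeout, in seconds
      .set("spark.deploy.retainedApplications", "500") // completed apps kept in the web UI
      .set("spark.deploy.spreadOut", "false")          // consolidate apps onto fewer nodes
    println(conf.get("spark.worker.timeout"))          // 120
  }
}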
When new Master reaches this point, construction is complete and control returns to main; execution eventually blocks at rpcEnv.awaitTermination(), waiting for Workers to connect.
Next, note what Master mixes in: ThreadSafeRpcEndpoint, Logging and LeaderElectable.
private[deploy] class Master(
    override val rpcEnv: RpcEnv,
    address: RpcAddress,
    webUiPort: Int,
    val securityMgr: SecurityManager,
    val conf: SparkConf)
  extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {
  ...
}
ThreadSafeRpcEndpoint
ThreadSafeRpcEndpoint is a trait that extends RpcEndpoint.
private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint
The RpcEnv that hosts such endpoints is created through the RpcEnvFactory factory pattern.
private[spark] trait RpcEnvFactory {
  def create(config: RpcEnvConfig): RpcEnv
}
An RpcEndpoint's lifecycle consists of onStart, receive and onStop. Master overrides these three methods, so onStart runs when the Master endpoint is registered with the RpcEnv during initialization.
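A minimal sketch of that lifecycle; EchoEndpoint is a made-up name, and since these traits are private[spark], such code only compiles inside the org.apache.spark package tree.
import org.apache.spark.rpc.{RpcEnv, ThreadSafeRpcEndpoint}

// onStart fires when the endpoint is registered, receive handles incoming
// messages, and onStop fires on shutdown -- the same hooks Master overrides.
class EchoEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
  override def onStart(): Unit = println("endpoint started")

  override def receive: PartialFunction[Any, Unit] = {
    case msg => println(s"received: $msg")
  }

  override def onStop(): Unit = println("endpoint stopped")
}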
onStart
Master's onStart is mainly about getting everything ready:
override def onStart(): Unit = {
  logInfo("Starting Spark master at " + masterUrl)
  logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
  // Bind the master web UI to its address and port
  webUi = new MasterWebUI(this, webUiPort)
  webUi.bind()
  masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
  if (reverseProxy) {
    masterWebUiUrl = conf.get("spark.ui.reverseProxyUrl", masterWebUiUrl)
    webUi.addProxy()
    logInfo(s"Spark Master is acting as a reverse proxy. Master, Workers and " +
      s"Applications UIs are available at $masterWebUiUrl")
  }
  // Schedule the periodic worker-timeout check (see the sketch after this listing)
  checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
      self.send(CheckForWorkerTimeOut)
    }
  }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)

  // Start the REST server if enabled
  if (restServerEnabled) {
    val port = conf.getInt("spark.master.rest.port", 6066)
    restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
  }
  restServerBoundPort = restServer.map(_.start())

  // Register and start the metrics systems
  masterMetricsSystem.registerSource(masterSource)
  masterMetricsSystem.start()
  applicationMetricsSystem.start()
  // Attach the master and app metrics servlet handler to the web ui after the metrics systems are
  // started.
  masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
  applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)

  // Serializer for the persisted recovery state
  val serializer = new JavaSerializer(conf)
  // Match on the recovery mode: ZOOKEEPER, FILESYSTEM or CUSTOM
  val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
    case "ZOOKEEPER" =>
      logInfo("Persisting recovery state to ZooKeeper")
      val zkFactory =
        new ZooKeeperRecoveryModeFactory(conf, serializer)
      (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
    case "FILESYSTEM" =>
      val fsFactory =
        new FileSystemRecoveryModeFactory(conf, serializer)
      (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
    case "CUSTOM" =>
      val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
      val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
        .newInstance(conf, serializer)
        .asInstanceOf[StandaloneRecoveryModeFactory]
      (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
    case _ =>
      // Default: no persistence, and the single-node leader election kicks in
      (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
  }
  persistenceEngine = persistenceEngine_
  leaderElectionAgent = leaderElectionAgent_
}
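The worker-timeout check above is a standard scheduled-executor pattern: a daemon thread periodically makes the endpoint send a message to itself. A standalone sketch; TimeoutCheckDemo is a made-up name and one second stands in for WORKER_TIMEOUT_MS.
import java.util.concurrent.{Executors, TimeUnit}

object TimeoutCheckDemo {
  def main(args: Array[String]): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(new Runnable {
      // where Master does self.send(CheckForWorkerTimeOut)
      override def run(): Unit = println("CheckForWorkerTimeOut")
    }, 0, 1, TimeUnit.SECONDS)
    Thread.sleep(3000)  // observe a few ticks
    scheduler.shutdown()
  }
}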
We won't go into the Logging trait here.
LeaderElectable
Leader election. In the default case the Master uses MonarchyLeaderAgent, an implementation of the LeaderElectionAgent trait that takes the Master itself as its LeaderElectable.
@DeveloperApi
trait LeaderElectionAgent {
  val masterInstance: LeaderElectable
  def stop() {} // to avoid noops in implementations.
}

@DeveloperApi
trait LeaderElectable {
  def electedLeader(): Unit
  def revokedLeadership(): Unit
}

/** Single-node implementation of LeaderElectionAgent -- we're initially and always the leader. */
// A single-node master uses this implementation
private[spark] class MonarchyLeaderAgent(val masterInstance: LeaderElectable)
  extends LeaderElectionAgent {
  masterInstance.electedLeader()
}
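Note that the election happens in MonarchyLeaderAgent's constructor body, so merely constructing the agent elects the master. A standalone re-implementation of that shape; Electable, MonarchyAgent and ElectionDemo are made-up names.
trait Electable {
  def electedLeader(): Unit
}

// Constructing the agent immediately elects its master,
// mirroring MonarchyLeaderAgent above.
class MonarchyAgent(val masterInstance: Electable) {
  masterInstance.electedLeader()  // runs as part of the constructor
}

object ElectionDemo {
  def main(args: Array[String]): Unit = {
    new MonarchyAgent(new Electable {
      def electedLeader(): Unit = println("elected leader")  // prints immediately
    })
  }
}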
When the Master is elected leader, its overridden electedLeader sends the Master itself an ElectedLeader message; when leadership is revoked (e.g. on shutdown), revokedLeadership sends RevokedLeadership instead.
override def electedLeader() {
  self.send(ElectedLeader)
}

override def revokedLeadership() {
  self.send(RevokedLeadership)
}
ElectedLeader is one of the MasterMessages:
private[master] object MasterMessages {
  // LeaderElectionAgent to Master
  case object ElectedLeader
  ...
}
These messages are handled in the receive method below, which decides the leadership state and also accepts Worker registrations.
// Handle messages that decide leadership state
override def receive: PartialFunction[Any, Unit] = {
  case ElectedLeader =>
    // On election, read the persisted state to decide whether recovery is needed
    val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
    state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
      RecoveryState.ALIVE
    } else {
      RecoveryState.RECOVERING
    }
    logInfo("I have been elected leader! New state: " + state)
    if (state == RecoveryState.RECOVERING) {
      beginRecovery(storedApps, storedDrivers, storedWorkers)
      recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          self.send(CompleteRecovery)
        }
      }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
    }

  // Finish recovery
  case CompleteRecovery => completeRecovery()

  // If leadership is revoked, log the error and exit
  case RevokedLeadership =>
    logError("Leadership has been revoked -- master shutting down.")
    System.exit(0)

  // Worker registration: carries the worker's identity, resources and the master address
  case RegisterWorker(
      id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) =>
    logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
      workerHost, workerPort, cores, Utils.megabytesToString(memory)))
    // A standby master tells the worker so; otherwise check for a duplicate worker ID
    if (state == RecoveryState.STANDBY) {
      workerRef.send(MasterInStandby)
    } else if (idToWorker.contains(id)) {
      workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
    } else {
      // The master is alive and the worker ID is fresh: register the worker
      val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
        workerRef, workerWebUiUrl)
      if (registerWorker(worker)) {
        persistenceEngine.addWorker(worker)
        workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
        schedule()
      } else {
        val workerAddress = worker.endpoint.address
        logWarning("Worker registration failed. Attempted to re-register worker at same " +
          "address: " + workerAddress)
        workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
          + workerAddress))
      }
    }
  // ... remaining cases elided
With that, Master instantiation is complete: all the internal machinery, addresses and ports are ready, and the web UI is reachable.
The Master's address is the host the startup script ran on, with web UI port 8080, i.e. http://host:8080.
Back in main, execution continues to rpcEnv.awaitTermination() and waits for Workers to connect.