前面在《5. Spark源碼解析之worker啟動流程解析》中解讀了Worker的啟動流程。
這裡解讀下Worker的執行個體化流程。
Worker執行個體化
通過RPC中new Worker,開始真正的執行個體化Worker。
private[deploy] class Worker(
// Constructor parameters of the Worker RPC endpoint.
override val rpcEnv: RpcEnv,
webUiPort: Int,
cores: Int,
memory: Int,
masterRpcAddresses: Array[RpcAddress],
// endpointName = "Worker"
endpointName: String,
workDirPath: String = null,
val conf: SparkConf,
val securityMgr: SecurityManager)
extends ThreadSafeRpcEndpoint with Logging {
// Host and port of this endpoint's RPC environment.
private val host = rpcEnv.address.host
private val port = rpcEnv.address.port
// Sanity-check the host and port.
Utils.checkHost(host)
assert (port > 0)
// A scheduled executor used to send messages at the specified time.
private val forwordMessageScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler")
// A separated thread to clean up the workDir and the directories of finished applications.
// Used to provide the implicit parameter of `Future` methods.
private val cleanupThreadExecutor = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonSingleThreadExecutor("worker-cleanup-thread"))
// For worker and executor IDs
// Timestamp format used when generating worker/executor IDs.
private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
// Send a heartbeat every (heartbeat timeout) / 4 milliseconds
// (15 seconds with the default 60-second timeout).
private val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
// Model retries to connect to the master, after Hadoop's model.
// The first six attempts to reconnect are in shorter intervals (between 5 and 15 seconds)
// Afterwards, the next 10 attempts are between 30 and 90 seconds.
// A bit of randomness is introduced so that not all of the workers attempt to reconnect at
// the same time.
// Number of short-interval (5-15s) registration attempts before switching
// to the prolonged 30-90s interval.
private val INITIAL_REGISTRATION_RETRIES = 6
// Total registration attempts: 6 short + 10 prolonged = 16.
private val TOTAL_REGISTRATION_RETRIES = INITIAL_REGISTRATION_RETRIES + 10
// Lower bound of the random fuzz multiplier (0.5).
private val FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND = 0.500
private val REGISTRATION_RETRY_FUZZ_MULTIPLIER = {
val randomNumberGenerator = new Random(UUID.randomUUID.getMostSignificantBits)
// Random multiplier in [0.5, 1.5) so workers don't all retry at the same moment.
randomNumberGenerator.nextDouble + FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND
}
// Interval for the first 6 retries: 10 * [0.5, 1.5) => 5-15 seconds.
private val INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS = (math.round(10 *
REGISTRATION_RETRY_FUZZ_MULTIPLIER))
// Interval for the remaining 10 retries: 60 * [0.5, 1.5) => 30-90 seconds.
private val PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS = (math.round(60
* REGISTRATION_RETRY_FUZZ_MULTIPLIER))
// Whether old application dirs are cleaned up; disabled by default
// (pairs with cleanupThreadExecutor above).
private val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", false)
// How often worker will clean up old app folders
// (default: every 30 minutes).
private val CLEANUP_INTERVAL_MILLIS =
conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
// TTL for app folders/data; after TTL expires it will be cleaned up
// (default: 7 days).
private val APP_DATA_RETENTION_SECONDS =
conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
// Whether we are running under Spark's test harness.
private val testing: Boolean = sys.props.contains("spark.testing")
private var master: Option[RpcEndpointRef] = None
/**
* Whether to use the master address in `masterRpcAddresses` if possible. If it's disabled, Worker
* will just use the address received from Master.
*/
// Disabled by default, i.e. use the host:port received from the master.
private val preferConfiguredMasterAddress =
conf.getBoolean("spark.worker.preferConfiguredMasterAddress", false)
/**
* The master address to connect in case of failure. When the connection is broken, worker will
* use this address to connect. This is usually just one of `masterRpcAddresses`. However, when
* a master is restarted or takes over leadership, it will be an address sent from master, which
* may not be in `masterRpcAddresses`.
*/
private var masterAddressToConnect: Option[RpcAddress] = None
private var activeMasterUrl: String = ""
private[worker] var activeMasterWebUiUrl : String = ""
private var workerWebUiUrl: String = ""
// URI identifying this Worker endpoint in the RPC environment.
private val workerUri = RpcEndpointAddress(rpcEnv.address, endpointName).toString
private var registered = false
private var connected = false
// Unique worker ID; presumably a timestamp (createDateFormat) plus host and
// port — confirm against generateWorkerId(), which is not shown here.
private val workerId = generateWorkerId()
// Spark home directory (SPARK_HOME env var, or "." as a fallback).
private val sparkHome =
if (testing) {
assert(sys.props.contains("spark.test.home"), "spark.test.home is not set!")
new File(sys.props("spark.test.home"))
} else {
new File(sys.env.get("SPARK_HOME").getOrElse("."))
}
var workDir: File = null
// Executors that have finished running.
val finishedExecutors = new LinkedHashMap[String, ExecutorRunner]
// Running drivers, keyed by driver ID.
val drivers = new HashMap[String, DriverRunner]
// Running executors, keyed by executor ID.
val executors = new HashMap[String, ExecutorRunner]
// Drivers that have finished running.
val finishedDrivers = new LinkedHashMap[String, DriverRunner]
// Local directories used by each application.
val appDirectories = new HashMap[String, Seq[String]]
// IDs of applications that have finished.
val finishedApps = new HashSet[String]
// Number of finished executors retained in the web UI (default 1000).
val retainedExecutors = conf.getInt("spark.worker.ui.retainedExecutors",
WorkerWebUI.DEFAULT_RETAINED_EXECUTORS)
// Number of finished drivers retained in the web UI (default 1000).
val retainedDrivers = conf.getInt("spark.worker.ui.retainedDrivers",
WorkerWebUI.DEFAULT_RETAINED_DRIVERS)
// The shuffle service is not actually started unless configured.
private val shuffleService = new ExternalShuffleService(conf, securityMgr)
// Public hostname: SPARK_PUBLIC_DNS if set, otherwise the RPC host.
private val publicAddress = {
val envVar = conf.getenv("SPARK_PUBLIC_DNS")
if (envVar != null) envVar else host
}
private var webUi: WorkerWebUI = null
private var connectionAttemptCount = 0
// Metrics system for this worker (covered in a later post).
private val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr)
private val workerSource = new WorkerSource(this)
// Whether the UI reverse proxy is enabled (default false).
val reverseProxy = conf.getBoolean("spark.ui.reverseProxy", false)
// Futures for the in-flight master registration attempts.
private var registerMasterFutures: Array[JFuture[_]] = null
// Handle for the scheduled registration-retry task.
private var registrationRetryTimer: Option[JScheduledFuture[_]] = None
// A thread pool for registering with masters. Because registering with a master is a blocking
// action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same
// time so that we can register with all masters.
private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
"worker-register-master-threadpool",
masterRpcAddresses.length // Make sure we can register with all masters at the same time
)
// CPU and memory usage bookkeeping.
var coresUsed = 0
var memoryUsed = 0
def coresFree: Int = cores - coresUsed
def memoryFree: Int = memory - memoryUsed
}
上面的參數在Worker執行個體化時初始化好。
Worker繼承了ThreadSafeRpcEndpoint ,而ThreadSafeRpcEndpoint繼承的RpcEndpoint
private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint
而RpcEnv則是通過RpcEnvFactory以工廠模式建立的執行個體,RpcEndpoint需要注冊到RpcEnv中才能收發消息。
// Factory used to construct an RpcEnv implementation.
private[spark] trait RpcEnvFactory {
def create(config: RpcEnvConfig): RpcEnv
}
這裡Worker同樣重寫了RpcEndpoint的onStart、receive和onStop方法,是以Worker注冊到RpcEnv後就會執行onStart方法。
onStart()
// Worker startup hook, invoked once when the endpoint is registered:
// creates the work dir, starts the shuffle service and web UI, registers
// with the master(s), and starts the metrics system — in that order.
override def onStart() {
// Must not have registered with a master yet
// (registered is initialized to false above).
assert(!registered)
// Log startup information.
logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
host, port, cores, Utils.megabytesToString(memory)))
logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
logInfo("Spark home: " + sparkHome)
// Create the worker's work directory (exits the JVM on failure).
createWorkDir()
// Start the external shuffle service, but only if enabled by configuration.
startExternalShuffleService()
// Construct the worker web UI...
webUi = new WorkerWebUI(this, workDir, webUiPort)
// ...and bind it to the HTTP server.
webUi.bind()
// Record the externally visible web UI URL.
workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}"
// Register this worker with the master(s).
registerWithMaster()
// Register the worker metrics source and start the metrics system.
metricsSystem.registerSource(workerSource)
metricsSystem.start()
// Attach the worker metrics servlet handler to the web ui after the metrics system is started.
metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}
createWorkDir()
/**
 * Create the worker's work directory, falling back to $SPARK_HOME/work when
 * none was configured. Any failure to create the directory is fatal: it is
 * logged and the worker JVM exits with status 1.
 */
private def createWorkDir() {
  // Use the configured path when present, otherwise default to "work" under sparkHome.
  val configuredDir = Option(workDirPath).map(new File(_))
  workDir = configuredDir.getOrElse(new File(sparkHome, "work"))
  try {
    // This sporadically fails - not sure why ... !workDir.exists() && !workDir.mkdirs()
    // So attempting to create and then check if directory was created or not.
    workDir.mkdirs()
    val missing = !workDir.exists()
    val notADirectory = !workDir.isDirectory
    if (missing || notADirectory) {
      logError("Failed to create work directory " + workDir)
      System.exit(1)
    }
    // At this point the path must exist and be a directory.
    assert (workDir.isDirectory)
  } catch {
    // Creation threw (e.g. permission problem): log it and exit.
    case e: Exception =>
      logError("Failed to create work directory " + workDir, e)
      System.exit(1)
  }
}
startExternalShuffleService()
external shuffle服務主要是協調多個Worker共享Spark叢集資源,後面再解讀,這裡是檢查開啟狀态。
/**
 * Start the external shuffle service if it is enabled in the configuration.
 * A startup failure is fatal: it is logged and the worker JVM exits.
 */
private def startExternalShuffleService() {
  try shuffleService.startIfEnabled()
  catch {
    case e: Exception =>
      logError("Failed to start external shuffle service", e)
      System.exit(1)
  }
}
繼續看onStart往下,webUi = new WorkerWebUI(this, workDir, webUiPort),初始化了WorkerWebUI
WorkerWebUI,設定webUI布置等資訊。
// Web UI for a single Worker: wires up the worker pages and handlers.
private[worker]
class WorkerWebUI(
val worker: Worker,
val workDir: File,
requestedPort: Int)
extends WebUI(worker.securityMgr, worker.securityMgr.getSSLOptions("standalone"),
requestedPort, worker.conf, name = "WorkerUI")
with Logging {
// RPC ask timeout, derived from the worker's SparkConf.
private[ui] val timeout = RpcUtils.askRpcTimeout(worker.conf)
// Build all UI components immediately on construction.
initialize()
/** Initialize all components of the server. */
def initialize() {
// Page that renders executor/driver log files.
val logPage = new LogPage(this)
// Attach the log page to the UI.
attachPage(logPage)
// Attach the main worker status page.
attachPage(new WorkerPage(this))
// Serve static resources (css/js) under /static.
attachHandler(createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE, "/static"))
// Serve raw log text under the fixed /log path.
attachHandler(createServletHandler("/log",
(request: HttpServletRequest) => logPage.renderLog(request),
worker.securityMgr,
worker.conf))
}
}
// Companion object holding the worker web UI constants.
private[worker] object WorkerWebUI {
// Location of the static resources served by the UI.
val STATIC_RESOURCE_BASE = SparkUI.STATIC_RESOURCE_DIR
// Default number of finished drivers retained in the web UI.
val DEFAULT_RETAINED_DRIVERS = 1000
// Default number of finished executors retained in the web UI.
val DEFAULT_RETAINED_EXECUTORS = 1000
}
然後 webUi.bind() 将webUI設定綁定到http伺服器。
/** Bind to the HTTP server behind this web interface. */
def bind(): Unit = {
// 如果設定為空,提示多次綁定
assert(serverInfo.isEmpty, s"Attempted to bind $className more than once!")
try {
// 擷取host,并從jetty擷取資訊
val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0")
serverInfo = Some(startJettyServer(host, port, sslOptions, handlers, conf, name))
logInfo(s"Bound $className to $host, and started at $webUrl")
} catch {
case e: Exception =>
// 無法擷取配置,提示失敗退出
logError(s"Failed to bind $className", e)
System.exit(1)
}
}
再往下,registerWithMaster()開始注冊,這個方法也會在reregisterWithMaster()重試時被多次調用。
// 注冊到master
private def registerWithMaster() {
// onDisconnected may be triggered multiple times, so don't attempt registration
// if there are outstanding registration attempts scheduled.
// 判斷如果有服務正在連接配接,不要再次連接配接
registrationRetryTimer match {
// 沒有正在連接配接的服務,正常嘗試注冊到所有master
case None =>
registered = false
// 通過registerMasterThreadPool線程池注冊到所有master
registerMasterFutures = tryRegisterAllMasters()
// 沒有連接配接,是以連接配接重試為0
connectionAttemptCount = 0
// 注冊逾時重試
registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
// 周遊所有master,發送注冊資訊再次重連
Option(self).foreach(_.send(ReregisterWithMaster))
}
},
// 初始化時有設定,前5次重試時間間隔為5-15秒
INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
// 後10次間隔為60-90秒
INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
TimeUnit.SECONDS))
// 如果有已經在連接配接的服務,提示等待連接配接完成
case Some(_) =>
logInfo("Not spawning another attempt to register with the master, since there is an" +
" attempt scheduled already.")
}
}
tryRegisterAllMasters(),先嘗試注冊
// Submit an asynchronous registration attempt to every configured master,
// returning the futures so they can be cancelled later.
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
// One attempt per configured master address.
masterRpcAddresses.map { masterAddress =>
// Run each attempt on the dedicated registration thread pool,
// since endpoint setup is a blocking call.
registerMasterThreadPool.submit(new Runnable {
override def run(): Unit = {
try {
logInfo("Connecting to master " + masterAddress + "...")
// Look up the master's endpoint reference in the RPC environment.
val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
// Send the RegisterWorker message to this master.
sendRegisterMessageToMaster(masterEndpoint)
} catch {
case ie: InterruptedException => // Cancelled
case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
}
}
})
}
}
sendRegisterMessageToMaster(masterEndpoint)
調用了RpcEndpointRef類的send()将Worker的配置資訊發送給Master進行注冊。這個才是真正的發送注冊資訊。
/**
 * Send a one-way RegisterWorker message carrying this worker's identity and
 * resources to the given master endpoint (fire-and-forget semantics).
 */
private def sendRegisterMessageToMaster(masterEndpoint: RpcEndpointRef): Unit = {
  // Build the registration payload first, then fire it at the master.
  val registration = RegisterWorker(
    workerId,
    host,
    port,
    self,
    cores,
    memory,
    workerWebUiUrl,
    masterEndpoint.address)
  masterEndpoint.send(registration)
}
參數self,其實就是RpcEndpointRef 執行個體。
// Reference to this endpoint itself; requires the endpoint to already be
// registered with its RpcEnv.
final def self: RpcEndpointRef = {
require(rpcEnv != null, "rpcEnv has not been initialized")
rpcEnv.endpointRef(this)
}
send()
// Excerpt (elided): RpcEndpointRef is a reference to a remote RpcEndpoint.
private[spark] abstract class RpcEndpointRef(conf: SparkConf){
...
/**
* Sends a one-way asynchronous message. Fire-and-forget semantics.
*/
def send(message: Any): Unit
}
這裡可以傳回Master的 receive() 看看怎麼接收的。
// Excerpt from Master.receive (elided in this quote; braces do not balance here).
override def receive: PartialFunction[Any, Unit] = {
// Handle the leader-election notification, etc.
case ElectedLeader =>
...
}
case CompleteRecovery => completeRecovery()
// Leadership has been revoked: shut this master down.
case RevokedLeadership =>
logError("Leadership has been revoked -- master shutting down.")
System.exit(0)
// Mirrors the RegisterWorker message sent by the Worker.
case RegisterWorker(
id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) =>
// Log the registration attempt.
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
workerHost, workerPort, cores, Utils.megabytesToString(memory)))
// STANDBY, ALIVE, RECOVERING, COMPLETING_RECOVERY
// A standby master replies via the worker's RpcEndpointRef that it is not active.
if (state == RecoveryState.STANDBY) {
workerRef.send(MasterInStandby)
// Reject duplicate worker IDs (idToWorker holds the currently registered workers).
} else if (idToWorker.contains(id)) {
workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
// Normal registration path.
} else {
// Build a WorkerInfo record from the registration parameters.
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
workerRef, workerWebUiUrl)
// If registration succeeds, persist the worker, acknowledge, and schedule resources.
if (registerWorker(worker)) {
persistenceEngine.addWorker(worker)
workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
schedule()
} else {
// Registration failed (a worker at the same address exists): warn and notify.
val workerAddress = worker.endpoint.address
logWarning("Worker registration failed. Attempted to re-register worker at same " +
"address: " + workerAddress)
workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
+ workerAddress))
}
}
}
如果注冊成功,Master通過workerRef.send()向Worker發回RegisteredWorker消息(DeployMessage都實作了Serializable,可以在網絡間傳輸)。
// Base trait for all deploy-layer messages; Serializable so they can cross the wire.
private[deploy] sealed trait DeployMessage extends Serializable
/** Contains messages sent between Scheduler endpoint nodes. */
// Excerpt (elided): only the RegisteredWorker acknowledgement is shown here.
private[deploy] object DeployMessages {
case class RegisteredWorker(
master: RpcEndpointRef,
masterWebUiUrl: String,
masterAddress: RpcAddress) extends DeployMessage with RegisterWorkerResponse
...
}
schedule(),資源的排程,具體排程元件後續解析。
/**
* Schedule the currently available resources among waiting apps. This method will be called
* every time a new app joins or resource availability changes.
*/
// 資源排程,如果等待的app或者新app加入時,有可用資源就會排程這個方法
// 如果master不存活,退出
/**
* Schedule the currently available resources among waiting apps. This method will be called
* every time a new app joins or resource availability changes.
*/
// Only the active (ALIVE) master performs scheduling.
private def schedule(): Unit = {
if (state != RecoveryState.ALIVE) {
return
}
// Drivers take strict precedence over executors
// Shuffle the alive workers so driver placement is spread randomly across them.
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0
for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
// We assign workers to each waiting driver in a round-robin fashion. For each driver, we
// start from the last worker that was assigned a driver, and continue onwards until we have
// explored all alive workers.
var launched = false
var numWorkersVisited = 0
while (numWorkersVisited < numWorkersAlive && !launched) {
val worker = shuffledAliveWorkers(curPos)
numWorkersVisited += 1
// Launch the driver on the first worker with enough free memory and cores.
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
launched = true
}
// Advance the round-robin cursor, wrapping around the alive workers.
curPos = (curPos + 1) % numWorkersAlive
}
}
// After placing drivers, start executors on workers for waiting apps.
startExecutorsOnWorkers()
}
注冊階段完成,傳回onStart()中,metricsSystem類後續解讀,onStart基本就完成了。
繼續傳回org.apache.spark.deploy.worker.Worker的startRpcEnvAndEndpoint()中,Worker的執行個體化也就完成了。
Worker裡面還有一些其他的方法,以後調用的時候再解讀。