Spark的Metrics System的度量系統,有兩個部分組成:source,sink,建立的時候需要制定instance。度量系統會周期的将source的名額資料被sink周期性的拉去,sink可以有很多。
Instance代表着使用度量系統的角色。在spark内部,目前master,worker,Executor,client driver,這些角色都會因為要去做監控而建立使用度量系統。目前,spark内部實作的instance有:master,worker,Executor,Driver,Applications。
Source指定定義了如何去收取度量名額。目前,已經存在以下兩種source:
1.Spark内部的source,比如MasterSource,WorkerSource,ExecutorSource,
DAGSchedulerSource,BlockManagerSource,ApplicationSource。這些source會收集spark内部部件的狀态。這些source都跟instance相關,在建立度量系統的時候會被加入。
2.公共的source,比如JVMSource,收集的是更加底層的狀态,可以用配置檔案配置并且是通過反射機制加載的。
Sink定義了度量名額資料輸出的位置。同時可以共存很多sinks,名額資料會發給所有的sinks。
Source和sink的綁定
def start() {
require(!running, "Attempting to start a MetricsSystem that is already running")
running = true
registerSources()
registerSinks()
sinks.foreach(_.start)
}
複制
名額配置的格式如下:
[instance].[sink|source].[name].[options] = xxxx
[instance]可以是master,worker,executor,driver,applications.配置了就意味着隻有指定的instance由此屬性。可以粗犷的用*代替instance name,這就意味着所有的instance都将由此屬性。
[sink|source].代表着該屬性是source還是sink。隻能是二選一。
[name]指定sink或者source的名字。
[options]指定sink或者source的屬性
具體例子如下:
## Examples
# Enable JmxSink for all instances by class name
#*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
# Enable ConsoleSink for all instances by class name
#*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
# Polling period for ConsoleSink
#*.sink.console.period=10
#*.sink.console.unit=seconds
# Master instance overlap polling period
#master.sink.console.period=15
#master.sink.console.unit=seconds
# Enable CsvSink for all instances
#*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
# Polling period for CsvSink
#*.sink.csv.period=1
#*.sink.csv.unit=minutes
# Polling directory for CsvSink
#*.sink.csv.directory=/tmp/
# Worker instance overlap polling period
#worker.sink.csv.period=10
#worker.sink.csv.unit=minutes
# Enable Slf4jSink for all instances by class name
#*.sink.slf4j.class=org.apache.spark.metrics.sink.Slf4jSink
# Polling period for Slf4JSink
#*.sink.slf4j.period=1
#*.sink.slf4j.unit=minutes
複制
注意事項:
1,添加新的sink的時候,設定class option時需要是全名。
2,有些sink支援周期的拉去資料。最小拉去資料的周期是1秒鐘。
3,有些特殊的屬性支援通配符,例如:master.sink.console.period->*.sink.console.period
4,metrics.properties檔案如果放在 ${SPARK_HOME}/conf目錄下可以被自動加載
如果想自定義目錄需要用-Dspark.metrics.conf=xxx,指定java屬性配置的方式去指定。
5,MetricsServlet作為預設的sink,隻支援,master,worker,client driver,可以通過發送http請求 /metrics/json,可以以json的格式擷取所有已經注冊的名額資料。
由于Spark生産中大部分運作于yarn上
Driver端的度量名額的請求方式
/proxy/application_1494227937369_0084/metrics/json
主要source源是:
StreamingSource,DAGSchedulerSource,BlockManagerSource,
ExecutorAllocationManagerSource
driver端的度量系統的初始化細節
在SparkContext裡面
初始化度量系統
建構度量系統對象是在Sparkenv中做的
MetricsSystem.createMetricsSystem("driver", conf, securityManager)
複制
SparkContext隻是引用了SparkEnv的對象
metricsSystem: MetricsSystem = if (_env != null) _env.metricsSystem else null
複制
啟動度量系統并且綁定ServletHandler
// The metrics system for Driver need to be set spark.app.id to app ID.
// So it should start after we get app ID from the task scheduler and set spark.app.id.
metricsSystem.start()
// Attach the driver metrics servlet handler to the web ui after the metrics system is started.
metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
複制
注冊source
_env.metricsSystem.registerSource(_dagScheduler.metricsSource)
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
_executorAllocationManager.foreach { e =>
_env.metricsSystem.registerSource(e.executorAllocationManagerSource)
}
複制
Executor端的Source:
ExecutorSource
Executor端度量系統的初始化機啟動
val metricsSystem = if (isDriver) {
// Don't start metrics system right now for Driver.
// We need to wait for the task scheduler to give us an app ID.
// Then we can start the metrics system.
MetricsSystem.createMetricsSystem("driver", conf, securityManager)
} else {
// We need to set the executor ID before the MetricsSystem is created because sources and
// sinks specified in the metrics configuration file will want to incorporate this executor's
// ID into the metrics they report.
conf.set("spark.executor.id", executorId)
val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
ms.start()
ms
}
複制
建構ExecutorSource并注冊
private val executorSource = new ExecutorSource(threadPool, executorId)
if (!isLocal) {
env.metricsSystem.registerSource(executorSource)
env.blockManager.initialize(conf.getAppId)
}
複制
可以看到Executor端并沒有綁定ServletHandler,故而無法通過http請求到度量名額。