注:本文是為了配合《Spark核心設計的藝術——架構設計與實作》一書的内容而編寫,目的是為了節省成本、友善讀者查閱。書中附錄D的内容都在本文呈現。
Metrics是codahale提供的第三方度量倉庫。Metrics作為一款監控名額的度量類庫,可以為第三方庫提供輔助統計資訊,還可以将度量資料發送給Ganglia和Graphite以提供圖形化的監控。
Metrics也采用了監聽器模式,提供了Gauge、Counter、Meter、Histogram、Timer等度量工具類以及健康檢查(HealthCheck)功能。想了解更多Metrics的内容,讀者可以通路Metrics官網:http://metrics.dropwizard.io/3.2.2/
本文将對Metrics中的核心類進行介紹,友善讀者對Spark度量系統更加細緻深入的了解。
MetricRegistry
MetricRegistry是Metrics提供的度量容器,這裡先列出MetricRegistry的主要結構。
public class MetricRegistry implements MetricSet {
private final ConcurrentMap<String, Metric> metrics;
private final List<MetricRegistryListener> listeners;
}
從上面代碼可以看出MetricRegistry中會緩存各種度量和監聽器,下面對MetricRegistry中的一些方法進行介紹。
1、name
功能描述:建構形如“字元串1.字元串2…字元串N-1.字元串N”這樣的字元串。任何空值或空字元串都将被過濾。
public static String name(String name, String... names) {
final StringBuilder builder = new StringBuilder();
append(builder, name);
if (names != null) {
for (String s : names) {
append(builder, s);
}
}
return builder.toString();
}
2、notifyListenerOfAddedMetric
功能描述:當有新的Metric添加到ConcurrentMap<String, Metric> metrics時,調用此方法。根據Metric的子接口的不同,調用不同方法。例如:Gauge則調用監聽器的onGaugeAdded;Counter則調用監聽器的onCounterAdded;Histogram則調用監聽器的onHistogramAdded。
private void notifyListenerOfAddedMetric(MetricRegistryListener listener, Metric metric, String name) {
if (metric instanceof Gauge) {
listener.onGaugeAdded(name, (Gauge<?>) metric);
} else if (metric instanceof Counter) {
listener.onCounterAdded(name, (Counter) metric);
} else if (metric instanceof Histogram) {
listener.onHistogramAdded(name, (Histogram) metric);
} else if (metric instanceof Meter) {
listener.onMeterAdded(name, (Meter) metric);
} else if (metric instanceof Timer) {
listener.onTimerAdded(name, (Timer) metric);
} else {
throw new IllegalArgumentException("Unknown metric type: " + metric.getClass());
}
}
3、onMetricAdded
功能描述:當有新的Metric添加到ConcurrentMap<String, Metric> metrics時,調用此方法。周遊調用監聽器緩存List<MetricRegistryListener> listeners中的所有監聽器,調用notifyListenerOfAddedMetric。
private void onMetricAdded(String name, Metric metric) {
for (MetricRegistryListener listener : listeners) {
notifyListenerOfAddedMetric(listener, metric, name);
}
}
4、register
功能描述:如果metric的類型是Metric并且metrics中還沒有此metric,則将它添加到metrics;
如果Metric的類型是MetricSet,則MetricSet中包含的所有新的Metric添加到緩存ConcurrentMap<String, Metric> metrics;以上添加過程都伴随onMetricAdded的調用。
public <T extends Metric> T register(String name, T metric) throws IllegalArgumentException {
if (metric instanceof MetricSet) {
registerAll(name, (MetricSet) metric);
} else {
final Metric existing = metrics.putIfAbsent(name, metric);
if (existing == null) {
onMetricAdded(name, metric);
} else {
throw new IllegalArgumentException("A metric named " + name + " already exists");
}
}
return metric;
}
private void registerAll(String prefix, MetricSet metrics) throws IllegalArgumentException {
for (Map.Entry<String, Metric> entry : metrics.getMetrics().entrySet()) {
if (entry.getValue() instanceof MetricSet) {
registerAll(name(prefix, entry.getKey()), (MetricSet) entry.getValue());
} else {
register(name(prefix, entry.getKey()), entry.getValue());
}
}
}
Gauge
Gauge是Metrics提供的用于估計路徑成本的特質,其實作如下:
public interface Gauge<T> extends Metric {
/**
* Returns the metric's current value.
*
* @return the metric's current value
*/
T getValue();
}
Slf4jReporter
Slf4jReporter 是Metrics提供的使用實作了Slf4j接口的實作類的方法,将度量輸出到日志的類。
1、report
功能描述:将度量輸出到日志的方法。
@Override
public void report(SortedMap<String, Gauge> gauges,
SortedMap<String, Counter> counters,
SortedMap<String, Histogram> histograms,
SortedMap<String, Meter> meters,
SortedMap<String, Timer> timers) {
if (loggerProxy.isEnabled(marker)) {
for (Entry<String, Gauge> entry : gauges.entrySet()) {
logGauge(entry.getKey(), entry.getValue());
}
for (Entry<String, Counter> entry : counters.entrySet()) {
logCounter(entry.getKey(), entry.getValue());
}
for (Entry<String, Histogram> entry : histograms.entrySet()) {
logHistogram(entry.getKey(), entry.getValue());
}
for (Entry<String, Meter> entry : meters.entrySet()) {
logMeter(entry.getKey(), entry.getValue());
}
for (Entry<String, Timer> entry : timers.entrySet()) {
logTimer(entry.getKey(), entry.getValue());
}
}
}
2、logGauge
功能描述:将估計度量輸出到日志的方法。(備注: Slf4jReporter的 report 方法中分别對 Gauge 、 Counter 、 Histogram 、 Meter 及 Timer 進行輸出,為說明問題,附錄 D 隻挑選了對 Gauge 的輸出作為介紹,其它種類度量的輸出,讀者可查閱相關文檔或者閱讀 Metrics 源碼。)
private void logGauge(String name, Gauge gauge) {
loggerProxy.log(marker, "type=GAUGE, name={}, value={}", prefix(name), gauge.getValue());
}
3、LoggerProxy
Slf4jReporter的日志輸出依賴于LoggerProxy,根據LoggerProxy的類名,我們知道這是一個有關日志輸出的代理類,其實作如下:
/* private class to allow logger configuration */
static abstract class LoggerProxy {
protected final Logger logger;
public LoggerProxy(Logger logger) {
this.logger = logger;
}
abstract void log(Marker marker, String format, Object... arguments);
abstract boolean isEnabled(Marker marker);
}
可以看到LoggerProxy實際上不過是代理了org.slf4j.Logger接口對日志輸出。
ScheduledReporter
ScheduledReporter是ConsoleReporter、CsvReporter、Slf4jReporter及GraphiteReporter的共同父類,ScheduledReporter中的很多方法被子類所共用。
1、start
功能描述:啟動度量輸出工作,實質為定時器不斷地調用report方法輸出。
public void start(long period, TimeUnit unit) {
executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
report();
} catch (RuntimeException ex) {
LOG.error("RuntimeException thrown from {}#report. Exception was suppressed.", ScheduledReporter.this.getClass().getSimpleName(), ex);
}
}
}, period, period, unit);
}
2、stop
功能描述:停止度量輸出工作,實質為停止定時器。
public void stop() {
executor.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
executor.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
System.err.println(getClass().getSimpleName() + ": ScheduledExecutorService did not terminate");
}
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
executor.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
關于《Spark核心設計的藝術 架構設計與實作》
經過近一年的準備,《 Spark核心設計的藝術 架構設計與實作 》一書現已出版發行,圖書如圖:
紙質版售賣連結如下:
京東: https://item.jd.com/12302500.html
電子版售賣連結如下: 京東: https://e.jd.com/30389208.html