天天看點

附錄D Metrics簡介

注:本文是為了配合《Spark核心設計的藝術——架構設計與實作》一書的内容而編寫,目的是為了節省成本、友善讀者查閱。書中附錄D的内容都在本文呈現。

Metrics是codahale提供的第三方度量倉庫。Metrics作為一款監控名額的度量類庫,可以為第三方庫提供輔助統計資訊,還可以将度量資料發送給Ganglia和Graphite以提供圖形化的監控。

      Metrics也采用了監聽器模式,提供了Gauge、Counter、Meter、Histogram、Timer等度量工具類以及健康檢查(HealthCheck)功能。想了解更多Metrics的内容,讀者可以通路Metrics官網:http://metrics.dropwizard.io/3.2.2/

本文将對Metrics中的核心類進行介紹,友善讀者對Spark度量系統更加細緻深入的了解。

MetricRegistry

MetricRegistry是Metrics提供的度量容器,這裡先列出MetricRegistry的主要結構。

public class MetricRegistry implements MetricSet {
  private final ConcurrentMap<String, Metric> metrics;
  private final List<MetricRegistryListener> listeners;
}
           

從上面代碼可以看出MetricRegistry中會緩存各種度量和監聽器,下面對MetricRegistry中的一些方法進行介紹。

1、name

功能描述:建構形如“字元串1.字元串2…字元串N-1.字元串N”這樣的字元串。任何空值或空字元串都将被過濾。

public static String name(String name, String... names) {
        final StringBuilder builder = new StringBuilder();
        append(builder, name);
        if (names != null) {
            for (String s : names) {
                append(builder, s);
            }
        }
        return builder.toString();
    }
           

2、notifyListenerOfAddedMetric

功能描述:當有新的Metric添加到ConcurrentMap<String, Metric> metrics時,調用此方法。根據Metric的子接口的不同,調用不同方法。例如:Gauge則調用監聽器的onGaugeAdded;Counter則調用監聽器的onCounterAdded;Histogram則調用監聽器的onHistogramAdded。

private void notifyListenerOfAddedMetric(MetricRegistryListener listener, Metric metric, String name) {
        if (metric instanceof Gauge) {
            listener.onGaugeAdded(name, (Gauge<?>) metric);
        } else if (metric instanceof Counter) {
            listener.onCounterAdded(name, (Counter) metric);
        } else if (metric instanceof Histogram) {
            listener.onHistogramAdded(name, (Histogram) metric);
        } else if (metric instanceof Meter) {
            listener.onMeterAdded(name, (Meter) metric);
        } else if (metric instanceof Timer) {
            listener.onTimerAdded(name, (Timer) metric);
        } else {
            throw new IllegalArgumentException("Unknown metric type: " + metric.getClass());
        }
    }
           

3、onMetricAdded

功能描述:當有新的Metric添加到ConcurrentMap<String, Metric> metrics時,調用此方法。周遊調用監聽器緩存List<MetricRegistryListener> listeners中的所有監聽器,調用notifyListenerOfAddedMetric。

private void onMetricAdded(String name, Metric metric) {
        for (MetricRegistryListener listener : listeners) {
            notifyListenerOfAddedMetric(listener, metric, name);
        }
    }
           

4、register

功能描述:如果metric的類型是Metric并且metrics中還沒有此metric,則将它添加到metrics;

如果Metric的類型是MetricSet,則MetricSet中包含的所有新的Metric添加到緩存ConcurrentMap<String, Metric> metrics;以上添加過程都伴随onMetricAdded的調用。

public <T extends Metric> T register(String name, T metric) throws IllegalArgumentException {
        if (metric instanceof MetricSet) {
            registerAll(name, (MetricSet) metric);
        } else {
            final Metric existing = metrics.putIfAbsent(name, metric);
            if (existing == null) {
                onMetricAdded(name, metric);
            } else {
                throw new IllegalArgumentException("A metric named " + name + " already exists");
            }
        }
        return metric;
	}
	private void registerAll(String prefix, MetricSet metrics) throws IllegalArgumentException {
        for (Map.Entry<String, Metric> entry : metrics.getMetrics().entrySet()) {
            if (entry.getValue() instanceof MetricSet) {
                registerAll(name(prefix, entry.getKey()), (MetricSet) entry.getValue());
            } else {
                register(name(prefix, entry.getKey()), entry.getValue());
            }
        }
}
           

Gauge

Gauge是Metrics提供的用于估計路徑成本的特質,其實作如下:

public interface Gauge<T> extends Metric {
    /**
     * Returns the metric's current value.
     *
     * @return the metric's current value
     */
    T getValue();
}
           

Slf4jReporter

         Slf4jReporter 是Metrics提供的使用實作了Slf4j接口的實作類的方法,将度量輸出到日志的類。

1、report

         功能描述:将度量輸出到日志的方法。

@Override
    public void report(SortedMap<String, Gauge> gauges,
                       SortedMap<String, Counter> counters,
                       SortedMap<String, Histogram> histograms,
                       SortedMap<String, Meter> meters,
                       SortedMap<String, Timer> timers) {
        if (loggerProxy.isEnabled(marker)) {
            for (Entry<String, Gauge> entry : gauges.entrySet()) {
                logGauge(entry.getKey(), entry.getValue());
            }

            for (Entry<String, Counter> entry : counters.entrySet()) {
                logCounter(entry.getKey(), entry.getValue());
            }

            for (Entry<String, Histogram> entry : histograms.entrySet()) {
                logHistogram(entry.getKey(), entry.getValue());
            }

            for (Entry<String, Meter> entry : meters.entrySet()) {
                logMeter(entry.getKey(), entry.getValue());
            }

            for (Entry<String, Timer> entry : timers.entrySet()) {
                logTimer(entry.getKey(), entry.getValue());
            }
        }
    }
           

2、logGauge

         功能描述:将估計度量輸出到日志的方法。(備注: Slf4jReporter的 report 方法中分别對 Gauge 、 Counter 、 Histogram 、 Meter 及 Timer 進行輸出,為說明問題,附錄 D 隻挑選了對 Gauge 的輸出作為介紹,其它種類度量的輸出,讀者可查閱相關文檔或者閱讀 Metrics 源碼。)

private void logGauge(String name, Gauge gauge) {
        loggerProxy.log(marker, "type=GAUGE, name={}, value={}", prefix(name), gauge.getValue());
    }
           

3、LoggerProxy

         Slf4jReporter的日志輸出依賴于LoggerProxy,根據LoggerProxy的類名,我們知道這是一個有關日志輸出的代理類,其實作如下:

/* private class to allow logger configuration */
    static abstract class LoggerProxy {
        protected final Logger logger;

        public LoggerProxy(Logger logger) {
            this.logger = logger;
        }

        abstract void log(Marker marker, String format, Object... arguments);

        abstract boolean isEnabled(Marker marker);
    }
           

可以看到LoggerProxy實際上不過是代理了org.slf4j.Logger接口對日志輸出。

ScheduledReporter

ScheduledReporter是ConsoleReporter、CsvReporter、Slf4jReporter及GraphiteReporter的共同父類,ScheduledReporter中的很多方法被子類所共用。

1、start

         功能描述:啟動度量輸出工作,實質為定時器不斷地調用report方法輸出。

public void start(long period, TimeUnit unit) {
        executor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    report();
                } catch (RuntimeException ex) {
                    LOG.error("RuntimeException thrown from {}#report. Exception was suppressed.", ScheduledReporter.this.getClass().getSimpleName(), ex);
                }
            }
        }, period, period, unit);
    }
           

2、stop

         功能描述:停止度量輸出工作,實質為停止定時器。

public void stop() {
        executor.shutdown(); // Disable new tasks from being submitted
        try {
            // Wait a while for existing tasks to terminate
            if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // Cancel currently executing tasks
                // Wait a while for tasks to respond to being cancelled
                if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                    System.err.println(getClass().getSimpleName() + ": ScheduledExecutorService did not terminate");
                }
            }
        } catch (InterruptedException ie) {
            // (Re-)Cancel if current thread also interrupted
            executor.shutdownNow();
            // Preserve interrupt status
            Thread.currentThread().interrupt();
        }
    }
           

關于《Spark核心設計的藝術 架構設計與實作》

經過近一年的準備,《 Spark核心設計的藝術 架構設計與實作 》一書現已出版發行,圖書如圖:

附錄D Metrics簡介

紙質版售賣連結如下:

京東: https://item.jd.com/12302500.html

電子版售賣連結如下: 京東: https://e.jd.com/30389208.html