天天看點

Spark2.1.0——深入淺出度量系統

對于一個系統而言,首先考慮要滿足一些業務場景,并實作功能。随着系統功能越來越多,代碼量級越來越高,系統的可維護性、可測試性、性能都會成為新的挑戰,這時監控功能就變得越來越重要了。在國内,絕大多數IT公司的項目都以業務為導向,以完成功能為目标,這些項目在立項、設計、開發、上線的各個階段,很少有人會考慮到監控的問題。

Spark2.1.0——深入淺出度量系統

  對于一個系統而言,首先考慮要滿足一些業務場景,并實作功能。随着系統功能越來越多,代碼量級越來越高,系統的可維護性、可測試性、性能都會成為新的挑戰,這時監控功能就變得越來越重要了。在國内,絕大多數IT公司的項目都以業務為導向,以完成功能為目标,這些項目在立項、設計、開發、上線的各個階段,很少有人會考慮到監控的問題。在國内,開發人員能夠認真的在代碼段落中列印日志,就已經屬于最優秀的程式員了。然而,在國外的很多項目則不會這樣,看看久負盛名的Hadoop的監控系統就可見一斑,尤其是在Facebook,更是把功能、日志以及監控列為同等重要,作為一個合格工程師的三駕馬車。

  Spark作為優秀的開源系統,在監控方面也有自己的一整套體系。一個系統有了監控功能後将收獲諸多益處,如可測試性、性能優化、運維評估、資料統計等。Spark的度量系統使用codahale提供的第三方度量倉庫Metrics,本節将着重介紹Spark基于Metrics建構度量系統的原理與實作。對于Metrics感興趣的讀者,可以參考閱讀《附錄D Metrics簡介》中的内容。

  Spark的度量系統中有三個概念:

  • Instance:指定了度量系統的執行個體名。Spark按照Instance的不同,區分為Master、Worker、Application、Driver和Executor;
  • Source:指定了從哪裡收集度量資料,即度量資料的來源。Spark提供了應用的度量來源(ApplicationSource)、Worker的度量來源(WorkerSource)、DAGScheduler的度量來源(DAGSchedulerSource)、BlockManager的度量來源(BlockManagerSource)等諸多實作,對各個服務或元件進行監控。
  • Sink:指定了往哪裡輸出度量資料,即度量資料的輸出。Spark中使用MetricsServlet作為預設的Sink,此外還提供了ConsoleSink、CsvSink、JmxSink、MetricsServlet、GraphiteSink等實作。

為了更加直覺的表現上述概念,我們以圖1來表示Spark中度量系統的工作流程。

Spark2.1.0——深入淺出度量系統

圖1 度量系統的工作流程

Source繼承體系

  任何監控都離不開度量資料的采集,離線的資料采集很容易做到和被采集子產品之間的解耦,但是對于實時度量資料,尤其是那些記憶體中資料的采集就很難解耦。這就類似于網頁監控資料的埋點一樣,你要在網頁中加入一段額外的js代碼(例如Google分析,即便你隻是引入一個js檔案,這很難讓前端工程師感到開心)。還有一類監控,比如在Java Web中增加一個負責監控的Servlet或者一個基于Spring3.0的攔截器,這種方式雖然将耦合度從代碼級别降低到配置級别,但卻無法有效的對記憶體中的資料結構進行監控。Spark的度量系統對系統功能來說是在代碼層面耦合的,這種犧牲對于能夠換取對實時的、處于記憶體中的資料進行更有效的監控是值得的。

  Spark将度量來源抽象為Source,其定義見代碼清單1。

代碼清單1         度量源的定義

private[spark] trait Source {
  def sourceName: String
  def metricRegistry: MetricRegistry
}
      

  Spark中有很多Source的具體實作,可以通過圖2來了解。

Spark2.1.0——深入淺出度量系統

圖2    Source的繼承體系

為了說明Source該如何實作,我們選擇ApplicationSource(也是因為其實作簡單明了,足以說明問題)為例,其實作見代碼清單2。

代碼清單2         ApplicationSource的實作

private[master] class ApplicationSource(val application: ApplicationInfo) extends Source {
  override val metricRegistry = new MetricRegistry()
  override val sourceName = "%s.%s.%s".format("application", application.desc.name,
    System.currentTimeMillis())

  metricRegistry.register(MetricRegistry.name("status"), new Gauge[String] {
    override def getValue: String = application.state.toString
  })

  metricRegistry.register(MetricRegistry.name("runtime_ms"), new Gauge[Long] {
    override def getValue: Long = application.duration
  })

  metricRegistry.register(MetricRegistry.name("cores"), new Gauge[Int] {
    override def getValue: Int = application.coresGranted
  })

}
      

  望文生義,ApplicationSource用于采集Spark應用程式相關的度量。代碼清單2中ApplicationSource重載了metricRegistry和sourceName,并且向自身的系統資料庫注冊了status(即應用狀态,包括:WAITING, RUNNING, FINISHED, FAILED, KILLED, UNKNOWN)、runtime_ms(運作持續時長)、cores(授權的核心數)等度量。這三個度量的取值分别來自于ApplicationInfo的state、duration和coresGranted三個屬性。這三個度量都由Gauge的匿名内部類實作,Gauge是Metrics提供的用于估計路徑成本的特質。有關Gauge、MetricRegistry、MetricRegistry注冊度量的方法register及命名方法name的更詳細介紹請閱讀《附錄D Metrics簡介》。

Sink繼承體系

  Source準備好度量資料後,我們就需要考慮如何輸出和使用的問題。這裡介紹一些常見的度量輸出方式:阿裡資料部門采用的一種度量使用方式就是輸出到日志;在指令行運作過Hadoop任務(例如:mapreduce)的使用者也會發現控制台列印的内容中也包含度量資訊;使用者可能希望将有些度量資訊儲存到檔案(例如CSV),以便将來能夠檢視;如果覺得使用CSV或者控制台等方式不夠直覺,還可以将采集到的度量資料輸出到專用的監控系統界面。這些最終對度量資料的使用,或者說是輸出方式,Spark将它們統一抽象為Sink。Sink的定義見代碼清單3。

代碼清單3         度量輸出的定義

private[spark] trait Sink {
  def start(): Unit
  def stop(): Unit
  def report(): Unit
}        

從代碼清單3可以看到Sink是一個特質,包含三個接口方法:

  • start:啟動Sink;
  • stop:停止Sink;
  • report:輸出到目的地;

從這三個方法的解釋來看,很難讓讀者獲得更多的資訊。我們先把這些困惑放在一邊,來看看Spark中Sink的類繼承體系,如圖3所示。

Spark2.1.0——深入淺出度量系統

圖3     Sink的類繼承體系

圖3中展示了6種Sink的具體實作。

  • ConsoleSink:借助Metrics提供的ConsoleReporter的API,将度量輸出到System.out,是以可以輸出到控制台。
  • CsvSink:借助Metrics提供的CsvReporter的API,将度量輸出到CSV檔案。
  • MetricsServlet:在Spark UI的jetty服務中建立ServletContextHandler,将度量資料通過Spark UI展示在浏覽器中。
  • JmxSink:借助Metrics提供的JmxReporter的API,将度量輸出到MBean中,這樣就可以打開Java VisualVM,然後打開Tomcat程序監控,給VisualVM安裝MBeans插件後,選擇MBeans标簽頁可以對JmxSink所有注冊到JMX中的對象進行管理。
  • Slf4jSink:借助Metrics提供的Slf4jReporter的API,将度量輸出到實作了Slf4j規範的日志輸出。
  • GraphiteSink:借助Metrics提供的GraphiteReporter的API,将度量輸出到Graphite(一個由Python實作的Web應用,采用django架構,用來收集伺服器狀态的監控系統)。

了解了Sink的類繼承體系,我們挑選Slf4jSink作為Spark中Sink實作類的例子,來了解Sink具體該如何實作。Slf4jSink的實作見代碼清單4。

代碼清單4         Slf4jSink的實作

private[spark] class Slf4jSink(
    val property: Properties,
    val registry: MetricRegistry,
    securityMgr: SecurityManager)
  extends Sink {
  val SLF4J_DEFAULT_PERIOD = 10
  val SLF4J_DEFAULT_UNIT = "SECONDS"

  val SLF4J_KEY_PERIOD = "period"
  val SLF4J_KEY_UNIT = "unit"

  val pollPeriod = Option(property.getProperty(SLF4J_KEY_PERIOD)) match {
    case Some(s) => s.toInt
    case None => SLF4J_DEFAULT_PERIOD
  }

  val pollUnit: TimeUnit = Option(property.getProperty(SLF4J_KEY_UNIT)) match {
    case Some(s) => TimeUnit.valueOf(s.toUpperCase())
    case None => TimeUnit.valueOf(SLF4J_DEFAULT_UNIT)
  }

  MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)

  val reporter: Slf4jReporter = Slf4jReporter.forRegistry(registry)
    .convertDurationsTo(TimeUnit.MILLISECONDS)
    .convertRatesTo(TimeUnit.SECONDS)
    .build()

  override def start() {
    reporter.start(pollPeriod, pollUnit)
  }

  override def stop() {
    reporter.stop()
  }

  override def report() {
    reporter.report()
  }
}
      

  從Slf4jSink的實作可以看到Slf4jSink的start、stop及report實際都是代理了Metrics庫中的Slf4jReporter的start、stop及report方法。Slf4jReporter的start方法實際是其父類ScheduledReporter的start實作。而傳遞的兩個參數pollPeriod和pollUnit,正是被ScheduledReporter使用作為定時器擷取資料的周期和時間機關。有關ScheduledReporter中start、stop及Slf4jReporter的report方法的實作可以參閱《附錄D Metrics簡介》。

關于《Spark核心設計的藝術 架構設計與實作》

經過近一年的準備,基于Spark2.1.0版本的《Spark核心設計的藝術 架構設計與實作》一書現已出版發行,圖書如圖:

Spark2.1.0——深入淺出度量系統

紙質版售賣連結如下:

京東:https://item.jd.com/12302500.html

道生一,一生二,二生三,三生萬物。

繼續閱讀