天天看點

Spark ListenerBus 和 MetricsSystem 體系分析

監控是一個大系統完成後最重要的一部分。spark整個系統運作情況是由listenerbus以及metricssystem 來完成的。這篇文章重點分析他們之間的工作機制以及如何通過這兩個系統完成更多的名額收集。

spark的事件體系是如何工作的呢?我們先簡要描述下,讓大家有個大概的了解。

首先,大部分類都會引入一個對象叫listenerbus,這個類具體是什麼得看實作,但是都一定繼承自org.apache.spark.util.listenerbus.

假設我們要送出一個任務集。這個動作可能會很多人關心,我就是使用listenerbus把event發出去,類似下面的第二行代碼。

listenerbus裡已經注冊了很多監聽者,我們叫listener,通常listenerbus 會啟動一個線程異步的調用這些listener去消費這個event。而所謂的消費,其實就是觸發事先設計好的回調函數來執行譬如資訊存儲等動作。 

這就是整個listenerbus的工作方式。這裡我們看到,其實類似于埋點,這是有侵入性的,每個你需要關注的地方,如果想讓人知曉,就都需要發出一個特定的event。

這裡的特定實作有:

asynchronouslistenerbus 内部維護了一個queue,事件都會先放到這個queue,然後通過一個線程來讓listener處理event。

sparklistenerbus 也是一個trait,但是裡面有個具體的實作,預先定義了onpostevent 方法對一些特定的事件做了處理。

其他更下面的類則根據需要混入或者繼承sparklistenerbus ,asynchronouslistenerbus來完成他們需要的功能。

不同的listenerbus 需要不同的event 集 和listener,比如你看streaminglistenerbus的簽名,就知道所有的event都必須是streaminglistenerevent,所有的listener都必須是

通常而言,listener 是有狀态的,一般接受到一個event後,可能就會更新内部的某個資料結構。以 org.apache.spark.streaming.ui.streamingjobprogresslistener為例,他是一個streaminglistener,内部就含有一些存儲結構,譬如:

看申明都是普通的 hashmap ,是以操作是需要做synchronized操作。如下:

metricssystem 比較好了解,一般是為了衡量系統的各種名額的度量系統。算是一個key-value形态的東西。舉個比較簡單的例子,我怎麼把目前jvm相關資訊展示出去呢?做法自然很多,通過metricssystem就可以做的更标準化些,具體方式如下:

source 。資料來源。比如對應的有org.apache.spark.metrics.source.jvmsource

sink。  資料發送到哪去。有被動和主動。一般主動的是通過定時器來完成輸出,譬如csvsink,被動的如metricsservlet等需要被使用者主動調用。

橋接source 和sink的則是metricregistry了。

metricssystem的配置有兩種,第一種是 metrics.properties 配置檔案的形态。第二種是通過spark conf完成,參數以spark.metrics.conf.開頭 。

我這裡簡單介紹下第二種方式。

比如我想檢視jvm的資訊,包括gc和memory的使用情況,則我通過類似 

預設情況下,metricssystem 配置了一個全局的sink,metricsservlet。是以你添加的任何source 都可以通過一個path /metrics/json擷取到。如果你的程式設定做了上面的設定,把你的spark-ui的路徑換成/metrics/json,就能看到jvm源的一些資訊了。

通常,如果你要實作一個自定義的source,可以遵循如下步驟(這裡以jvmsource為例)。

<b>-- 建立一個source</b>

其中 sourcename 是為了給配置用的,比如上面我們設定

裡面的jvm 就是jvmsource裡設定的sourcename

每個source 一般會自己建構一個metricregistry。上面的例子,具體的資料收集工作是由garbagecollectormetricset,memoryusagegaugeset完成的。

具體就是寫一個類繼承com.codahale.metrics.metricset,然後實作map&lt;string, metric&gt; getmetrics() 方法就好。

接着通過metricregistry.registerall将寫好的metricset注冊上就行。

<b>-- 添加配置</b>

<b>-- 調用結果</b>

将spark ui 的位址換成/metrics/json,就能看到輸出結果了。當然,這裡是因為預設系統預設提供了一個sink實作:org.apache.spark.metrics.sink.metricsservlet,你可以自己實作一個。

通過之前我寫的spark ui (基于yarn) 分析與定制,你應該學會了如何添加新的頁面到spark ui上。

而這通過這一片文章,你應該了解了資料來源有兩個:

各個listener

metricssystem

你可以組合現有的listener以及metrics source 顯示任何你想要的内容。

如果現有的無法滿足你,通常你的新的需求應該可以通過下面兩種方式來滿足:

你需要監控新的事件,那麼你需要添加新的listenerbus,listener,event,然後到你需要的地方去埋點(post事件)。這肯定需要修改spark-core裡的代碼了。

你需要呈現現有的listener或者已知對象的變量,則使用metricssystem,定義一個新的source 即可。

這樣,把這些對象傳遞到你的page中,就可以進行展示。