Kafka端到端審計是指生産者生産的消息存入至broker,以及消費者從broker中消費消息這個過程之間消息個數及延遲的審計,以此可以檢測是否有資料丢失,是否有資料重複以及端到端的延遲等。
目前主要調研了3個産品:
Chaperone (Uber)
Confluent Control Center(非開源,收費)
Kafka Monitor (LinkedIn)
對于Kafka端到端的審計主要通過:
消息payload中内嵌時間戳timestamp
消息payload中内嵌全局index
消息payload中内嵌timestamp和index
内嵌timestamp的方式
主要是通過設定一個審計的時間間隔(這裡稱之為time_bucket_interval,可以設定幾秒或者幾分鐘,這個可以自定義), 每個timestamp都會被配置設定到相應的桶中,算法有:
timestamp - timestamp%time_bucket_interval
floor((timestamp /15)*15)
這樣可以獲得相應time_bucket的起始時間time_bucket_start,一個time_bucket的區間可以記錄為[time_bucket_start, time_bucket_start+time_bucket_interval]。
每發送或者消費一條消息可以根據消息payload内嵌的時間戳,配置設定到相應桶中,然後對桶進行計數,之後進行存儲,簡單的可以存儲到,比如:Map<long time_bucket_start, long count>之中。
内嵌index的方式
這種方式就更容易了解了,對于每條消息都配置設定一個全局唯一的index,如果topic及相應的partition固定的話,可以為每一個topic-partition設定一個全局的index,當有消息發送到某個topic-partition中,那麼首先擷取其topic-partition對應的index, 然後内嵌到payload中,之後再發送到broker。消費者進行消費審計,可以判斷出哪個消息丢失,哪個消息重複等等。如果要計算端到端延遲的話,還需要在payload中内嵌timestamp以作相應的計算。
下面來簡要分析下三個産品。
Chaperone進行消息端到端的校驗主要是基于message内置timestamp實作的,根據timestamp将message配置設定到不同的bucket中。之後就是對這個bucket中的消息進行計數等一系列的audit操作,然後将這個audit操作之後的資訊auditMessage儲存起來,auditMessage的内容:
topicName:被audit的topic
time_bucket_start:bucket的起始時間
time_bucket_end
metrics_count:time_bucket中的個數
metrics_mean_latency, metrics_p95_latency, metrics_p99_latency,metrics_max_latency:延遲
tier
hostname
datacenter
uuid
注意這裡的latency的計算規則是:currentTimeMillis - (timestamp*1000)。

Chaperone的整體架構分為:AuditLibrary, ChaperoneService, ChaperoneCollector和WebService, 它們會收集資料,并進行相關計算,自動檢測出丢失和延遲的資料,并展示審計結果。
從Chaperone的github上的源碼來看:
Chaperone分為ChaperoneClient, ChaperoneCollector, ChaperoneDistribution, ChaperoneServiceController, ChaperoneServiceWorker這5個子項目。對比着上面的架構圖來分析。
ChaperoneClient對應着AuditLibrary,主要是用來audit message的庫(library),并不以實際服務運作,可以在Producer或者Consumer用戶端中調用,預設使用10mins的滾動時間bucket來不斷地從每個主題收集消息。然後發送到kafka的chaperone-audit這個topic中。官方文檔介紹說AuditLibrary會被ChaperoneService, ChaperoneCollector和WebService這三個元件所依賴,但代碼中來看并非完全如此,略有出入。
ChaperoneDistribution可以忽略
ChaperoneServiceController和ChaperoneServiceWorker對應架構圖中的ChaperoneService,ChaperoneServiceController主要用來檢測topics并配置設定topic-partitions給ChaperoneServiceWorker用以審計(audit)。ChaperoneServiceWorker主要是audit message的一個服務。
ChaperoneServiceWorker采用scala語言編寫,内部又将ChaperoneClient或者說AuditLibrary又重新用Scala實作了一番,并豐富了一下應用,比如采用hsqldb存儲資料,zk存取offsets來實作WAL(預寫式日志,具體可見下段介紹)
Chaperone認為message中内嵌timestamp是十分必須的,但是從ChaperoneServiceWorker的代碼來看消息沒有timestamp也能運作,當消息沒有時間戳,那麼會記錄noTimeMsgCount,Chaperone介紹會有一個牛逼的算法來分析消息中的timestamp(其實就是讀取消息的開頭部分,而不是全部整條消息,類似封包截斷解析,下面也有涉及介紹),如果解析timestamp失敗,會記錄malformedMsgCount。
ChaperoneCollector對是用來讀取audit的資料,然後持久化操作,預設存入mysql中,看代碼也可選存入redis中。
源碼中沒有WebService這個東西,估計是uber内部的web系統,讀取下mysql中的内容展示到頁面而已。
如果程式段内嵌Audit Library(ChaperoneClient),那麼整個audit過程如下:
如果producer端或者consumer端需要進行消息審計,那麼可以内嵌Audit Library。就以發送端為例,消息先發送到kafka中,然後對這條消息進行審計,将審計的資訊存入到kafka中,之後有專門的ChaperoneServiceCollector進行資料消費,之後存入mysql中,也可以選擇存入redis中。頁面系統webService可以查詢mysql(redis)中的資料,之後進而在頁面中展示。
如果使用ChanperoneServiceWork,整個流轉過程如下:
上面是對broker端進行審計的過程。首先從存儲消息的kafka(圖中上面的kafka)中消費資料,之後對收到的消息進行審計操作,之後将審計消息auditmsg以及相應的offset存儲起來(auditmsg存入hsqldb中,offset存到用來存儲審計資料的kafka的zk之中),之後再将審計消息auditmsg存入kafka(圖中下面的kafka)中,最後成功存儲并傳回給消費端(Consumer1,即ChaperoneServiceWork),之後再把hsqldb中的auditmsg标記為已統計。之後ChaperoneServiceCollector和producer端(consumer端)内嵌Audit Library時相同。
官方文檔部分介紹如下:
每個消息隻被審計一次
為了確定每個消息隻被審計一次,ChaperoneService使用了預寫式日志(WAL)。ChaperoneService每次在觸發Kafka審計消息時,會往審計消息裡添加一個UUID。這個帶有相關偏移量的消息在發送到Kafka之前被儲存在WAL裡。在得到Kafka的确認之後,WAL裡的消息被标記為已完成。如果ChaperoneService崩潰,在重新開機後它可以重新發送WAL裡未被标記的審計消息,并定位到最近一次的審計偏移量,然後繼續消費。WAL確定了每個Kafka消息隻被審計一次,而且每個審計消息至少會被發送一次。
接下來,ChaperoneCollector使用ChaperoneService之前添加過的UUID來移除重複消息。有了UUID和WAL,我們可以確定審計的一次性。在代理用戶端和伺服器端難以實作一次性保證,因為這樣會給它們帶來額外的開銷。我們依賴它們的優雅關閉操作,這樣它們的狀态才會被沖刷出去。
在層間使用一緻性的時間戳
因為Chaperone可以在多個層裡看到相同的Kafka消息,是以為消息内嵌時間戳是很有必要的。如果沒有這些時間戳,在計數時會發生時間錯位。在Uber,大部分發送到Kafka的資料要麼使用avro風格的schema編碼,要麼使用JSON格式。對于使用schema編碼的消息,可以直接擷取時間戳。而對于JSON格式的消息,需要對JSON資料進行解碼才能拿到時間戳。為了加快這個過程,我們實作了一個基于流的JSON消息解析器,這個解析器無需預先解碼整個消息就可以掃描到時間戳。這個解析器用在ChaperoneService裡是很高效的,不過對代理用戶端和伺服器來說仍然需要付出很高代價。是以在這兩個層裡,我們使用的是消息的處理時間戳。因為時間戳的不一緻造成的層間計數差異可能會觸發錯誤的資料丢失警告。我們正在着手解決時間戳不一緻問題,之後也會把解決方案公布出來。
溫馨提示: github上的quickstart中,如果不能根據腳本自動安裝kafka和zk,而是自己安裝kafka和zk的話,需要改動腳本、配置檔案甚至源碼才能将服務運作起來。另外需要安裝hsqldb和mysql(redis)。
這是個收費産品,文檔中介紹的并不多。和Chaperone相同,主要也是根據消息payload内嵌timestamp來實作,計算time_bucket的算法是:floor((timestamp /15)*15)。
架構圖如下:
主要是在producer端或者consumer端内嵌審計程式(相當于Chaperone的Audit Library)繼續審計,最終将審計消息同樣存入kafka中,最後的web系統是直接消費kafka中的審計消息進行内容呈現。
web系統部分呈現如下:
Kafka Monitor是基于在消息payload内嵌index和timestamp來實作審計:消息丢失,消息重複以及端到端延遲等。
幾種典型的metrics解釋:
name
description
produce-avaliablility-avg
The average produce availability
consume-avaliability-avg
The average consume availability
records-produced-total
The total number of records that are produced
records-consumed-total
The total number of records that are consumed
records-lost-total
The total number of records that are lost
records-duplicated-total
The total number of records that are duplicated
records-delay-ms-avg
The average latency of records from producer to consumer
records-produced-rate
The average number of records per second that are produced
produce-error-rate
The average number of errors per second
consume-error-rate
records-delay-ms-99th
The 99th percentile latency of records from producer to consu
records-delay-ms-999th
The 999th percentile latency of records from producer to consumer
records-delay-ms-max
The maximum latency of records from producer to consumer