天天看點

日志系統之Flume日志收集flume簡介Source的擴充Sink 的擴充Interceptor使用Selector 的使用

最近接手維護一個日志系統,它用于對應用伺服器上的日志進行收集然後提供實時分析、處理并最後将日志存儲到目标存儲引擎。針對這三個環節,業界已經有一套元件來應對各自的需求需求,它們是flume+kafka+hdfs/hbase。我們在實時分析、存儲這兩個環節,選擇跟業界的實踐相同,但agent是團隊自己寫的,出于對多種資料源的擴充需求以及原來收集日志的方式存在的一些不足,于是調研了一下flume的agent。結果是flume非常契合我們的實際需求,并且擁有良好的擴充性與穩定性。于是打算采用flume的agent替換我們原先的實作。

本文介紹我們如何使用flume

agent以及為了滿足我們的需求進行了哪些擴充。備注:全文所指的flume均指flume-ng,版本基于1.6.0。

flume

通過Agent對各個伺服器上的日志進行收集,它依賴三大核心元件,它們分别是:source,channel,sink。它們之間的串聯關系如下圖:

日志系統之Flume日志收集flume簡介Source的擴充Sink 的擴充Interceptor使用Selector 的使用

如果你的需求是接近“準實時”的日志收集并且你非要用這個souce,應對的方案是:你隻能選擇将應用程式的日志架構(比如常用的log4j)的appender的“滾動機制”設定為按分鐘滾動(也就是每分鐘産生一個新日志檔案)。這種機制不是不可行,但有些不足的地方,比如日志檔案過多:當日志除了要被日志系統收集,還需要本地保留時,這種機制将非常難以接受。

我們希望日志檔案按天滾動産生新的日志檔案,當天的日志以追加的方式寫入當天的日志檔案并且Agent還要能夠以接近實時的速度收集新産生的日志(追加)的。如果agent挂掉或者伺服器當機,日志檔案不能丢失,agent能夠自動跨日期收集。其實,spooling

directory source已經為我們的實作提供了模闆,但要進行一些改造,主要是以下幾點:

(1)原先的Spooling Directory Source不支援對收集的日志檔案的内容進行追加:

日志系統之Flume日志收集flume簡介Source的擴充Sink 的擴充Interceptor使用Selector 的使用

如果檔案有任何改動,将以異常的形式抛出。此處需要移除異常

(2)對當日日志檔案進行持續監控

原先的實作,當擷取不到event直接删除或者重命名目前檔案,并自動混動到下一個檔案:

修改後的實作,當目前檔案不是當天的日志檔案時才處理目前檔案并自動滾動到下一個檔案,如果是當日檔案,則繼續跟蹤:

另外此處,我們判斷是否是目标檔案(當日日志檔案)的處理方式是比對伺服器日期跟檔案名中包含的日期是否一緻:

是以在新的配置裡還需要加入日期格式的配置,通常是:yyyy-MM-dd。

Sink在Flume的agent元件中充當資料輸出的作用。在flume之前的版本(1.5.2)中已經對多個資料持久化系統提供了内置支援(比如hdfs/HBase等),但預設是沒有kafka的。如果我們想将日志消息發送到kafka,就需要自己擴充一個kafkaSink。後來通過搜尋發現在最新的stable

release版本:1.6.0中,官方已經內建了kafkaSink。不過1.6.0是5月20号剛剛釋出,官方的Download頁面以及User

Guide還沒有進行更新,是以請在版本清單頁面下載下傳1.6.0版本。在下載下傳到的安裝包内有最新的KafkaSink介紹。

核心的配置有:brokerList(為了高可用性,flume建議至少填寫兩個broker配置)、topic。詳見清單:

日志系統之Flume日志收集flume簡介Source的擴充Sink 的擴充Interceptor使用Selector 的使用

出于好奇心,在github上大概浏覽了官方實作kafkaSink的源碼,發現Event的Header部分并沒有被打包進消息發送走:

這一點,可能并不滿足我們的需求:我們需要消息頭裡的資訊成為消息的一部分,然後在storm裡針對header資訊進行一些處理。比如:

(1)我們會預設在頭裡加入産生日志的伺服器的Host,以便對日志進行分流或對沒有存儲host的日志進行“補償”

(2)我們會預設在頭裡加入日志類型的辨別,以便區分不同的日志并分流到不同的解析器進行解析

因為日志的來源以及形式是多樣的,是以header裡這些攜帶的資訊是必要的。而flume官方的KafkaSink卻過濾掉了header中的資訊。是以,我們選擇對其進行簡單的擴張,将Event的header跟body打包成一個完整的json對象。具體的實作:

上面提到日志的源以及格式多種多樣,我們不可能将所有工具、元件的日志格式按照我們想要的方式作格式化,特别是一些封閉的元件或線上的系統。很顯然source跟sink隻負責日志的收集和發送,并不會區分日志内容。而flume提供的Interceptor這一功能,給flume提供了更強大的擴充性。而我們攔擊日志,并給其添加特定的header就是通過flume内置的幾個interceptor實作的。我們應用了這麼幾個interceptor:

(1)host:往header中設定目前主機的Host資訊;

(2)static:往header中設定一個預先配好的key-value對,我們用它來鑒别不同的日志源

(3)regex:通過将Event的body轉換成一個UTF-8的字元串,然後比對正規表達式,如果比對成功,則可以選擇放行或者選擇删除

前兩個interceptor我們之前已經提及過它的用途,而第三個我們用它來比對日志中是否存在“DEGUG”字樣的tag,如此存在,則删除該日志(這個是可選的)。

原文釋出時間為:2015-06-06

本文作者:vinoYang