flume可進行大量日志資料采集、聚合和并轉移到存儲中,并提供資料在流轉中的事務機制;
可适用場景:日志--->flume--->實時計算(如mq+storm) 、日志--->flume--->離線計算(如odps、hdfs、hbase)、日志--->flume--->elasticsearch等。
對照這個圖,作一些說明;
event 一條日志資料在flume中對應一個event對象。不過給他添加了header屬性,就是一個map,放 一些額外資訊,可以針對每條event做特殊處理,比如channel的選擇。這些額外的鍵值對可以在event從source到channel之間的interceptor(攔截器)中set。
source 負責日志流入,比如從檔案、網絡、mq等資料源流入資料。
channel 負責資料聚合/暫存,以供sink消費掉,事務機制主要在這裡實作
sink 負責資料轉移到存儲,比如從channel拿到日志後直接存儲到odps、elasticsearch等。
攔截器 如果配置了攔截器,則event從source 進入channel前,經過攔截器鍊做過濾或其他處理;如識别不需要的資料等
選擇器 flume預設實作有replicatingchannelselector(複制,event可同時發往多個channel)和multiplexingchannelselector(複用,可根據header中某個字段值,發往不同的channel)
下圖是個簡單的多channel、sink情況;flume還包含一些其他的進階的特性和使用方法,有時間可以繼續研究。
遇到的問題:
日志資料中包含空行等不正确格式的記錄,導緻從channel中take日志記錄後儲存到odps失敗;失敗的操作被事務復原,結果是資料流傳在這個地方錯誤循環下去。
日志資料按實際生成的日期為分區儲存在opds表的分區中,導入日志資料的日期為實際日期的後一天。
在嘗試了幾種方法後,最後選擇自定義了一個攔截器實作(ualogfilteringinterceptor),能很好的達到目前的需求,他主要做如下兩件事:
過濾掉不需要、不規範的資料,并且把過濾掉的這些資料存儲到指定的檔案裡,每天一個檔案(如果有異常記錄)。
在每條event的header中加入qt值(qt值為前一天的日期,格式為yyyymmdd),每條event根據該值保持到odps的對應表分區中。
event從source put到channel和從channel take到sink後落地,這兩個步驟都包裹在事務中;我這裡說下memorytransaction大緻實作。
memorytransaction 主要用到了兩個雙向阻塞隊列(linkedblockingdeque)putlist和takelist作為緩沖區,同時配合使用memorychannel中的linkedblockingdeque queue;隊列的大小通過flume的配置初始化好;
put事務
批量資料循環put到putlist中
commit,把putlist隊列中資料offer到queue隊列中,然後釋放信号量,清空(clear)putlist隊列
rollback,清空(clear)putlist隊列
這裡其實沒有做太多事。
take事務
檢查takelist隊列大小是否夠用,從queue隊列中poll event到takelist隊列中
commit,表明被sink正确消費掉,清空(clear)takelist隊列
rollback,異常出現,則把takelist隊列中的event返還到queue隊列頂部