天天看點

Flume 使用學習小結概述Flume架構Flume實際使用說說Flume的事務

flume可進行大量日志資料采集、聚合和并轉移到存儲中,并提供資料在流轉中的事務機制;

可适用場景:日志--->flume--->實時計算(如mq+storm) 、日志--->flume--->離線計算(如odps、hdfs、hbase)、日志--->flume--->elasticsearch等。

Flume 使用學習小結概述Flume架構Flume實際使用說說Flume的事務

對照這個圖,作一些說明;

event 一條日志資料在flume中對應一個event對象。不過給他添加了header屬性,就是一個map,放 一些額外資訊,可以針對每條event做特殊處理,比如channel的選擇。這些額外的鍵值對可以在event從source到channel之間的interceptor(攔截器)中set。

source 負責日志流入,比如從檔案、網絡、mq等資料源流入資料。

channel 負責資料聚合/暫存,以供sink消費掉,事務機制主要在這裡實作

sink 負責資料轉移到存儲,比如從channel拿到日志後直接存儲到odps、elasticsearch等。

攔截器 如果配置了攔截器,則event從source 進入channel前,經過攔截器鍊做過濾或其他處理;如識别不需要的資料等

選擇器 flume預設實作有replicatingchannelselector(複制,event可同時發往多個channel)和multiplexingchannelselector(複用,可根據header中某個字段值,發往不同的channel)

下圖是個簡單的多channel、sink情況;flume還包含一些其他的進階的特性和使用方法,有時間可以繼續研究。

Flume 使用學習小結概述Flume架構Flume實際使用說說Flume的事務

遇到的問題:

日志資料中包含空行等不正确格式的記錄,導緻從channel中take日志記錄後儲存到odps失敗;失敗的操作被事務復原,結果是資料流傳在這個地方錯誤循環下去。

日志資料按實際生成的日期為分區儲存在opds表的分區中,導入日志資料的日期為實際日期的後一天。

在嘗試了幾種方法後,最後選擇自定義了一個攔截器實作(ualogfilteringinterceptor),能很好的達到目前的需求,他主要做如下兩件事:

過濾掉不需要、不規範的資料,并且把過濾掉的這些資料存儲到指定的檔案裡,每天一個檔案(如果有異常記錄)。

在每條event的header中加入qt值(qt值為前一天的日期,格式為yyyymmdd),每條event根據該值保持到odps的對應表分區中。

event從source put到channel和從channel take到sink後落地,這兩個步驟都包裹在事務中;我這裡說下memorytransaction大緻實作。

memorytransaction 主要用到了兩個雙向阻塞隊列(linkedblockingdeque)putlist和takelist作為緩沖區,同時配合使用memorychannel中的linkedblockingdeque queue;隊列的大小通過flume的配置初始化好;

put事務

批量資料循環put到putlist中

commit,把putlist隊列中資料offer到queue隊列中,然後釋放信号量,清空(clear)putlist隊列

rollback,清空(clear)putlist隊列

這裡其實沒有做太多事。

take事務

檢查takelist隊列大小是否夠用,從queue隊列中poll event到takelist隊列中

commit,表明被sink正确消費掉,清空(clear)takelist隊列

rollback,異常出現,則把takelist隊列中的event返還到queue隊列頂部