flume 作為 cloudera 開發的實時日志收集系統,受到了業界的認可與廣泛應用。flume 初始的發行版本目前被統稱為 flume og(original generation),屬于 cloudera。但随着 flume 功能的擴充,flume og 代碼工程臃腫、核心元件設計不合理、核心配置不标準等缺點暴露出來,尤其是在 flume og 的最後一個發行版本 0.94.0 中,日志傳輸不穩定的現象尤為嚴重,為了解決這些問題,2011 年 10 月 22 号,cloudera 完成了 flume-728,對 flume 進行了裡程碑式的改動:重構核心元件、核心配置以及代碼架構,重構後的版本統稱為 flume ng(next generation);改動的另一原因是将 flume 納入 apache 旗下,cloudera flume 改名為 apache flume。ibm 的這篇文章:《 flume ng:flume 發展史上的第一次革命 》,從基本元件以及使用者體驗的角度闡述 flume og 到 flume ng 發生的革命性變化。本文就不再贅述各種細枝末節了,不過這裡還是簡要提下 flume ng (1.x.x)的主要變化:
sources和sinks 使用channels 進行連結
兩個主要channel:
a) in-memory channel 非持久性支援,速度快
b) jdbc-based channel 持久性支援
不再區分邏輯和實體node,所有實體節點統稱為agent,每個agent都能運作0個或多個sources和sinks
不再需要master節點和對zookeeper的依賴,配置檔案簡單化。
插件化,一部分面對使用者,工具或系統開發人員。
使用thrift、avro flume sources 可以從flume0.9.4 發送 events 到flume 1.x
注:本文所使用的 flume 版本為 flume-1.4.0-cdh4.7.0,不需要額外的安裝過程,解壓縮即可用。
元件
功能
agent
使用jvm 運作flume。每台機器運作一個agent,但是可以在一個agent中包含多個sources和sinks
client
生産資料,運作在一個獨立的線程
source
從client收集資料,傳遞給channel
sink
從channel收集資料,運作在一個獨立線程
channel
連接配接 sources 和 sinks ,這個有點像一個隊列
events
可以是日志記錄、 avro 對象等
flume以agent為最小的獨立運作機關。一個agent就是一個jvm。單agent由source、sink和channel三大元件構成,如下圖:

flume的資料流由事件(event)貫穿始終。事件是flume的基本資料機關,它攜帶日志資料(位元組數組形式)并且攜帶有頭資訊,這些event由agent外部的source,比如上圖中的web server生成。當source捕獲事件後會進行特定的格式化,然後source會把事件推入(單個或多個)channel中。你可以把channel看作是一個緩沖區,它将儲存事件直到sink處理完該事件。sink負責持久化日志或者把事件推向另一個source。
很直白的設計,其中值得注意的是,flume提供了大量内置的source、channel和sink類型。不同類型的source,channel和sink可以自由組合。組合方式基于使用者設定的配置檔案,非常靈活。比如:channel可以把事件暫存在記憶體裡,也可以持久化到本地硬碟上。sink可以把日志寫入hdfs, hbase,甚至是另外一個source等等。
如果你以為flume就這些能耐那就大錯特錯了。flume支援使用者建立多級流,也就是說,多個agent可以協同工作,并且支援fan-in、fan-out、contextual routing、backup routes。如下圖所示:
作為生産環境運作的軟體,高可靠性是必須的。
從單agent來看,flume使用基于事務的資料傳遞方式來保證事件傳遞的可靠性。source和sink被封裝進一個事務。事件被存放在channel中直到該事件被處理,channel中的事件才會被移除。這是flume提供的點到點的可靠機制。
從多級流來看,前一個agent的sink和後一個agent的source同樣有它們的事務來保障資料的可靠性。
還是靠channel。推薦使用filechannel,事件持久化在本地檔案系統裡(性能較差)。
flume架構整體上看就是 source -->c hannel --> sink 的三層架構,類似生成者和消費者的架構,他們之間通過queue(channel)傳輸,解耦。
source:完成對日志資料的收集,分成 transtion 和 event 打入到channel之中
channel:主要提供一個隊列的功能,對source提供中的資料進行簡單的緩存
sink:取出channel中的資料,進行相應的存儲檔案系統,資料庫,或者送出到遠端伺服器
對現有程式改動最小的使用方式是使用是直接讀取程式原來記錄的日志檔案,基本可以實作無縫接入,不需要對現有程式進行任何改動。
對于直接讀取檔案source, 主要有兩種方式:
可通過寫unix command的方式組織資料,最常用的就是tail -f [file]。
可以實作實時傳輸,但在flume不運作和腳本錯誤時,會丢資料,也不支援斷點續傳功能。因為沒有記錄上次檔案讀到的位置,進而沒辦法知道,下次再讀時,從什麼地方開始讀。特别是在日志檔案一直在增加的時候。flume的source挂了。等flume的source再次開啟的這段時間内,增加的日志内容,就沒辦法被source讀取到了。不過flume有一個execstream的擴充,可以自己寫一個監控日志增加情況,把增加的日志,通過自己寫的工具把增加的内容,傳送給flume的node。再傳送給sink的node。要是能在tail類的source中能支援,在node挂掉這段時間的内容,等下次node開啟後在繼續傳送,那就更完美了。
spoolsource:是監測配置的目錄下新增的檔案,并将檔案中的資料讀取出來,可實作準實時。需要注意兩點:
1) 拷貝到spool目錄下的檔案不可以再打開編輯
2) spool目錄下不可包含相應的子目錄。在實際使用的過程中,可以結合log4j使用,使用log4j的時候,将log4j的檔案分割機制設為1分鐘一次,将檔案拷貝到spool的監控目錄。log4j有一個timerolling的插件,可以把log4j分割的檔案到spool目錄。基本實作了實時的監控。flume在傳完檔案之後,将會修改檔案的字尾,變為.completed(字尾也可以在配置檔案中靈活指定)
execsource,spoolsource對比:execsource可以實作對日志的實時收集,但是存在flume不運作或者指令執行出錯時,将無法收集到日志資料,無法何證日志資料的完整性。spoolsource雖然無法實作實時的收集資料,但是可以使用以分鐘的方式分割檔案,趨近于實時。如果應用無法實作以分鐘切割日志檔案的話,可以兩種收集方式結合使用。
channel有多種方式:有memorychannel, jdbc channel, memoryrecoverchannel, filechannel。memorychannel可以實作高速的吞吐,但是無法保證資料的完整性。memoryrecoverchannel在官方文檔的建議上已經建義使用filechannel來替換。filechannel保證資料的完整性與一緻性。在具體配置filechannel時,建議filechannel設定的目錄和程式日志檔案儲存的目錄設成不同的磁盤,以便提高效率。
sink在設定存儲資料時,可以向檔案系統中,資料庫中,hadoop中儲資料,在日志資料較少時,可以将資料存儲在檔案系中,并且設定一定的時間間隔儲存資料。在日志資料較多時,可以将相應的日志資料存儲到hadoop中,便于日後進行相應的資料分析。
将上述配置存為:example.conf
然後我們就可以啟動flume了:
ps:-dflume.root.logger=info,console 僅為 debug 使用,請勿生産環境生搬硬套,否則大量的日志會傳回到終端
然後我們再開一個 shell 終端視窗,telnet 上配置中偵聽的端口,就可以發消息看到效果了:
flume 終端視窗此時會列印出如下資訊,就表示成功了:
自此,咱們的第一個 flume agent 算是部署成功了!
啟動如下指令,就可以在 hdfs 上看到效果了。
ps:實際環境中有這樣的需求,通過在多個agent端tail日志,發送給collector,collector再把資料收集,統一發送給hdfs存儲起來,當hdfs檔案大小超過一定的大小或者超過在規定的時間間隔會生成一個檔案。
flume 實作了兩個trigger,分别為sizetriger(在調用hdfs輸出流寫的同時,count該流已經寫入的大小總和,若超過一定大小,則建立新的檔案和輸出流,寫入操作指向新的輸出流,同時close以前的輸出流)和timetriger(開啟定時器,當到達該點時,自動建立新的檔案和輸出流,新的寫入重定向到該流中,同時close以前的輸出流)。
a fan-in flow using avro rpc to consolidate events in one place
上面采用的就是類似 cs 架構,各個flume agent 節點先将各台機器的日志彙總到 consolidation 節點,然後再由這些節點統一寫入 hdfs,并且采用了負載均衡的方式,你還可以配置高可用的模式等等。
flume 報錯 java.lang.outofmemoryerror: gc overhead limit exceeded
或者:java.lang.outofmemoryerror: java heap space
exception in thread "sinkrunner-pollingrunner-defaultsinkprocessor" java.lang.outofmemoryerror: java heap space
試試在 .bashrc 或 env.sh 中添加啟動參數:
具體參見:
<a href="http://stackoverflow.com/questions/1393486/error-java-lang-outofmemoryerror-gc-overhead-limit-exceeded">http://stackoverflow.com/questions/1393486/error-java-lang-outofmemoryerror-gc-overhead-limit-exceeded</a>
把你的 jdk7 換成 jdk6 試試。