天天看點

Flume-ng的原理和使用1. 介紹2. 架構3. 使用場景4. 安裝和使用5. 開發相關6. 最佳實踐

flume ng是cloudera提供的一個分布式、可靠、可用的系統,它能夠将不同資料源的海量日志資料進行高效收集、聚合、移動,最後存儲到一個中心化資料存儲系統中。由原來的flume og到現在的flume ng,進行了架構重構,并且現在ng版本完全不相容原來的og版本。經過架構重構後,flume ng更像是一個輕量的小工具,非常簡單,容易适應各種方式日志收集,并支援failover和負載均衡。

flume 使用 java 編寫,其需要運作在 java1.6 或更高版本之上。

flume的架構主要有一下幾個核心概念:

event:一個資料單元,帶有一個可選的消息頭

flow:event從源點到達目的點的遷移的抽象

client:操作位于源點處的event,将其發送到flume agent

agent:一個獨立的flume程序,包含元件source、channel、sink

source:用來消費傳遞到該元件的event

channel:中轉event的一個臨時存儲,儲存有source元件傳遞過來的event

sink:從channel中讀取并移除event,将event傳遞到flow pipeline中的下一個agent(如果有的話)

flume 的核心是把資料從資料源收集過來,再送到目的地。為了保證輸送一定成功,在送到目的地之前,會先緩存資料,待資料真正到達目的地後,删除自己緩存的資料。

flume 傳輸的資料的基本機關是 event,如果是文本檔案,通常是一行記錄,這也是事務的基本機關。event 從 source,流向 channel,再到 sink,本身為一個 byte 數組,并可攜帶 headers 資訊。event 代表着一個資料流的最小完整單元,從外部資料源來,向外部的目的地去。

flume 運作的核心是 agent。它是一個完整的資料收集工具,含有三個核心元件,分别是 source、channel、sink。通過這些元件,event 可以從一個地方流向另一個地方,如下圖所示。

Flume-ng的原理和使用1. 介紹2. 架構3. 使用場景4. 安裝和使用5. 開發相關6. 最佳實踐

source 可以接收外部源發送過來的資料。不同的 source,可以接受不同的資料格式。比如有目錄池(spooling directory)資料源,可以監控指定檔案夾中的新檔案變化,如果目錄中有檔案産生,就會立刻讀取其内容。

channel 是一個存儲地,接收 source 的輸出,直到有 sink 消費掉 channel 中的資料。channel 中的資料直到進入到下一個channel中或者進入終端才會被删除。當 sink 寫入失敗後,可以自動重新開機,不會造成資料丢失,是以很可靠。

sink 會消費 channel 中的資料,然後送給外部源或者其他 source。如資料可以寫入到 hdfs 或者 hbase 中。

client端操作消費資料的來源,flume 支援 avro,log4j,syslog 和 http post(body為json格式)。可以讓應用程式同已有的source直接打交道,如avrosource,syslogtcpsource。也可以 寫一個 source,以 ipc 或 rpc 的方式接入自己的應用,avro和 thrift 都可以(分别有 nettyavrorpcclient 和 thriftrpcclient 實作了 rpcclient接口),其中 avro 是預設的 rpc 協定。具體代碼級别的 client 端資料接入,可以參考官方手冊。

對現有程式改動最小的使用方式是使用是直接讀取程式原來記錄的日志檔案,基本可以實作無縫接入,不需要對現有程式進行任何改動。 對于直接讀取檔案 source,有兩種方式:

execsource: 以運作 linux 指令的方式,持續的輸出最新的資料,如 <code>tail -f 檔案名</code> 指令,在這種方式下,取的檔案名必須是指定的。 execsource 可以實作對日志的實時收集,但是存在flume不運作或者指令執行出錯時,将無法收集到日志資料,無法保證日志資料的完整性。

spoolsource: 監測配置的目錄下新增的檔案,并将檔案中的資料讀取出來。需要注意兩點:拷貝到 spool 目錄下的檔案不可以再打開編輯;spool 目錄下不可包含相應的子目錄。

spoolsource 雖然無法實作實時的收集資料,但是可以使用以分鐘的方式分割檔案,趨近于實時。

如果應用無法實作以分鐘切割日志檔案的話, 可以兩種收集方式結合使用。 在實際使用的過程中,可以結合 log4j 使用,使用 log4j的時候,将 log4j 的檔案分割機制設為1分鐘一次,将檔案拷貝到spool的監控目錄。

log4j 有一個 timerolling 的插件,可以把 log4j 分割檔案到 spool 目錄。基本實作了實時的監控。flume 在傳完檔案之後,将會修改檔案的字尾,變為 .completed(字尾也可以在配置檔案中靈活指定)。

flume source 支援的類型:

source類型

說明

avro source

支援avro協定(實際上是avro rpc),内置支援

thrift source

支援thrift協定,内置支援

exec source

基于unix的command在标準輸出上生産資料

jms source

從jms系統(消息、主題)中讀取資料,activemq已經測試過

spooling directory source

監控指定目錄内資料變更

twitter 1% firehose source

通過api持續下載下傳twitter資料,試驗性質

netcat source

監控某個端口,将流經端口的每一個文本行資料作為event輸入

sequence generator source

序列生成器資料源,生産序列資料

syslog sources

讀取syslog資料,産生event,支援udp和tcp兩種協定

http source

基于http post或get方式的資料源,支援json、blob表示形式

legacy sources

相容老的flume og中source(0.9.x版本)

目前有幾個 channel 可供選擇,分别是 memory channel, jdbc channel , file channel,psuedo transaction channel。比較常見的是前三種 channel。

memorychannel 可以實作高速的吞吐,但是無法保證資料的完整性。

memoryrecoverchannel 在官方文檔的建議上已經建義使用filechannel來替換。

filechannel保證資料的完整性與一緻性。在具體配置filechannel時,建議filechannel設定的目錄和程式日志檔案儲存的目錄設成不同的磁盤,以便提高效率。

file channel 是一個持久化的隧道(channel),它持久化所有的事件,并将其存儲到磁盤中。是以,即使 java 虛拟機當掉,或者作業系統崩潰或重新開機,再或者事件沒有在管道中成功地傳遞到下一個代理(agent),這一切都不會造成資料丢失。memory channel 是一個不穩定的隧道,其原因是由于它在記憶體中存儲所有事件。如果 java 程序死掉,任何存儲在記憶體的事件将會丢失。另外,記憶體的空間收到 ram大小的限制,而 file channel 這方面是它的優勢,隻要磁盤空間足夠,它就可以将所有事件資料存儲到磁盤上。

flume channel 支援的類型:

channel類型

memory channel

event資料存儲在記憶體中

jdbc channel

event資料存儲在持久化存儲中,目前flume channel内置支援derby

file channel

event資料存儲在磁盤檔案中

spillable memory channel

event資料存儲在記憶體中和磁盤上,當記憶體隊列滿了,會持久化到磁盤檔案(目前試驗性的,不建議生産環境使用)

pseudo transaction channel

測試用途

custom channel

自定義channel實作

sink在設定存儲資料時,可以向檔案系統、資料庫、hadoop存資料,在日志資料較少時,可以将資料存儲在檔案系中,并且設定一定的時間間隔儲存資料。在日志資料較多時,可以将相應的日志資料存儲到hadoop中,便于日後進行相應的資料分析。

flume sink支援的類型

sink類型

hdfs sink

資料寫入hdfs

logger sink

資料寫入日志檔案

avro sink

資料被轉換成avro event,然後發送到配置的rpc端口上

thrift sink

資料被轉換成thrift event,然後發送到配置的rpc端口上

irc sink

資料在irc上進行回放

file roll sink

存儲資料到本地檔案系統

null sink

丢棄到所有資料

hbase sink

資料寫入hbase資料庫

morphline solr sink

資料發送到solr搜尋伺服器(叢集)

elasticsearch sink

資料發送到elastic search搜尋伺服器(叢集)

kite dataset sink

寫資料到kite dataset,試驗性質的

custom sink

自定義sink實作

flume 使用事務性的方式保證傳送event整個過程的可靠性。sink 必須在 event 被存入 channel 後,或者,已經被傳達到下一站agent裡,又或者,已經被存入外部資料目的地之後,才能把 event 從 channel 中 remove 掉。這樣資料流裡的 event 無論是在一個 agent 裡還是多個 agent 之間流轉,都能保證可靠,因為以上的事務保證了 event 會被成功存儲起來。而 channel 的多種實作在可恢複性上有不同的保證。也保證了 event 不同程度的可靠性。比如 flume 支援在本地儲存一份檔案 channel 作為備份,而memory channel 将 event 存在記憶體 queue 裡,速度快,但丢失的話無法恢複。

下面,根據官網文檔,我們展示幾種flow pipeline,各自适應于什麼樣的應用場景:

多個 agent 順序連接配接:

Flume-ng的原理和使用1. 介紹2. 架構3. 使用場景4. 安裝和使用5. 開發相關6. 最佳實踐

可以将多個agent順序連接配接起來,将最初的資料源經過收集,存儲到最終的存儲系統中。這是最簡單的情況,一般情況下,應該控制這種順序連接配接的agent的數量,因為資料流經的路徑變長了,如果不考慮failover的話,出現故障将影響整個flow上的agent收集服務。

多個agent的資料彙聚到同一個agent:

Flume-ng的原理和使用1. 介紹2. 架構3. 使用場景4. 安裝和使用5. 開發相關6. 最佳實踐

這種情況應用的場景比較多,比如要收集web網站的使用者行為日志,web網站為了可用性使用的負載均衡的叢集模式,每個節點都産生使用者行為日志,可以為每個節點都配置一個agent來單獨收集日志資料,然後多個agent将資料最終彙聚到一個用來存儲資料存儲系統,如hdfs上。

多路(multiplexing)agent

Flume-ng的原理和使用1. 介紹2. 架構3. 使用場景4. 安裝和使用5. 開發相關6. 最佳實踐

這種模式,有兩種方式,一種是用來複制(replication),另一種是用來分流(multiplexing)。replication方式,可以将最前端的資料源複制多份,分别傳遞到多個channel中,每個channel接收到的資料都是相同的。

配置格式示例如下:

上面指定了selector的type的值為replication,其他的配置沒有指定,使用的replication方式,source1會将資料分别存儲到channel1和channel2,這兩個channel裡面存儲的資料是相同的,然後資料被傳遞到sink1和sink2。

multiplexing方式,selector可以根據header的值來确定資料傳遞到哪一個channel,配置格式,如下所示:

上面selector的type的值為multiplexing,同時配置selector的header資訊,還配置了多個selector的mapping的值,即header的值:如果header的值為value1、value2,資料從source1路由到channel1;如果header的值為value2、value3,資料從source1路由到channel2。

實作load balance功能

Flume-ng的原理和使用1. 介紹2. 架構3. 使用場景4. 安裝和使用5. 開發相關6. 最佳實踐

load balancing sink processor能夠實作load balance功能,上圖agent1是一個路由節點,負責将channel暫存的event均衡到對應的多個sink元件上,而每個sink元件分别連接配接到一個獨立的agent上,示例配置,如下所示:

實作failover能

failover sink processor能夠實作failover功能,具體流程類似load balance,但是内部處理機制與load balance完全不同:failover sink processor維護一個優先級sink元件清單,隻要有一個sink元件可用,event就被傳遞到下一個元件。如果一個sink能夠成功處理event,則會加入到一個pool中,否則會被移出pool并計算失敗次數,設定一個懲罰因子,示例配置如下所示:

flume 的 rpm 安裝方式很簡單,這裡不做說明。

安裝成功之後,在 /etc/flume/conf 目錄建立f1.conf 檔案,内容如下:

接下來啟動 agent:

參數說明:

<code>-n</code> 指定agent名稱

<code>-c</code> 指定配置檔案目錄

<code>-f</code> 指定配置檔案

<code>-dflume.root.logger=debug,console</code> 設定日志等級

下面可以啟動一個 avro-client 用戶端生産資料:

在 /etc/flume/conf 目錄建立 f2.conf 檔案,内容如下:

然後,手動拷貝一個檔案到 /root/log 目錄,觀察日志輸出以及/root/log 目錄下的變化。

在 /etc/flume/conf 目錄建立 f3.conf 檔案,内容如下:

說明:

通過 interceptors 往 header 裡添加 timestamp,這樣做,可以在 hdfs.path 引用系統内部的時間變量或者主機的 hostname。

通過設定 <code>hdfs.inuseprefix</code>,例如設定為 <code>.</code>時,hdfs 會把該檔案當做隐藏檔案,以避免在 mr 過程中讀到這些臨時檔案,引起一些錯誤

如果使用 lzo 壓縮,則需要手動建立 lzo 索引,可以通過修改 hdfssink 的代碼,通過代碼建立索引

filechannel 的目錄最好是和 spooldir 的資料目錄處于不同磁盤。

從 github 下載下傳源代碼并編譯:

如果提示找不到 hadoop-test 的 jar 包,則修改 pom.xml 中的版本,如改為 <code>2.0.0-mr1-cdh4.7.0</code>,具體版本視你使用的分支版本而定,我這裡是 cdh4.7.0。

如果提示找不到 uanodeset-parser 的 jarb,則在 pom.xml 中添加下面倉庫:

子產品命名規則:所有的 source 以 src 開頭,所有的 channel 以 ch 開頭,所有的 sink 以 sink 開頭;

子產品之間内部通信統一使用 avro 接口;

将日志采集系統系統分為三層:agent 層,collector 層和 store 層,其中 agent 層每個機器部署一個程序,負責對單機的日志收集工作;collector 層部署在中心伺服器上,負責接收agent層發送的日志,并且将日志根據路由規則寫到相應的 store 層中;store 層負責提供永久或者臨時的日志存儲服務,或者将日志流導向其它伺服器。

擴充 memorychannel 和 filechannel ,提供 dualchannel 的實作,以提供高吞吐和大緩存

監控 collector hdfssink寫資料到 hdfs 的速度、filechannel 中擁堵的 events 數量,以及寫 hdfs 狀态(檢視是否有 .tmp 檔案生成)