天天看點

Flume詳解1、Flume簡介

1、Flume簡介

         Flume是Cloudera 開發的分布式日志收集系統,是 hadoop 周邊元件之一。提供分布式、高可靠和高可用的海量日志聚合的服務,支援在系統中定制各類資料發送方;同時,Flume提供對資料進行簡單處理,并寫到各種資料接收方(可定制)。目前 Flume 已納入 apache 旗下,cloudera Flume 改名為 Apache Flume。

1.1、設計目标

1.1.1、可靠性

         當節點出現故障時,日志能夠被傳送到其他節點上而不會丢失。Flume提供了三種級别的可靠性保障,從強到弱以此分别為:end-to-end(收集資料agent首先将event寫到磁盤上,當資料傳送成功後,再删除;如果資料發送失敗,可以重新發送)、Store on failure(這也是scribe采用的政策,當資料接受方crash時,将資料寫到本地,待恢複後,繼續發送)、best effort(資料發送到接收方後,不會進行确認)。

1.1.2、可擴充性

         Flume采用了三層架構,分别為agent、collector和storage,每一層均可以水準擴充。其中,所有agent和collector由master統一管理,這使得系統容易監控和維護,且master允許有多個(使用zookeeper進行管理和負載均衡),這就避免了單點故障問題。

1.1.3、可管理性

         所有agent和collector由master統一管理,這使得系統便于維護。多master情況,flume利用zookeeper和gossip,保證動态配置資料的一緻性。使用者可以在master上檢視各個資料源或者資料流執行情況,且可以對各個資料源配置和動态加載。Flume提供了web和shell script command兩種形式對資料流進行管理。

1.1.4、功能可擴充性

         使用者可以根據需要添加自己的agent、collector或者storage。此外,flume自帶了很多元件,包括各種agent(file、syslog等),collector和storage(file、HDFS等)。

1.2、Flume架構

1.2.1、Flume邏輯架構

Flume詳解1、Flume簡介

         正如前面提到的,flume采用了分層架構,分别為:agent、collector和storage。其中,agent和collector均由兩部分組成:source和sink,source是資料來源,sink是資料去向。

         Flume使用兩個元件:master和node。Node根據在master shell或者web中動态配置,決定其作為agent還是collector。

1.2.2、Agent詳解

         Agent的作用是将資料源的資料發送給collector。

  1、Flume自帶了很多直接可用的資料源(source),例如:

  • Text(“filename”):将檔案filename作為資料源,按行發送
  • tail(“filename”):将探測filename新産生的資料,按行發送出去
  • fsyslogTcp(5140):監聽TCP的5140端口,并且接受到的資料發送出去
  • tailDir(“dirname[,fileregex=”.*”[,startFromEnd=false[,recurseDepth=0]]]):監聽目錄中的檔案末尾,使用這則去標明需要監聽的檔案(不包括目錄),recurseDepth為遞歸監聽其子目錄的深度。

     注:更多可參見 http://www.cnblogs.com/zhangmiao-chp/archive/2011/05/18/2050465.html

  2、 同時提供了很多sink,如:

  • console(“format”):直接将資料顯示在console上
  • text(“txtfile”):将資料寫到檔案txtfile中
  • dfs(“dfsfile”):将資料寫到HDFS上的dfsfile檔案中
  • syslogTcp(“host”,prot):将資料通過tcp傳遞給host節點
  • agentSink[(“machine”[,port])]:等價于agentE2ESink,如果省略machine參數,預設使用flume.collector.event.host;如果省略port參數,預設使用flume.collector.event.port。
  • agentDFOSink[("machine" [,port])]:本地熱備agent,agent發現collector節點故障後,不斷檢查collector的存活狀态以便重新發送event,在此間産生的資料将緩存到本地磁盤中
  • agentBESink[("machine"[,port])]:不負責的agent,如果collector故障,将不做任何處理,它發送的資料也将被直接丢棄
  • agentE2EChain:指定多個collector提高可用性。當向主collector發送event失效後,轉向第二個collector發送,當所有的collector失敗後,它會非常執着的再來一遍

     注:想了解更多見http://www.cnblogs.com/zhangmiao-chp/archive/2011/05/18/2050472.html

1.2.3、Collector詳解

         Collector的作用是将多個agent的資料彙總後,添加到storage中,它的source和sink與agent類似。

1)資料源(source)如:

  • collectorSource[(port)]:Collector source,監聽端口彙聚資料
  • autoCollectorSource:通過master協調實體節點自動彙聚資料
  • logicalSource:邏輯source,由master配置設定端口并監聽rpcSink

 2)Sink如:

  • collectorSink( "fsdir","fsfileprefix",rollmillis):collectorSink,資料通過collector彙聚之後發送到hdfs, fsdir 是hdfs目錄,fsfileprefix為檔案字首碼
  • customdfs(“hdfspath”[,”format”]):自定義格式dfs

1.2.4、Storage詳解

         Storage是存儲系統,可以是一個普通file,也可以是hdfs、hive、hbase等分布式存儲。

1.2.5、Master詳解

         Master是管理協調agent和collector的配置等資訊,是flume叢集的控制器。

1.3、flume資料流

         在flume中,最重要的抽象是data flow(資料流),data flow描述了資料從産生、傳輸、處理并最終寫入目标的一條路徑。

Flume詳解1、Flume簡介

      1、對于agent資料流配置就是從那裡得到資料,并把資料發送到那個collector。

      2、對于collector是接收agent發送過來的資料,并把資料發送到指定的目标機器上。

注:flume架構對hadoop和zookeeper的依賴隻是在jar包上,并不要求flume啟動時必須将hadoop和zookeeper服務也啟動。

2、Flume-NG詳解

2.1、Flume-ng架構

Flume詳解1、Flume簡介

         Flume NG是一個從flume繼承保留來的,是以大部分概念是相同的,官網給出的解釋如下:

  • you still have sources and sinks and they still do the same thing.they are now connected by channels.
  • channels are pluggable and dictate durability.flume NG ships with an in-memory channel for fast,but non-durable event;delivery and a jdbc-based channel for durable event delivery.we have recently added a file-based durable channel too.
  • there’s no more logical or physical nodes.we call all physical nodes agents and agents can run zero or more sources and sinks.
  • There's no master and no ZooKeeper dependency anymore. At this time, Flume runs with a simple file-based configuration system
  • Just about everything is a plugin, some end user facing, some for tool and system developers. (Specifically, sources, sinks, channels, configuration providers, lifecycle management policies, input and output formats, compression, source and sink channel adapters, and the kitchen sink.)
  • Tons of things are not yet implemented. Please file JIRAs and / or vote for features you deem important.

2.1.1、event

         事件是flume ng中一種廣義的資料機關。事件是類似于JMS和類似郵件系統的郵件,一般都比較小。事件是在一個更大的資料集常用單記錄。事件被做成頭和身體的,前者是一個鍵/值映射,後者是一個任意位元組數組。

2.1.2、source

A source of data from which Flume NG receives data. Sources can be pollable or event driven. Pollable sources, like they sound, are repeatedly polled by Flume NG source runners where as event driven sources are expected to be driven by some other force. An example of a pollable source is the sequence generator which simple generates events whose body is a monotonically increasing integer. Event driven sources include the Avro source which accepts Avro RPC calls and converts the RPC payload into a Flume event and the netcat source which mimics the nc command line tool running in server mode. Sources are a user accessible API extension point.

2.1.3、sink

A sink is the counterpart to the source in that it is a destination for data in Flume NG. Some of the builtin sinks that are included with Flume NG are the Hadoop Distributed File System sink which writes events to HDFS in various ways, the logger sink which simply logs all events received, and the null sink which is Flume NG's version of /dev/null. Sinks are a user accessible API extension point.

2.1.4、channel

         通道是一個源和一個接收器之間的管道事件。管道也決定了一個源和一個接收器之間的事件持久性。例如,一個通道可能會在記憶體中,在記憶體雖然快,但不作任何保證防止資料丢失,它也可以全面持久的(進而可靠),其中每一個事件,保證傳遞連接配接的接收器,即使在失敗的案例,如斷電。管道是一個使用者通路API的擴充點。

2.1.5、agent

Flume NG 歸納代理的概念:代理人是任何實體的JVM中運作的Flume NG。一般每台機器運作一個agent,但是在一個單一的agent中可以運作任意數量的source、sink和channel。

2.1.6、client

用戶端并不一定是一個Flume NG元件盡可能連接配接到Flume 和發送資料到源。一個流行和良好的用戶端的一個例子将是一個像的Log4j Appender直接發送事件到flume avro源的日志記錄。另一個例子可能是syslog守護程序。

2.2、flume NG安裝配置

Flume-ng來源于cloudera公司開發的flume-og系統,flume-ng對flume-og系統進行了重構差生的。是以如果我們采用cloudera manager安裝的hadoop叢集,則安裝flume-ng的過程非常簡單。

結合實際生成環境的配置,在這裡主要講解一下source、channel、sink和interceptors的配置以及他們的作用,如果想深入了解可以檢視如下連結:

https://flume.apache.org/FlumeUserGuide.html

2.2.1、flume-ng安裝

         在采用hadoop建構叢集的過程中,為了保證伺服器版本的穩定性,我們一般都會自己搭建hadoop的yum源,采用yum的方式安裝flume-ng是一個非常愉快的過程,我們需要的就是在/etc/yum.repo.d/中配置我們自己搭建yum源(如果是cloudera的話,配置cdh的源),然後執行yum search flume-ng檢視是否能找到所需的依賴包。然後執行yum install flume-ng就可以完成flume-ng的安裝。

         Flume-ng比較複雜的地方就是flume配置檔案的配置。如果采用cloudera mananger cdh4源安裝出來的配置檔案位于:/usr/lib/flume-ng/conf目錄下的flume.conf檔案中;

2.2.2、flume-ng配置source源

         下圖是flume-ng支援的所有source的配置,在生産環境中,不是所有的配置都能用到,常用的有avro和exec配置。因為我們主要講解的就是這兩種源的配置。

Flume詳解1、Flume簡介

      ① 關exec配置方式:

Exec source runs a given Unix command on start-up and expects that process to continuously produce data on standard out (stderr is simply discarded, unless property logStdErr is set to true). If the process exits for any reason, the source also exits and will produce no further data. This means configurations such as cat [named pipe] or tail -F [file] are going to produce the desired results where as datewill probably not - the former two commands produce streams of data where as the latter produces a single event and exits.

Flume詳解1、Flume簡介

配置執行個體:

Flume詳解1、Flume簡介

         注意:在使用的command的時候,最好用tial –n 0 –F /var/log/source,這樣配置隻會監聽到後續重新整理到日志檔案中的最新日志内容。

       ②關于avro源配置方式

Listens on Avro port and receives events from external Avro client streams. When paired with the built-in Avro Sink on another (previous hop) Flume agent, it can create tiered collection topologies. Required properties are in bold.

Flume詳解1、Flume簡介

         配置執行個體:

Flume詳解1、Flume簡介

         注意:當源為avro源配置時,需要采用avro-client或者配置avro sink将日志資料發送到該源。通常在把該agent配置為collector的時候,會把source配置為avro源,用來接收上一個源(agent)發送來的資料。

2.2.3、flume-ng配置channel管道

         下圖是flume-ng支援的所有channel類型,在生産環境中,我們主要需要考慮兩個方面,一個是資料傳輸速度,另一個是持久化。這兩方面是一個沖突體,隻能根據我們的實際情況來決定,channel更偏重哪一個方向。是以在這裡講解一下memory和file兩種方式的配置:

Flume詳解1、Flume簡介

      ①基于memory的channel配置

The events are stored in an in-memory queue with configurable max size. It’s ideal for flows that need higher throughput and are prepared to lose the staged data in the event of a agent failures. Required properties are in bold.

Flume詳解1、Flume簡介

         配置執行個體:

Flume詳解1、Flume簡介

注意:是實際應用中,主要配置capacity和transactionCapacity這兩個參數。記憶體方式傳輸速度快,但資料沒有持久化,一旦發生異常,則存儲在裡面的資料丢失。但是在實際的應用中,該配置是應用最廣泛的。

      ②基于file的channel配置

File的配置項非常多,但是常用的配置項隻有type、checkpointDir和dataDirs。其中type是必須的,就是聲明管道類型為file;checkpointDir是配置用來存儲checkpoint(在File header裡前8個位元組存儲了版本号,接下來24個位元組是sequeuece no,接下來4個位元組存儲了checkpoint的狀态)的目錄;dataDir就是配置存儲日志檔案的路徑。

Flume詳解1、Flume簡介

注:如果想深入了解filechannel,可以檢視該文章:http://blog.csdn.net/xiaochawan/article/details/8996102

2.2.4、flume-ng配置sink

         Sink就是用來配置日期最終儲存在那裡,在hadoop叢集應用中,sink通常有兩種配置方式比較常用,分别是avro和hdfs。如果配置為avro通常是将日志發送給下一個agent(collector)處理;如果配置為hdfs通常是把日志存儲到hdfs中。

         下圖是sink支援的所有配置類型:

Flume詳解1、Flume簡介

      ① Avro配置詳解

      在前面也講解過了,配置為avro用來中繼日志傳輸。配置執行個體如下:

Flume詳解1、Flume簡介

      注:這裡的hostname和port,不是說該伺服器會監聽該ip位址和端口号,而是将日志發送到的主機和對應的端口号。

      ② Hdfs配置詳解

如果配置為hdfs類型,則是将日志儲存到hdfs檔案系統中。在hdfs配置中,我們需要先了解一些hdfs配置類型中,會自動解析的變量。

Flume詳解1、Flume簡介

         常用的配置執行個體如下:

Flume詳解1、Flume簡介

         注:在hdfs配置中還有這個參數比較重要就是useLocalTimeStamp,預設情況下該變量為false,即系統中所取的timestamp來源于header中。講到這裡,需要強調兩點,一個是host也是從header中提取的,兩一個是預設情況下header中存在timestamp和host兩個參數選項,但都為空,是以我們在hdfs中是擷取到的都是空的,這通常是通過最初的agent在source配置中,添加上interceptors配置解決(下面會講解)。而且如果event中間經過多個agent進行中繼,如果中繼的agent沒有再顯示的配置interceptors重寫header中的資訊,則event中的header資訊就是來源于最初的agent,中繼過程不會對其修改。

2.2.5、flume-ng配置interceptors

         為什麼要配置interceptors,在hdfs配置這一章節中已經詳解說明了原因。其實通過給source添加interceptors配置,可以靈活的傳遞很多有用的資訊。在interceptors配置這一章節中,主要講解timestamp、host和static配置方式,這也是我們生産環境中常用的配置,其他的方式感興趣就去自己看吧。

         下面是flume-ng支援的所有interceptors方式:

Flume詳解1、Flume簡介

      ①Timestamp配置:

通過interceptors的timestamp配置,将time時間插入到event header中,并且timestamp的精确度為millis (普通的timestamp精确度為10位,而這個為13位)。配置參數如下:

Flume詳解1、Flume簡介

         配置執行個體:

Flume詳解1、Flume簡介

         注:預設情況下event header中是存在timestamp的,不過值為空;是以上面的即使配置了也不能改變event header中的timestamp的值,是以還需要添加一個參數,即preserveExisting=true,替換掉原有的timestamp參數。

     ② Host配置

通過配置該項,将agent的hostname或者ip位址插入到event header的host變量中,常用的配置參數如下:

Flume詳解1、Flume簡介

      配置執行個體:

Flume詳解1、Flume簡介

      注意:我們是時間的配置過程中,需要最好顯示的指明是用ip位址還是hostname;還有就是痛timestamp一樣的問題,預設情況下event header的hostname為空,是以隻是簡單的如執行個體中的那種配置,對實際的hostname不會産生任何影響,需要配置preserveExisting=true,替換掉原有的hostname參數。

      ③Static配置

      Static配置是為使用者自定義變量和其值傳入到event header中傳輸。他的配置參數如下:

Flume詳解1、Flume簡介

     配置執行個體:

Flume詳解1、Flume簡介

     注意:在這裡我着重強調的還是preserveExisting問題。為了配置的變量能夠生效,我們最好是配置preserveExisting變量,并設定為true。

3、flume-ng的一個完成配置執行個體

# Please paste flume.conf here. Example:

# Sources, channels, and sinks are defined per
# agent name, in this case 'tier1'.
tier1.sources  = source1
tier1.channels = channel1
tier1.sinks    = sink1 sink2

# configurate source
tier1.sources.source1.channels = channel1
tier1.sources.source1.type     = exec
tier1.sources.source1.command = tail --follow=name /usr/local/openresty/nginx/logs/logapi/api.log
# configurate channel
tier1.channels.channel1.type   = memory
tier1.channels.channel1.capacity = 500
tier1.channels.channel1.transactionCapacity = 400
# configurate sink
tier1.sinks.sink1.channel      = channel1
tier1.sinks.sink1.type         = hdfs
tier1.sinks.sink1.hdfs.useLocalTimeStamp = true
tier1.sinks.sink1.hdfs.path = /logs/orignal/%Y%m%d/
tier1.sinks.sink1.hdfs.rollInterval = 3600
tier1.sinks.sink1.hdfs.fileType = CompressedStream
tier1.sinks.sink1.hdfs.rollSize = 0
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.filePrefix = api-152.%Y%m%d%H%M
tier1.sinks.sink1.hdfs.inUseSuffix = .tmp
tier1.sinks.sink1.hdfs.codeC = gzip
tier1.sinks.sink1.hdfs.round = true
tier1.sinks.sink1.hdfs.roundValue = 5
tier1.sinks.sink1.hdfs.roundUnit = minute

tier1.sinks.sink2.channel      = channel1
tier1.sinks.sink2.type         = avro
tier1.sinks.sink2.hostname = *.*.*.*
tier1.sinks.sink2.port = 9999      

在該執行個體中source、channel、sink和interceptors都存在了。Source為avro類型,即可以接收avro-agent讀取日志檔案發送過來的資料,也可以接受來源于上一個agent通過avro方式發送來的資料;interceptors的配置是為event header中添加timestamp和host,并且如果event需要穿過多個agent,且中繼agent沒有在配置interceptors替換掉event header中的值,則源agent配置的event header的值會最終傳輸到sink中使用;channel配置為memory類型,為了使保證傳輸效率,可以容忍部分日志的丢失;sink配置為hdfs類型,就是将日志儲存到我們指定路徑下的hdfs檔案中,并且每隔1小時對日志拆分,并壓縮儲存的日志。