天天看點

Flume入門筆記------架構以及應用介紹

在具體介紹本文内容之前,先給大家看一下Hadoop業務的整體開發流程: 

Flume入門筆記------架構以及應用介紹

從Hadoop的業務開發流程圖中可以看出,在大資料的業務處理過程中,對于資料的采集是十分重要的一步,也是不可避免的一步,進而引出我們本文的主角—Flume。本文将圍繞Flume的架構、Flume的應用(日志采集)進行詳細的介紹。 

(一)Flume架構介紹 

1、Flume的概念 

Flume入門筆記------架構以及應用介紹

flume是分布式的日志收集系統,它将各個伺服器中的資料收集起來并送到指定的地方去,比如說送到圖中的HDFS,簡單來說flume就是收集日志的。 

2、Event的概念 

在這裡有必要先介紹一下flume中event的相關概念:flume的核心是把資料從資料源(source)收集過來,在将收集到的資料送到指定的目的地(sink)。為了保證輸送的過程一定成功,在送到目的地(sink)之前,會先緩存資料(channel),待資料真正到達目的地(sink)後,flume在删除自己緩存的資料。 

在整個資料的傳輸的過程中,流動的是event,即事務保證是在event級别進行的。那麼什麼是event呢?—–event将傳輸的資料進行封裝,是flume傳輸資料的基本機關,如果是文本檔案,通常是一行記錄,event也是事務的基本機關。event從source,流向channel,再到sink,本身為一個位元組數組,并可攜帶headers(頭資訊)資訊。event代表着一個資料的最小完整單元,從外部資料源來,向外部的目的地去。 

為了友善大家了解,給出一張event的資料流向圖: 

Flume入門筆記------架構以及應用介紹

一個完整的event包括:event headers、event body、event資訊(即文本檔案中的單行記錄),如下是以: 

Flume入門筆記------架構以及應用介紹

其中event資訊就是flume收集到的日記記錄。 

3、flume架構介紹 

flume之是以這麼神奇,是源于它自身的一個設計,這個設計就是agent,agent本身是一個Java程序,運作在日志收集節點—所謂日志收集節點就是伺服器節點。 

agent裡面包含3個核心的元件:source—->channel—–>sink,類似生産者、倉庫、消費者的架構。 

source:source元件是專門用來收集資料的,可以處理各種類型、各種格式的日志資料,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定義。 

channel:source元件把資料收集來以後,臨時存放在channel中,即channel元件在agent中是專門用來存放臨時資料的——對采集到的資料進行簡單的緩存,可以存放在memory、jdbc、file等等。 

sink:sink元件是用于把資料發送到目的地的元件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、Hbase、solr、自定義。 

4、flume的運作機制 

flume的核心就是一個agent,這個agent對外有兩個進行互動的地方,一個是接受資料的輸入——source,一個是資料的輸出sink,sink負責将資料發送到外部指定的目的地。source接收到資料之後,将資料發送給channel,chanel作為一個資料緩沖區會臨時存放這些資料,随後sink會将channel中的資料發送到指定的地方—-例如HDFS等,注意:隻有在sink将channel中的資料成功發送出去之後,channel才會将臨時資料進行删除,這種機制保證了資料傳輸的可靠性與安全性。 

5、flume的廣義用法 

flume之是以這麼神奇—-其原因也在于flume可以支援多級flume的agent,即flume可以前後相繼,例如sink可以将資料寫到下一個agent的source中,這樣的話就可以連成串了,可以整體處理了。flume還支援扇入(fan-in)、扇出(fan-out)。所謂扇入就是source可以接受多個輸入,所謂扇出就是sink可以将資料輸出多個目的地destination中。 

Flume入門筆記------架構以及應用介紹

(二)flume應用—日志采集 

對于flume的原理其實很容易了解,我們更應該掌握flume的具體使用方法,flume提供了大量内置的Source、Channel和Sink類型。而且不同類型的Source、Channel和Sink可以自由組合—–組合方式基于使用者設定的配置檔案,非常靈活。比如:Channel可以把事件暫存在記憶體裡,也可以持久化到本地硬碟上。Sink可以把日志寫入HDFS, HBase,甚至是另外一個Source等等。下面我将用具體的案例詳述flume的具體用法。 

其實flume的用法很簡單—-書寫一個配置檔案,在配置檔案當中描述source、channel與sink的具體實作,而後運作一個agent執行個體,在運作agent執行個體的過程中會讀取配置檔案的内容,這樣flume就會采集到資料。 

配置檔案的編寫原則: 

1>從整體上描述代理agent中sources、sinks、channels所涉及到的元件

# Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1           
  • 1
  • 2
  • 3
  • 4
  • 1
  • 2
  • 3
  • 4

2>較長的描述agent中每一個source、sink與channel的具體實作:即在描述source的時候,需要 

指定source到底是什麼類型的,即這個source是接受檔案的、還是接受http的、還是接受thrift 

的;對于sink也是同理,需要指定結果是輸出到HDFS中,還是Hbase中啊等等;對于channel 

需要指定是記憶體啊,還是資料庫啊,還是檔案啊等等。

# Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 
    a1.channels.c1.transactionCapacity =            
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

3>通過channel将source與sink連接配接起來

# Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1           
  • 1
  • 2
  • 3
  • 1
  • 2
  • 3

啟動agent的shell操作:

flume-ng  agent -n a1  -c  ../conf   -f  ../conf/example.file  
    -Dflume.root.logger=DEBUG,console             
  • 1
  • 2
  • 1
  • 2

參數說明: -n 指定agent名稱(與配置檔案中代理的名字相同) 

-c 指定flume中配置檔案的目錄 

-f 指定配置檔案 

-Dflume.root.logger=DEBUG,console 設定日志等級

具體案例: 

案例1: NetCat Source:監聽一個指定的網絡端口,即隻要應用程式向這個端口裡面寫資料,這個source元件就可以擷取到資訊。 其中 Sink:logger Channel:memory 

flume官網中NetCat Source描述:

Property Name Default     Description
channels       –     
type           –     The component type name, needs to be netcat
bind           –  日志需要發送到的主機名或者Ip位址,該主機運作着netcat類型的source在監聽          
port           –  日志需要發送到的端口号,該端口号要有netcat類型的source在監聽                 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 1
  • 2
  • 3
  • 4
  • 5

a) 編寫配置檔案:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 
a1.sources.r1.port = 

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 
a1.channels.c1.transactionCapacity = 

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1           
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

b) 啟動flume agent a1 服務端

flume-ng  agent -n a1  -c ../conf  -f ../conf/netcat.conf   -Dflume.root.logger=DEBUG,console           
  • 1
  • 1

c) 使用telnet發送資料

telnet      big data world!(windows中運作的)           
  • 1
  • 1

d) 在控制台上檢視flume收集到的日志資料: 

Flume入門筆記------架構以及應用介紹

案例2:NetCat Source:監聽一個指定的網絡端口,即隻要應用程式向這個端口裡面寫資料,這個source元件就可以擷取到資訊。 其中 Sink:hdfs Channel:file (相比于案例1的兩個變化) 

flume官網中HDFS Sink的描述: 

Flume入門筆記------架構以及應用介紹

a) 編寫配置檔案:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 
a1.sources.r1.port = 

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop80:/dataoutput
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 
a1.sinks.k1.hdfs.rollSize = 
a1.sinks.k1.hdfs.rollCount = 
a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in file
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /usr/flume/checkpoint
a1.channels.c1.dataDirs = /usr/flume/data

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1           
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

b) 啟動flume agent a1 服務端

flume-ng  agent -n a1  -c ../conf  -f ../conf/netcat.conf   -Dflume.root.logger=DEBUG,console           
  • 1
  • 1

c) 使用telnet發送資料

telnet      big data world!(windows中運作的)           
  • 1
  • 1

d) 在HDFS中檢視flume收集到的日志資料: 

Flume入門筆記------架構以及應用介紹

案例3:Spooling Directory Source:監聽一個指定的目錄,即隻要應用程式向這個指定的目錄中添加新的檔案,source元件就可以擷取到該資訊,并解析該檔案的内容,然後寫入到channle。寫入完成後,标記該檔案已完成或者删除該檔案。其中 Sink:logger Channel:memory 

flume官網中Spooling Directory Source描述:

Property Name       Default      Description
channels              –  
type                  –          The component type name, needs to be spooldir.
spoolDir              –          Spooling Directory Source監聽的目錄
fileSuffix         .COMPLETED    檔案内容寫入到channel之後,标記該檔案
deletePolicy       never         檔案内容寫入到channel之後的删除政策: never or immediate
fileHeader         false         Whether to add a header storing the absolute path filename.
ignorePattern      ^$           Regular expression specifying which files to ignore (skip)
interceptors          –          指定傳輸中event的head(頭資訊),常用timestamp           
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

Spooling Directory Source的兩個注意事項:

①If a file is written to after being placed into the spooling directory, Flume will print an error to its log file and stop processing.
即:拷貝到spool目錄下的檔案不可以再打開編輯
②If a file name is reused at a later time, Flume will print an error to its log file and stop processing.
即:不能将具有相同檔案名字的檔案拷貝到這個目錄下           
  • 1
  • 2
  • 3
  • 4
  • 1
  • 2
  • 3
  • 4

a) 編寫配置檔案:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/datainput
a1.sources.r1.fileHeader = true
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 
a1.channels.c1.transactionCapacity = 

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1           
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

b) 啟動flume agent a1 服務端

flume-ng  agent -n a1  -c ../conf  -f ../conf/spool.conf   -Dflume.root.logger=DEBUG,console           
  • 1
  • 1

c) 使用cp指令向Spooling Directory 中發送資料

cp datafile  /usr/local/datainput   (注:datafile中的内容為:big data world!)           
  • 1
  • 1

d) 在控制台上檢視flume收集到的日志資料: 

Flume入門筆記------架構以及應用介紹

從控制台顯示的結果可以看出event的頭資訊中包含了時間戳資訊。 

同時我們檢視一下Spooling Directory中的datafile資訊—-檔案内容寫入到channel之後,該檔案被标記了:

[root@hadoop80 datainput]# ls
datafile.COMPLETED           
  • 1
  • 2
  • 1
  • 2

案例4:Spooling Directory Source:監聽一個指定的目錄,即隻要應用程式向這個指定的目錄中添加新的檔案,source元件就可以擷取到該資訊,并解析該檔案的内容,然後寫入到channle。寫入完成後,标記該檔案已完成或者删除該檔案。 其中 Sink:hdfs Channel:file (相比于案例3的兩個變化)

a) 編寫配置檔案:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/datainput
a1.sources.r1.fileHeader = true
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

# Describe the sink
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop80:/dataoutput
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 
a1.sinks.k1.hdfs.rollSize = 
a1.sinks.k1.hdfs.rollCount = 
a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in file
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /usr/flume/checkpoint
a1.channels.c1.dataDirs = /usr/flume/data

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1           
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

b) 啟動flume agent a1 服務端

flume-ng  agent -n a1  -c ../conf  -f ../conf/spool.conf   -Dflume.root.logger=DEBUG,console           
  • 1
  • 1

c) 使用cp指令向Spooling Directory 中發送資料

cp datafile  /usr/local/datainput   (注:datafile中的内容為:big data world!)           
  • 1
  • 1

d) 在控制台上可以參看sink的運作進度日志: 

Flume入門筆記------架構以及應用介紹

d) 在HDFS中檢視flume收集到的日志資料: 

Flume入門筆記------架構以及應用介紹
Flume入門筆記------架構以及應用介紹

從案例1與案例2、案例3與案例4的對比中我們可以發現:flume的配置檔案在編寫的過程中是非常靈活的。

案例5:Exec Source:監聽一個指定的指令,擷取一條指令的結果作為它的資料源 

常用的是tail -F file指令,即隻要應用程式向日志(檔案)裡面寫資料,source元件就可以擷取到日志(檔案)中最新的内容 。 其中 Sink:hdfs Channel:file 

這個案列為了友善顯示Exec Source的運作效果,結合Hive中的external table進行來說明。

a) 編寫配置檔案:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/log.file

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop80:/dataoutput
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 
a1.sinks.k1.hdfs.rollSize = 
a1.sinks.k1.hdfs.rollCount = 
a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in file
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /usr/flume/checkpoint
a1.channels.c1.dataDirs = /usr/flume/data

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1           
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

b)在hive中建立外部表—–hdfs://hadoop80:9000/dataoutput的目錄,友善檢視日志捕獲内容

hive> create external table t1(infor  string)
    > row format delimited
    > fields terminated by '\t'
    > location '/dataoutput/';
OK
Time taken:  seconds           
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

c) 啟動flume agent a1 服務端

flume-ng  agent -n a1  -c ../conf  -f ../conf/exec.conf   -Dflume.root.logger=DEBUG,console           
  • 1
  • 1

d) 使用echo指令向/usr/local/datainput 中發送資料

echo  big data > log.file           
  • 1
  • 1

d) 在HDFS和Hive分别中檢視flume收集到的日志資料: 

Flume入門筆記------架構以及應用介紹
hive> select * from t1;
OK
big data
Time taken:  seconds           
  • 1
  • 2
  • 3
  • 4
  • 1
  • 2
  • 3
  • 4

e)使用echo指令向/usr/local/datainput 中在追加一條資料

echo big data world! >> log.file           
  • 1
  • 1

d) 在HDFS和Hive再次分别中檢視flume收集到的日志資料: 

Flume入門筆記------架構以及應用介紹
Flume入門筆記------架構以及應用介紹
hive> select * from t1;
OK
big data
big data world!
Time taken:  seconds           
  • 1
  • 2
  • 3
  • 4
  • 5
  • 1
  • 2
  • 3
  • 4
  • 5

總結Exec source:Exec source和Spooling Directory Source是兩種常用的日志采集的方式,其中Exec source可以實作對日志的實時采集,Spooling Directory Source在對日志的實時采集上稍有欠缺,盡管Exec source可以實作對日志的實時采集,但是當Flume不運作或者指令執行出錯時,Exec source将無法收集到日志資料,日志會出現丢失,進而無法保證收集日志的完整性。

案例6:Avro Source:監聽一個指定的Avro 端口,通過Avro 端口可以擷取到Avro client發送過來的檔案 。即隻要應用程式通過Avro 端口發送檔案,source元件就可以擷取到該檔案中的内容。 其中 Sink:hdfs Channel:file 

(注:Avro和Thrift都是一些序列化的網絡端口–通過這些網絡端口可以接受或者發送資訊,Avro可以發送一個給定的檔案給Flume,Avro 源使用AVRO RPC機制) 

Avro Source運作原理如下圖: 

Flume入門筆記------架構以及應用介紹

flume官網中Avro Source的描述:

Property     Name   Default Description
channels      –  
type          –     The component type name, needs to be avro
bind          –     日志需要發送到的主機名或者ip,該主機運作着ARVO類型的source
port          –     日志需要發送到的端口号,該端口要有ARVO類型的source在監聽           
  • 1
  • 2
  • 3
  • 4
  • 5
  • 1
  • 2
  • 3
  • 4
  • 5

1)編寫配置檔案

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 
a1.sources.r1.port = 

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop80:/dataoutput
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 
a1.sinks.k1.hdfs.rollSize = 
a1.sinks.k1.hdfs.rollCount = 
a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in file
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /usr/flume/checkpoint
a1.channels.c1.dataDirs = /usr/flume/data

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1           
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

b) 啟動flume agent a1 服務端

flume-ng  agent -n a1  -c ../conf  -f ../conf/avro.conf   -Dflume.root.logger=DEBUG,console           
  • 1
  • 1

c)使用avro-client發送檔案

flume-ng avro-client -c  ../conf  -H   -p  -F /usr/local/log.file           
  • 1
  • 1

注:log.file檔案中的内容為:

[[email protected] local]# more log.file
big data
big data world!           
  • 1
  • 2
  • 3
  • 1
  • 2
  • 3

d) 在HDFS中檢視flume收集到的日志資料: 

Flume入門筆記------架構以及應用介紹
Flume入門筆記------架構以及應用介紹
Flume入門筆記------架構以及應用介紹

通過上面的幾個案例,我們可以發現:flume配置檔案的書寫是相當靈活的—-不同類型的Source、Channel和Sink可以自由組合!

最後對上面用的幾個flume source進行适當總結: 

① NetCat Source:監聽一個指定的網絡端口,即隻要應用程式向這個端口裡面寫資料,這個source元件 

就可以擷取到資訊。 

②Spooling Directory Source:監聽一個指定的目錄,即隻要應用程式向這個指定的目錄中添加新的文 

件,source元件就可以擷取到該資訊,并解析該檔案的内容,然後寫入到channle。寫入完成後,标記 

該檔案已完成或者删除該檔案。 

③Exec Source:監聽一個指定的指令,擷取一條指令的結果作為它的資料源 

常用的是tail -F file指令,即隻要應用程式向日志(檔案)裡面寫資料,source元件就可以擷取到日志(檔案)中最新的内容 。 

④Avro Source:監聽一個指定的Avro 端口,通過Avro 端口可以擷取到Avro client發送過來的檔案 。即隻要應用程式通過Avro 端口發送檔案,source元件就可以擷取到該檔案中的内容。

如有問題,歡迎留言指正!

轉載:原文位址http://blog.csdn.net/a2011480169/article/details/51544664