Source
rpc遠端過程調用協定,客戶機與服務機的調用模式需要對資料進行序列化。
1:客戶機将參數序列化并以二進制形式通過網絡傳輸到伺服器。
2:伺服器接收到後進行反序列化再調用方法擷取傳回值。
3:伺服器将傳回值序列化後再通過網絡傳輸給客戶機。
4:客戶機接收到結果後再進行反序列化擷取結果。
Avro source:
Avro就是一種序列化形式,avrosource監聽一個端口隻接收avro序列化後的資料,其他類型的不接收。
type:avrosource的類型,必須是avro。
bind:要監聽的(本機的)主機名或者ip。此監聽不是過濾發送方。一台電腦不是說隻有一個IP。有多網卡的電腦,對應多個IP。
port:綁定的本地的端口。
Thrif source:
和avro一樣是一種資料序列化形式,Thrifsource隻采集thrift資料序列化後的資料
Exec source:
采集linux指令的傳回結果傳輸給channel
type:source的類型:必須是exec。
command:要執行指令。
tail –f 若檔案被删除即使重新建立同名檔案也不會監聽
tail -F 隻要檔案同名就可以繼續監聽
以上可以用在日志檔案切割時的監聽
JMS Source:
Java消息服務資料源,Java消息服務是一個與具體平台無關的API,這是支援jms規範的資料源采集;
Spooling Directory Source:通過檔案夾裡的新增的檔案作為資料源的采集;
Kafka Source:從kafka服務中采集資料。
NetCat Source:綁定的端口(tcp、udp),将流經端口的每一個文本行資料作為Event輸入
type:source的類型,必須是netcat。
bind:要監聽的(本機的)主機名或者ip。此監聽不是過濾發送方。一台電腦不是說隻有一個IP。有多網卡的電腦,對應多個IP。
port:綁定的本地的端口。
HTTP Source:監聽HTTP POST和 GET産生的資料的采集
Chanel
是一個資料存儲池,中間通道,從source中接收資料再向sink目的地傳輸,如果sink寫入失敗會自動重寫是以不會造成資料丢失。
Memory:用記憶體存儲,但伺服器當機會丢失資料。
Typechannel的類型:必須為memory
capacity:channel中的最大event數目
transactionCapacity:channel中允許事務的最大event數目
File:使用檔案存儲資料不會丢失資料但會耗費io。
Typechannel的類型:必須為 file
checkpointDir :檢查點的資料存儲目錄
dataDirs :資料的存儲目錄
transactionCapacity:channel中允許事務的最大event數目
SpillableMemory Channel:記憶體檔案綜合使用,先存入記憶體達到閥值後flush到檔案中。
Typechannel的類型:必須為SPILLABLEMEMORY
memoryCapacity:記憶體的容量event數
overflowCapacity:資料存到檔案的event閥值數
checkpointDir:檢查點的資料存儲目錄
dataDirs:資料的存儲目錄
Jdbc:使用jdbc資料源來存儲資料。
Kafka:使用kafka服務來存儲資料。
Sink
各種類型的目的地,接收channel寫入的資料并以指定的形式表現出來。Sink有很多種類型。
type:sink的類型 必須是hdfs。
hdfs.path:hdfs的上傳路徑。
hdfs.filePrefix:hdfs檔案的字首。預設是:FlumeData
hdfs.rollInterval:間隔多久産生新檔案,預設是:30(秒) 0表示不以時間間隔為準。
hdfs.rollSize:檔案到達多大再産生一個新檔案,預設是:1024(bytes)0表示不以檔案大小為準。
hdfs.rollCount:event達到多大再産生一個新檔案,預設是:10(個)0表示不以event數目為準。
hdfs.batchSize:每次往hdfs裡送出多少個event,預設為100
hdfs.fileType:hdfs檔案的格式主要包括:SequenceFile,DataStream ,CompressedStream,如果使用了CompressedStream就要設定壓縮方式。
hdfs.codeC:壓縮方式:gzip,bzip2, lzo, lzop, snappy
注:%{host}可以使用header的key。以及%Y%m%d來表示時間,但關于時間的表示需要在header裡有timestamp這個key。
Logger Sink将資料作為日志處理(根據flume中的設定的日志方式來顯示)
要在控制台顯示在運作agent的時候加入:-Dflume.root.logger=INFO,console。
type:sink的類型:必須是logger。
maxBytesToLog:列印body的最長的位元組數 預設為16
Avro Sink:資料被轉換成Avro Event,然後發送到指定的服務端口上。
type:sink的類型:必須是 avro。
hostname:指定發送資料的主機名或者ip
port:指定發送資料的端口
執行個體
1:監聽一個檔案的增加變化,采集資料并在控制台列印。
在這個例子中我使用exec source,memory chanel,logger sink。可以看我的agent結構圖
以下是我建立的exec_source.conf
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources.r1.type=exec
a1.sources.r1.command=tail -F/usr/local/success.log
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactioncapacity=100
a1.sinks.k1.type=logger
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
執行指令:
bin/flume-ngagent --conf conf/ --conf-file conf/exec_source.conf --name a1-Dflume.root.logger=INFO,console &
然後更改/usr/local/success.log檔案中的内容後可以看到flume采集到了檔案的變化并在控制台上列印出來。檔案初始内容hello和how are you,剩下的i am fine和ok為新增加内容。
2:監控一個檔案變化并将其發送到另一個伺服器上然後列印
這個例子可以建立在上一個例子之上,但是需要對flume的結構做一些修改,我使用avro序列化資料再發送到指定的伺服器上。詳情看結構圖。
實際上flume可以進行多個節點關聯,本例中我隻使用131向139發送資料
131,139上都必須啟動agent
伺服器131配置
以下是我建立的exec_source_avro_sink.conf
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources.r1.type=exec
a1.sources.r1.command=tail -F/usr/local/success.log
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactioncapacity=100
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=192.168.79.139
a1.sinks.k1.port=42424
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
執行指令啟動agent
bin/flume-ng agent --conf conf/ --conf-fileconf/exec_source_avro_sink.conf --name a1 -Dflume.root.logger=INFO,console&
139伺服器配置
執行指令拷貝flume到139
scp -r apache-flume-1.7.0-bin/[email protected]:/usr/local/
修改exec_source_avro_sink.conf
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=42424
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactioncapacity=100
a1.sinks.k1.type=logger
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
執行指令啟動agent
bin/flume-ng agent --conf conf/ --conf-fileconf/exec_source_avro_sink.conf --name a1 -Dflume.root.logger=INFO,console&
結果可以在139控制台上看到131中修改success.log的變化資訊
3:avro-client執行個體
執行bin/flume-ng會提示有指令如下
help display this help text
agent run aFlume agent
avro-client run anavro Flume client
version show Flume version info
avro-clinet是avro用戶端,可以把本地檔案以avro序列化方式序列化後發送到指定的伺服器端口。本例就是将131的一個檔案一次性的發送到139中并列印。
Agent結構圖如下
131啟動的是一個avro-client,它會建立連接配接,發送資料,斷開連接配接,它隻是一個用戶端。
啟動一個avro用戶端
bin/flume-ngavro-client --conf conf/ --host 192.168.79.139 --port 42424 --filename/usr/local/success.log --headerFile /usr/local/kv.log
--headerFile是用來區分是哪個伺服器發送的資料,kv.log中的内容會被發送到139,可以作為辨別來使用。
139的avro_client.conf如下
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=42424
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactioncapacity=100
a1.sinks.k1.type=logger
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
啟動agent
bin/flume-ngagent --conf conf/ --conf-file conf/avro_client.conf --name a1-Dflume.root.logger=INFO,console &
139控制台顯示如下
可以看到headers的内容headers:{hostname=192.168.79.131}
注意:
1:Flume服務沒有stop指令需要通過kill來殺掉進行,可以使用jps -m來确認是那個agent的number
[[email protected] conf]# jps -m
3610 Jps -m
3512 Application --conf-fileconf/exec_source.conf --name a1
2:修改flume的配置檔案後如avro_client.conf,flume會自動重新開機
3:logger sink預設隻顯示16個位元組
4:flume是以event為機關進行資料傳輸的,其中headers是一個map容器map<string,string>
Event: { headers:{hostname=192.168.79.131}body: 31 61 1a }
5:flume支援多節點關聯但是sink和source的類型要一緻,比如avro-client發送資料那麼接收方的source也必須是avro否則會警告。