天天看點

Flume筆記二之source,channel,sink

Source

rpc遠端過程調用協定,客戶機與服務機的調用模式需要對資料進行序列化。

         1:客戶機将參數序列化并以二進制形式通過網絡傳輸到伺服器。

         2:伺服器接收到後進行反序列化再調用方法擷取傳回值。

         3:伺服器将傳回值序列化後再通過網絡傳輸給客戶機。

         4:客戶機接收到結果後再進行反序列化擷取結果。

Avro source:

         Avro就是一種序列化形式,avrosource監聽一個端口隻接收avro序列化後的資料,其他類型的不接收。

         type:avrosource的類型,必須是avro。

         bind:要監聽的(本機的)主機名或者ip。此監聽不是過濾發送方。一台電腦不是說隻有一個IP。有多網卡的電腦,對應多個IP。

         port:綁定的本地的端口。

Thrif source:

         和avro一樣是一種資料序列化形式,Thrifsource隻采集thrift資料序列化後的資料

Exec source:

         采集linux指令的傳回結果傳輸給channel

         type:source的類型:必須是exec。

        command:要執行指令。

        tail  –f  若檔案被删除即使重新建立同名檔案也不會監聽

        tail  -F  隻要檔案同名就可以繼續監聽

         以上可以用在日志檔案切割時的監聽

JMS Source:

        Java消息服務資料源,Java消息服務是一個與具體平台無關的API,這是支援jms規範的資料源采集;

Spooling Directory Source:通過檔案夾裡的新增的檔案作為資料源的采集;

Kafka Source:從kafka服務中采集資料。

NetCat Source:綁定的端口(tcp、udp),将流經端口的每一個文本行資料作為Event輸入

        type:source的類型,必須是netcat。

       bind:要監聽的(本機的)主機名或者ip。此監聽不是過濾發送方。一台電腦不是說隻有一個IP。有多網卡的電腦,對應多個IP。

       port:綁定的本地的端口。

HTTP Source:監聽HTTP POST和 GET産生的資料的采集

Chanel

         是一個資料存儲池,中間通道,從source中接收資料再向sink目的地傳輸,如果sink寫入失敗會自動重寫是以不會造成資料丢失。

         Memory:用記憶體存儲,但伺服器當機會丢失資料。

                 Typechannel的類型:必須為memory

                 capacity:channel中的最大event數目

                 transactionCapacity:channel中允許事務的最大event數目

         File:使用檔案存儲資料不會丢失資料但會耗費io。

                 Typechannel的類型:必須為 file

                 checkpointDir :檢查點的資料存儲目錄

                 dataDirs :資料的存儲目錄

                 transactionCapacity:channel中允許事務的最大event數目

         SpillableMemory Channel:記憶體檔案綜合使用,先存入記憶體達到閥值後flush到檔案中。

                Typechannel的類型:必須為SPILLABLEMEMORY

                memoryCapacity:記憶體的容量event數

                overflowCapacity:資料存到檔案的event閥值數

                checkpointDir:檢查點的資料存儲目錄

                dataDirs:資料的存儲目錄

         Jdbc:使用jdbc資料源來存儲資料。

         Kafka:使用kafka服務來存儲資料。

Sink

         各種類型的目的地,接收channel寫入的資料并以指定的形式表現出來。Sink有很多種類型。

                 type:sink的類型 必須是hdfs。

                 hdfs.path:hdfs的上傳路徑。

                 hdfs.filePrefix:hdfs檔案的字首。預設是:FlumeData

                 hdfs.rollInterval:間隔多久産生新檔案,預設是:30(秒) 0表示不以時間間隔為準。

                 hdfs.rollSize:檔案到達多大再産生一個新檔案,預設是:1024(bytes)0表示不以檔案大小為準。

                 hdfs.rollCount:event達到多大再産生一個新檔案,預設是:10(個)0表示不以event數目為準。

                 hdfs.batchSize:每次往hdfs裡送出多少個event,預設為100

                 hdfs.fileType:hdfs檔案的格式主要包括:SequenceFile,DataStream ,CompressedStream,如果使用了CompressedStream就要設定壓縮方式。

                 hdfs.codeC:壓縮方式:gzip,bzip2, lzo, lzop, snappy

                 注:%{host}可以使用header的key。以及%Y%m%d來表示時間,但關于時間的表示需要在header裡有timestamp這個key。

        Logger Sink将資料作為日志處理(根據flume中的設定的日志方式來顯示)

                 要在控制台顯示在運作agent的時候加入:-Dflume.root.logger=INFO,console。

                 type:sink的類型:必須是logger。

                 maxBytesToLog:列印body的最長的位元組數 預設為16

        Avro Sink:資料被轉換成Avro Event,然後發送到指定的服務端口上。

                 type:sink的類型:必須是 avro。

                 hostname:指定發送資料的主機名或者ip

                 port:指定發送資料的端口

執行個體

1:監聽一個檔案的增加變化,采集資料并在控制台列印。

        在這個例子中我使用exec source,memory chanel,logger sink。可以看我的agent結構圖

Flume筆記二之source,channel,sink

以下是我建立的exec_source.conf

a1.sources=r1

a1.channels=c1

a1.sinks=k1

a1.sources.r1.type=exec 

a1.sources.r1.command=tail -F/usr/local/success.log

  a1.channels.c1.type=memory 

a1.channels.c1.capacity=1000

a1.channels.c1.transactioncapacity=100

a1.sinks.k1.type=logger

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

執行指令:

bin/flume-ngagent --conf conf/ --conf-file conf/exec_source.conf --name a1-Dflume.root.logger=INFO,console &

然後更改/usr/local/success.log檔案中的内容後可以看到flume采集到了檔案的變化并在控制台上列印出來。檔案初始内容hello和how are you,剩下的i am fine和ok為新增加内容。

Flume筆記二之source,channel,sink
2:監控一個檔案變化并将其發送到另一個伺服器上然後列印

這個例子可以建立在上一個例子之上,但是需要對flume的結構做一些修改,我使用avro序列化資料再發送到指定的伺服器上。詳情看結構圖。

Flume筆記二之source,channel,sink

實際上flume可以進行多個節點關聯,本例中我隻使用131向139發送資料

131,139上都必須啟動agent

伺服器131配置

以下是我建立的exec_source_avro_sink.conf

a1.sources=r1

a1.channels=c1

a1.sinks=k1

a1.sources.r1.type=exec 

a1.sources.r1.command=tail -F/usr/local/success.log

a1.channels.c1.type=memory 

a1.channels.c1.capacity=1000

a1.channels.c1.transactioncapacity=100

a1.sinks.k1.type=avro

a1.sinks.k1.hostname=192.168.79.139

a1.sinks.k1.port=42424

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

執行指令啟動agent

bin/flume-ng agent --conf conf/ --conf-fileconf/exec_source_avro_sink.conf --name a1 -Dflume.root.logger=INFO,console&

139伺服器配置

執行指令拷貝flume到139

scp -r apache-flume-1.7.0-bin/[email protected]:/usr/local/

修改exec_source_avro_sink.conf

a1.sources=r1

a1.channels=c1

a1.sinks=k1

a1.sources.r1.type=avro 

a1.sources.r1.bind=0.0.0.0

a1.sources.r1.port=42424

  a1.channels.c1.type=memory 

a1.channels.c1.capacity=1000

a1.channels.c1.transactioncapacity=100

a1.sinks.k1.type=logger

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

執行指令啟動agent

bin/flume-ng agent --conf conf/ --conf-fileconf/exec_source_avro_sink.conf --name a1 -Dflume.root.logger=INFO,console&

結果可以在139控制台上看到131中修改success.log的變化資訊

Flume筆記二之source,channel,sink
3:avro-client執行個體

執行bin/flume-ng會提示有指令如下

help                     display this help text

agent                     run aFlume agent

avro-client               run anavro Flume client

version                   show Flume version info

avro-clinet是avro用戶端,可以把本地檔案以avro序列化方式序列化後發送到指定的伺服器端口。本例就是将131的一個檔案一次性的發送到139中并列印。

Agent結構圖如下

Flume筆記二之source,channel,sink

131啟動的是一個avro-client,它會建立連接配接,發送資料,斷開連接配接,它隻是一個用戶端。

啟動一個avro用戶端

bin/flume-ngavro-client --conf conf/ --host 192.168.79.139 --port 42424 --filename/usr/local/success.log --headerFile /usr/local/kv.log

--headerFile是用來區分是哪個伺服器發送的資料,kv.log中的内容會被發送到139,可以作為辨別來使用。

139的avro_client.conf如下

a1.sources=r1

a1.channels=c1

a1.sinks=k1

a1.sources.r1.type=avro 

a1.sources.r1.bind=0.0.0.0

a1.sources.r1.port=42424

a1.channels.c1.type=memory 

a1.channels.c1.capacity=1000

a1.channels.c1.transactioncapacity=100

a1.sinks.k1.type=logger

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

啟動agent

bin/flume-ngagent --conf conf/ --conf-file conf/avro_client.conf --name a1-Dflume.root.logger=INFO,console &

139控制台顯示如下

Flume筆記二之source,channel,sink

可以看到headers的内容headers:{hostname=192.168.79.131}

注意:

1:Flume服務沒有stop指令需要通過kill來殺掉進行,可以使用jps  -m來确認是那個agent的number

[[email protected] conf]# jps -m

3610 Jps -m

3512 Application --conf-fileconf/exec_source.conf --name a1

2:修改flume的配置檔案後如avro_client.conf,flume會自動重新開機

3:logger sink預設隻顯示16個位元組

4:flume是以event為機關進行資料傳輸的,其中headers是一個map容器map<string,string>

Event: { headers:{hostname=192.168.79.131}body: 31 61                                           1a }

5:flume支援多節點關聯但是sink和source的類型要一緻,比如avro-client發送資料那麼接收方的source也必須是avro否則會警告。