天天看點

Flume的配置與使用

前提條件:

安裝好hadoop2.7.3(Linux系統下)

安裝好Flume,參考:Flume安裝配置

原理:

Flume資料流模型

Flume的配置與使用

題目:

完成通過Avro Source接收外部資料源,資料緩存在memory channel中,然後通過Logger sink将列印出資料,即:

avro source --> memory channel --> logger sink

步驟:

1.進入有權限的目錄,例如~目錄

$ cd ~
           

2.建立配置檔案avro.conf(關鍵)

$ nano avro.conf
           

内容如下:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
#配置source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# 配置sink
a1.sinks.k1.type = logger
# 配置channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 綁定 source 和sink 到 channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
           

 3.啟動Flume agent

flume-ng agent --conf ./ --conf-file avro.conf --name a1 -Dflume.root.logger=INFO,console
           

注意: --conf為配置檔案所在目錄,這裡配置為"./"表示目前目錄; --conf-file表示配置檔案名稱; --name表示 flume代理名稱,其他的為日志級别 

4.測試

4.1 打開新的終端(重要)

4.2 建立一個檔案夾testFlume(如果已存在該檔案夾,請跳過mkdir testFlume指令)

$ cd ~
$ mkdir testFlume
           

4.3 向檔案log.00輸入一些資訊,例如:“hello world”

echo "hello world" > ~/testFlume/log.00
           

5.使用avro-client發送檔案

flume-ng  avro-client  -c ./  -H  0.0.0.0  -p 4141 -F  testFlume/log.00
           

注意:-c為conf所在目錄,-H為主機, -p為端口号 -F為要發送檔案所在的路徑

在監聽終端(啟動Flume agent指令的終端)看到log.00的内容“hello world”。

Flume的配置與使用

更多的案例:

1. netcat source --> memory channel --> logger sink

nc.conf
# 設定agent
b1.sources = r1
b1.sinks = k1
b1.channels = c1

# 配置source
b1.sources.r1.type = netcat
b1.sources.r1.bind = localhost
b1.sources.r1.port = 44444

# 配置sink
b1.sinks.k1.type = logger

# 配置channel
b1.channels.c1.type = memory
b1.channels.c1.capacity = 1000
b1.channels.c1.transactionCapacity = 100

#将source和sink綁定到channel
b1.sources.r1.channels = c1
b1.sinks.k1.channel = c1
           

啟動flume

$ flume-ng agent --conf ./ --conf-file nc.conf --name b1 -Dflume.root.logger=INFO,console
           

啟動另一個終端,執行如下指令

$ telnet localhost 44444
           

進入監聽狀态後,輸入一些資料,按回車發送資料,在flume終端檢視接收到的資料。

2. exec source --> memory channel --> HDFS sink

exec source表示用執行指令的輸出作為資料源,案例中執行的指令為 tail -F /home/hadoop/1.log

HDFS sink表示将資料發送到HDFS中

hdfs.conf
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /home/hadoop/1.log
a2.sources.r2.shell = /bin/bash -c

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://node1:8020/flume/%Y%m%d/%H%M/%S
#上傳檔案的字首
a2.sinks.k2.hdfs.filePrefix = logs- 
#是否按照時間滾動檔案夾
a2.sinks.k2.hdfs.round = true
#多少時間機關建立一個新的檔案夾
a2.sinks.k2.hdfs.roundValue = 1
#重新定義時間機關
a2.sinks.k2.hdfs.roundUnit = minute
#是否使用本地時間戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#積攢多少個 Event 才 flush 到 HDFS 一次
a2.sinks.k2.hdfs.batchSize = 3
#設定檔案類型,可支援壓縮
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一個新的檔案
a2.sinks.k2.hdfs.rollInterval = 600 
#設定每個檔案的滾動大小
a2.sinks.k2.hdfs.rollSize = 134217700
#檔案的滾動與 Event 數量無關
a2.sinks.k2.hdfs.rollCount = 0
#最小備援數
a2.sinks.k2.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
           

啟動hdfs

$ start-dfs.sh
           

啟動flume

$ flume-ng agent --conf ./ --conf-file hdfs.conf --name a2 -Dflume.root.logger=INFO,console
           

另外啟動一個終端,發送些資料到1.log

$ echo "something different 44" >> 1.log
$ echo "something different 55" >> 1.log
           

檢視hdfs的内容,注意cat後面的檔案路徑需要從flume終端得到。

$ hdfs dfs -cat /flume/20201116/2326/00/logs-.1605540379735.tmp
           

看到了每一分鐘會生成一個資料目錄,檔案名的字首為logs- 加上毫秒的時間戳(13位).tmp,檔案内容為剛才一分鐘内給1.log發送的資料。

更多的配置案例,可參考:flume官方文檔

完成! enjoy it!

繼續閱讀