前提條件:
安裝好hadoop2.7.3(Linux系統下)
安裝好Flume,參考:Flume安裝配置
原理:
Flume資料流模型
題目:
完成通過Avro Source接收外部資料源,資料緩存在memory channel中,然後通過Logger sink将列印出資料,即:
avro source --> memory channel --> logger sink
步驟:
1.進入有權限的目錄,例如~目錄
$ cd ~
2.建立配置檔案avro.conf(關鍵)
$ nano avro.conf
内容如下:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#配置source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# 配置sink
a1.sinks.k1.type = logger
# 配置channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 綁定 source 和sink 到 channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3.啟動Flume agent
flume-ng agent --conf ./ --conf-file avro.conf --name a1 -Dflume.root.logger=INFO,console
注意: --conf為配置檔案所在目錄,這裡配置為"./"表示目前目錄; --conf-file表示配置檔案名稱; --name表示 flume代理名稱,其他的為日志級别
4.測試
4.1 打開新的終端(重要)
4.2 建立一個檔案夾testFlume(如果已存在該檔案夾,請跳過mkdir testFlume指令)
$ cd ~
$ mkdir testFlume
4.3 向檔案log.00輸入一些資訊,例如:“hello world”
echo "hello world" > ~/testFlume/log.00
5.使用avro-client發送檔案
flume-ng avro-client -c ./ -H 0.0.0.0 -p 4141 -F testFlume/log.00
注意:-c為conf所在目錄,-H為主機, -p為端口号 -F為要發送檔案所在的路徑
在監聽終端(啟動Flume agent指令的終端)看到log.00的内容“hello world”。
更多的案例:
1. netcat source --> memory channel --> logger sink
nc.conf
# 設定agent
b1.sources = r1
b1.sinks = k1
b1.channels = c1
# 配置source
b1.sources.r1.type = netcat
b1.sources.r1.bind = localhost
b1.sources.r1.port = 44444
# 配置sink
b1.sinks.k1.type = logger
# 配置channel
b1.channels.c1.type = memory
b1.channels.c1.capacity = 1000
b1.channels.c1.transactionCapacity = 100
#将source和sink綁定到channel
b1.sources.r1.channels = c1
b1.sinks.k1.channel = c1
啟動flume
$ flume-ng agent --conf ./ --conf-file nc.conf --name b1 -Dflume.root.logger=INFO,console
啟動另一個終端,執行如下指令
$ telnet localhost 44444
進入監聽狀态後,輸入一些資料,按回車發送資料,在flume終端檢視接收到的資料。
2. exec source --> memory channel --> HDFS sink
exec source表示用執行指令的輸出作為資料源,案例中執行的指令為 tail -F /home/hadoop/1.log
HDFS sink表示将資料發送到HDFS中
hdfs.conf
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /home/hadoop/1.log
a2.sources.r2.shell = /bin/bash -c
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://node1:8020/flume/%Y%m%d/%H%M/%S
#上傳檔案的字首
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照時間滾動檔案夾
a2.sinks.k2.hdfs.round = true
#多少時間機關建立一個新的檔案夾
a2.sinks.k2.hdfs.roundValue = 1
#重新定義時間機關
a2.sinks.k2.hdfs.roundUnit = minute
#是否使用本地時間戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#積攢多少個 Event 才 flush 到 HDFS 一次
a2.sinks.k2.hdfs.batchSize = 3
#設定檔案類型,可支援壓縮
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一個新的檔案
a2.sinks.k2.hdfs.rollInterval = 600
#設定每個檔案的滾動大小
a2.sinks.k2.hdfs.rollSize = 134217700
#檔案的滾動與 Event 數量無關
a2.sinks.k2.hdfs.rollCount = 0
#最小備援數
a2.sinks.k2.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
啟動hdfs
$ start-dfs.sh
啟動flume
$ flume-ng agent --conf ./ --conf-file hdfs.conf --name a2 -Dflume.root.logger=INFO,console
另外啟動一個終端,發送些資料到1.log
$ echo "something different 44" >> 1.log
$ echo "something different 55" >> 1.log
檢視hdfs的内容,注意cat後面的檔案路徑需要從flume終端得到。
$ hdfs dfs -cat /flume/20201116/2326/00/logs-.1605540379735.tmp
看到了每一分鐘會生成一個資料目錄,檔案名的字首為logs- 加上毫秒的時間戳(13位).tmp,檔案内容為剛才一分鐘内給1.log發送的資料。
更多的配置案例,可參考:flume官方文檔
完成! enjoy it!