5 實戰
使用Flume的核心就在于配置檔案
- 配置Source
- 配置Channel
- 配置Sink
- 組織在一起
5.1 場景1 - 從指定網絡端口收集資料輸出到控制台
看看官網的第一個
案例# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1:agent名稱
r1:Source名稱
k1:Sink名稱
c1:Channel名稱
看看其中的
Sources : netcat
類似于netcat的源,它偵聽給定端口并将每行文本轉換為事件。 像nc -k -l [host] [port]這樣的行為。 換句話說,它打開一個指定的端口并偵聽資料。 期望是提供的資料是換行符分隔的文本。 每行文本都轉換為Flume事件,并通過連接配接的通道發送。
必需屬性以粗體顯示。

Sinks:logger
在INFO級别記錄事件。 通常用于測試/調試目的。 必需屬性以粗體顯示。 此接收器是唯一的例外,它不需要在“記錄原始資料”部分中說明的額外配置。
channel:memor
事件存儲在具有可配置最大大小的記憶體中隊列中。 它非常适用于需要更高吞吐量的流量,并且在代理發生故障時準備丢失分階段資料。 必需屬性以粗體顯示。
實戰
建立example.conf配置
在conf目錄下
啟動一個agent
使用名為
flume-ng
的shell腳本啟動代理程式,該腳本位于Flume發行版的bin目錄中。 您需要在指令行上指定代理名稱,config目錄和配置檔案:
bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template
回顧指令參數的意義
bin/flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/example.conf \
-Dflume.root.logger=INFO,console
現在,代理将開始運作在給定屬性檔案中配置的源和接收器。
使用telnet進行測試驗證
注意
telnet 127.0.0.1 44444
發送了兩條資料
這邊接收到了資料
讓我們詳細分析下上圖中的資料資訊
2019-06-12 17:52:39,711 (SinkRunner-PollingRunner-DefaultSinkProcessor)
[INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)]
Event: { headers:{} body: 4A 61 76 61 45 64 67 65 0D JavaEdge. }
其中的Event是Fluem資料傳輸的基本單元
Event = 可選的header + byte array
5.2 場景2 - 監控一個檔案實時采集新增的資料輸出到控制台
Exec Source
Exec源在啟動時運作給定的Unix指令,并期望該程序在标準輸出上連續生成資料(stderr被簡單地丢棄,除非屬性logStdErr設定為true)。 如果程序因任何原因退出,則源也會退出并且不會生成其他資料。 這意味着諸如cat [named pipe]或tail -F [file]之類的配置将産生所需的結果,而日期可能不會 - 前兩個指令産生資料流,而後者産生單個事件并退出
Agent 選型
exec source + memory channel + logger sink
配置檔案
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /Volumes/doc/data/data.log
a1.sources.r1.shell = /bin/sh -c
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
在conf下建立配置檔案如下:
data.log檔案内容
成功接收
5.3 應用場景3 - 将A伺服器上的日志實時采集到B伺服器
技術選型
exec s + memory c + avro s
avro s + memory c + loger s
exec-memory-avro.conf
# Name the components on this agent
exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel
# Describe/configure the source
exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /Volumes/doc/data/data.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c
# Describe the sink
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = localhost
exec-memory-avro.sinks.avro-sink.port = 44444
# Use a channel which buffers events in memory
exec-memory-avro.channels.memory-channel.type = memory
exec-memory-avro.channels.memory-channel.capacity = 1000
exec-memory-avro.channels.memory-channel.transactionCapacity = 100
# Bind the source and sink to the channel
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel
# Name the components on this agent
exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel
# Describe/configure the source
exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /Volumes/doc/data/data.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c
# Describe the sink
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = localhost
exec-memory-avro.sinks.avro-sink.port = 44444
# Use a channel which buffers events in memory
exec-memory-avro.channels.memory-channel.type = memory
exec-memory-avro.channels.memory-channel.capacity = 1000
exec-memory-avro.channels.memory-channel.transactionCapacity = 100
# Bind the source and sink to the channel
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel