天天看點

分布式日志收集架構Flume下載下傳安裝與使用(四)5 實戰

5 實戰

使用Flume的核心就在于配置檔案

  • 配置Source
  • 配置Channel
  • 配置Sink
  • 組織在一起

5.1 場景1 - 從指定網絡端口收集資料輸出到控制台

看看官網的第一個

案例
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
      

a1:agent名稱

r1:Source名稱

k1:Sink名稱

c1:Channel名稱

看看其中的

Sources : netcat

類似于netcat的源,它偵聽給定端口并将每行文本轉換為事件。 像nc -k -l [host] [port]這樣的行為。 換句話說,它打開一個指定的端口并偵聽資料。 期望是提供的資料是換行符分隔的文本。 每行文本都轉換為Flume事件,并通過連接配接的通道發送。

必需屬性以粗體顯示。

分布式日志收集架構Flume下載下傳安裝與使用(四)5 實戰

Sinks:logger

在INFO級别記錄事件。 通常用于測試/調試目的。 必需屬性以粗體顯示。 此接收器是唯一的例外,它不需要在“記錄原始資料”部分中說明的額外配置。

分布式日志收集架構Flume下載下傳安裝與使用(四)5 實戰

channel:memor

事件存儲在具有可配置最大大小的記憶體中隊列中。 它非常适用于需要更高吞吐量的流量,并且在代理發生故障時準備丢失分階段資料。 必需屬性以粗體顯示。

分布式日志收集架構Flume下載下傳安裝與使用(四)5 實戰

實戰

建立example.conf配置

在conf目錄下

分布式日志收集架構Flume下載下傳安裝與使用(四)5 實戰

啟動一個agent

使用名為

flume-ng

的shell腳本啟動代理程式,該腳本位于Flume發行版的bin目錄中。 您需要在指令行上指定代理名稱,config目錄和配置檔案:

bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template
      

回顧指令參數的意義

分布式日志收集架構Flume下載下傳安裝與使用(四)5 實戰
bin/flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/example.conf \
-Dflume.root.logger=INFO,console
      

現在,代理将開始運作在給定屬性檔案中配置的源和接收器。

使用telnet進行測試驗證

注意

分布式日志收集架構Flume下載下傳安裝與使用(四)5 實戰
telnet 127.0.0.1 44444
      

發送了兩條資料

分布式日志收集架構Flume下載下傳安裝與使用(四)5 實戰

這邊接收到了資料

分布式日志收集架構Flume下載下傳安裝與使用(四)5 實戰

讓我們詳細分析下上圖中的資料資訊

2019-06-12 17:52:39,711 (SinkRunner-PollingRunner-DefaultSinkProcessor)
[INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] 
Event: { headers:{} body: 4A 61 76 61 45 64 67 65 0D                      JavaEdge. }
      

其中的Event是Fluem資料傳輸的基本單元

Event = 可選的header + byte array

5.2 場景2 - 監控一個檔案實時采集新增的資料輸出到控制台

Exec Source

Exec源在啟動時運作給定的Unix指令,并期望該程序在标準輸出上連續生成資料(stderr被簡單地丢棄,除非屬性logStdErr設定為true)。 如果程序因任何原因退出,則源也會退出并且不會生成其他資料。 這意味着諸如cat [named pipe]或tail -F [file]之類的配置将産生所需的結果,而日期可能不會 - 前兩個指令産生資料流,而後者産生單個事件并退出

分布式日志收集架構Flume下載下傳安裝與使用(四)5 實戰

Agent 選型

exec source + memory channel + logger sink

配置檔案

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /Volumes/doc/data/data.log
a1.sources.r1.shell = /bin/sh -c

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
      

在conf下建立配置檔案如下:

分布式日志收集架構Flume下載下傳安裝與使用(四)5 實戰

data.log檔案内容

分布式日志收集架構Flume下載下傳安裝與使用(四)5 實戰

成功接收

分布式日志收集架構Flume下載下傳安裝與使用(四)5 實戰

5.3 應用場景3 - 将A伺服器上的日志實時采集到B伺服器

分布式日志收集架構Flume下載下傳安裝與使用(四)5 實戰

技術選型

exec s + memory c + avro s

avro s + memory c + loger s

exec-memory-avro.conf

# Name the components on this agent
exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel

# Describe/configure the source
exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /Volumes/doc/data/data.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c

# Describe the sink
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = localhost
exec-memory-avro.sinks.avro-sink.port = 44444

# Use a channel which buffers events in memory
exec-memory-avro.channels.memory-channel.type = memory
exec-memory-avro.channels.memory-channel.capacity = 1000
exec-memory-avro.channels.memory-channel.transactionCapacity = 100

# Bind the source and sink to the channel
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel
      
# Name the components on this agent
exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel

# Describe/configure the source
exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /Volumes/doc/data/data.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c

# Describe the sink
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = localhost
exec-memory-avro.sinks.avro-sink.port = 44444

# Use a channel which buffers events in memory
exec-memory-avro.channels.memory-channel.type = memory
exec-memory-avro.channels.memory-channel.capacity = 1000
exec-memory-avro.channels.memory-channel.transactionCapacity = 100

# Bind the source and sink to the channel
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel