Apache Flume是什麼
Flume
是一個分布式、可靠、和高可用的海量日志采集、聚合和傳輸的系統,通常用于海量資料的實施采集和傳輸,如:海量日志資料。目前主要使用Flume-NG版本。
下面這張圖是Flume的基礎架構圖:
外部資料源以特定格式向 Flume 發送
events
(事件),當
source
接收到
events
時,它将其存儲到一個或多個
channel
,
channe
會一直儲存
events
直到它被
sink
所消費。
sink
的主要功能從
channel
中讀取
events
,并将其存入外部存儲系統或轉發到下一個
source
,成功後再從
channel
中移除
events
。
Flume架構中的幾個主要概念:
- Event:一個資料單元,要傳遞的消息資料。一個
由标題和正文組成:前者是鍵/值映射,後者是任意位元組數組。Evnet
- Client:操作位于源點處的Event,将其發送到Flume
- Agent:一個獨立的Flume程序,包含三個元件
、Source
、Channel
Sink
Source:用來消費傳遞到該元件的Event
Channel:中轉Event的一個臨時存儲,儲存有Source元件傳遞過來的Event
Sink:從Channel中讀取并移除Event,将Event傳遞到Flow Pipeline中的下一個Agent(如果有的話)或将其存入外部存儲系統
Flume的使用模式:
- 單Agent資料流模型
-
多 Agent 串行傳輸資料流模型
為了使資料跨多個代理或躍點流動,前一個代理的接收器和目前節點的源必須為avro類型,接收器指向源的主機名(或IP位址)和端口。
-
收集資料流模型
日志收集中的一種非常常見的情況是,大量的日志生成用戶端将資料發送到連接配接到存儲子系統的幾個消費者代理。例如,從數百台Web伺服器收集的日志發送到許多寫入HDFS群集的代理。
-
多路資料流模型
Flume支援将事件流複用到一個或多個目的地。這是通過定義一種流多路複用器來實作的,該流多路複用器可以将事件複制或選擇性地路由到一個或多個通道。
Apache Flume安裝方式
詳見我的部落格https://blog.csdn.net/qq_35885488/article/details/103071424
Apache Flume簡單使用
需求1:從指定網絡端口采集資料輸出到控制台
參考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html
此配置定義了一個名為a1的
Agent
。a1具有偵聽端口44444上的資料的源,在記憶體中緩沖事件資料的通道以及将事件資料記錄到控制台的接收器。配置檔案為各個元件命名,然後描述它們的類型和配置參數。給定的配置檔案可能會定義幾個命名
Agent
。當啟動給定的
Flume
程序時,會傳遞一個标志,告訴它要顯示哪個命名的代理。
配置規則參考:
http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#netcat-tcp-source和
http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#logger-sink
# Name the components on this agent
# example.conf:單節點Flume配置
#使用Flume關鍵就是配置檔案
#1)配置Source
#2)配置channel
#3)配置Sink
#4)将三個元件連起來
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop000
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
然後啟動Agent
flume-ng agent \
--name a1 \
--conf conf \
--conf-file $FLUME_HOME/conf/example.conf
-Dflume.root.logger=INFO,console
接下來新開一個視窗,使用
telnet hadoop000 44444
連接配接本機的44444端口
并發送三條資料
hello flume
OK
hello flink
OK
hello spark
OK
在開啟
Flume
的視窗可以看到采集的資料被列印到控制台了
19/11/15 10:15:00 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F 20 66 6C 75 6D 65 0D hello flume. }
19/11/15 10:15:06 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F 20 66 6C 69 6E 6B 0D hello flink. }
19/11/15 10:15:10 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F 20 73 70 61 72 6B 0D hello spark. }
過程如下圖所示:
需求2:監控一個檔案實時采集新增的資料輸出到控制台
監控
/home/hadoop/data
下的data.log檔案,如果新增檔案内容,則列印到控制台
具體配置:參考http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#exec-source
在conf中建立檔案
exec-memory-logger.conf
# Name the components on this agent
#exec-memory-logger.conf:單節點Flume配置
#使用Flume關鍵就是配置檔案
#1)配置Source
#2)配置channel
#3)配置Sink
#4)将三個元件連起來
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
#使用tail -F監控的日志目錄
a1.sources.r1.command = tail -F /home/hadoop/data/data.log
a1.sources.r1.shell=/bin/sh -c
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
啟動:
flume-ng agent \
--name a1 \
--conf conf \
--conf-file $FLUME_HOME/conf/exec-memory-logger.conf
-Dflume.root.logger=INFO,console
建立一個視窗:新增内容到
data.log
檔案
echo hello spark >> data.log
需求三:将A伺服器的日志實時采集到B伺服器上
整體需求如下圖所示:
技術選型:
exec source+memory channel+avro sink
avro source+memory channel +logger sink
參考
http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#exec-source
http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#avro-source
配置檔案
exec-memory-avro.conf
和
avro-memory-logger.conf
#exec-memory-avro.conf
# Name the components on this agent
#example.conf:單節點Flume配置
#使用Flume關鍵就是配置檔案
#1)配置Source
#2)配置channel
#3)配置Sink
#4)将三個元件連起來
exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel
# Describe/configure the source
exec-memory-avro.sources.exec-source.type = exec
#使用tail -F監控的日志目錄
exec-memory-avro.sources.exec-source.command = tail -F /home/hadoop/data/data.log
exec-memory-avro.sources.exec-source.shell=/bin/sh -c
# 修改為avro
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = hadoop000
exec-memory-avro.sinks.avro-sink.port = 44444
# Use a channel which buffers events in memory
exec-memory-avro.channels.memory-channel.type = memory
# Bind the source and sink to the channel
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel
avro-memory-logger.conf
#Name the components on this agent
#example.conf:單節點Flume配置
#使用Flume關鍵就是配置檔案
#1)配置Source
#2)配置channel
#3)配置Sink
#4)将三個元件連起來
avro-memory-logger.sources = avro-source
avro-memory-logger.sinks = logger-sink
avro-memory-logger.channels = memory-channel
avro-memory-logger.sources.avro-source.type = avro
avro-memory-logger.sources.avro-source.bind= hadoop000
avro-memory-logger.sources.avro-source.port=44444
avro-memory-logger.sinks.logger-sink.type = logger
avro-memory-logger.channels.memory-channel.type = memory
avro-memory-logger.sources.avro-source.channels = memory-channel
avro-memory-logger.sinks.logger-sink.channel = memory-channel
先啟動:
avro-memory-logger
flume-ng agent \
--name avro-memory-logger \
--conf conf \
--conf-file $FLUME_HOME/conf/avro-memory-logger.conf
-Dflume.root.logger=INFO,console
再啟動:
exec-memory-avro
flume-ng agent \
--name exec-memory-avro \
--conf conf \
--conf-file $FLUME_HOME/conf/exec-memory-avro.conf
-Dflume.root.logger=INFO,console
過程:
- 機器A上監控一個檔案,當我們通路主站會有使用者行為日志記錄在access.log中
- avro sink把新産生的日志輸出到avro source指定的hostname和port上
- 通過avro source對應的agent将我們的日志輸出到控制台