

An Introduction to Flume: Deployment, Principles, and Usage

Flume Overview

Flume is a highly available, highly reliable, distributed system provided by Cloudera for collecting, aggregating, and transporting massive amounts of log data. Flume is built on a streaming architecture and is flexible and simple.

Flume's main role is to read data from a server's local disk in real time and write that data to HDFS.


Flume Architecture


Agent

An Agent is a JVM process that delivers data from a source to a destination in the form of events.

An Agent is made up of three main parts: a Source, a Channel, and a Sink.
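
As a minimal sketch of that wiring (a1 is just a placeholder agent name; the examples later in this article use the name bigdata), an agent is declared in a properties file by naming its components and binding them together:

# Minimal agent skeleton (a1 is a placeholder agent name)
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# component-specific settings go here, e.g. a1.sources.r1.type = ...
# finally bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1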

Source

The Source is the component that receives data into the Flume Agent. It can handle log data of many types and formats, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, and legacy.

Channel

The Channel is a buffer that sits between the Source and the Sink, which allows the Source and the Sink to run at different rates. A Channel is thread-safe and can handle writes from several Sources and reads from several Sinks at the same time.

Flume ships with two Channels: Memory Channel and File Channel.

Memory Channel is an in-memory queue. It is appropriate when data loss is not a concern; when data loss does matter, Memory Channel should not be used, because a process crash, machine failure, or restart will lose whatever is in memory.

File Channel writes all events to disk, so no data is lost if the process exits or the machine goes down.
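
To illustrate the difference, here is a sketch of how the two channel types are selected purely through configuration (the checkpointDir and dataDirs paths are placeholders, not taken from this article's examples):

# Memory channel: fast, but events are lost if the agent or the machine dies
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000

# File channel: events are persisted to disk and survive a restart
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /opt/module/flume/checkpoint
a1.channels.c2.dataDirs = /opt/module/flume/data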

Sink

The Sink continuously polls the Channel for events, removes them in batches, and writes those batches to a storage or indexing system, or sends them on to another Flume Agent.

Sink destinations include hdfs, logger, avro, thrift, ipc, file, HBase, solr, and custom sinks.

Event

The Event is the basic unit of data transfer in Flume; data travels from source to destination in the form of Events. An Event consists of a Header, which stores the event's attributes as key-value pairs, and a Body, which holds the actual data as a byte array.

Header (k=v)

Body (byte array)

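For example, the logger sink used later in this article renders each event with its (empty) header map and the body shown both as hex bytes and as text:

Event: { headers:{} body: 77 61 6E 67                wang }
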
Flume Installation and Deployment

Links

(1) Flume website: http://flume.apache.org/

(2) Documentation: http://flume.apache.org/FlumeUserGuide.html

(3) Downloads: http://archive.apache.org/dist/flume/

Installation steps

# Download the installation package
[email protected]:/home/wangting >
[email protected]:/home/wangting >cd /opt/software/
[email protected]:/opt/software >wget http://archive.apache.org/dist/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
[email protected]:/opt/software >ll | grep flume
-rw-r--r-- 1 wangting     wangting      67938106 Apr 17 14:09 apache-flume-1.9.0-bin.tar.gz
# Extract apache-flume-1.9.0-bin.tar.gz into /opt/module/
[email protected]:/opt/software >tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
# Rename the directory with mv to keep the path short
[email protected]:/opt/software >mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume
[email protected]:/opt/software >cd /opt/module/flume/
# Directory layout [a few directories such as datas and logs were created by later steps; ignore them]
[email protected]:/opt/module/flume >ll
total 180
drwxr-xr-x  2 wangting wangting  4096 Apr 17 14:14 bin
-rw-rw-r--  1 wangting wangting 85602 Nov 29  2018 CHANGELOG
drwxr-xr-x  2 wangting wangting  4096 Apr 17 16:26 conf
drwxrwxr-x  2 wangting wangting  4096 Apr 17 15:58 datas
-rw-r--r--  1 wangting wangting  5681 Nov 16  2017 DEVNOTES
-rw-r--r--  1 wangting wangting  2873 Nov 16  2017 doap_Flume.rdf
drwxrwxr-x 12 wangting wangting  4096 Dec 18  2018 docs
drwxr-xr-x  2 wangting wangting  4096 Apr 17 14:15 lib
-rw-rw-r--  1 wangting wangting 43405 Dec 10  2018 LICENSE
drwxrwxr-x  2 wangting wangting  4096 Apr 17 16:28 logs
-rw-r--r--  1 wangting wangting   249 Nov 29  2018 NOTICE
-rw-r--r--  1 wangting wangting  2483 Nov 16  2017 README.md
-rw-rw-r--  1 wangting wangting  1958 Dec 10  2018 RELEASE-NOTES
drwxr-xr-x  2 wangting wangting  4096 Apr 17 14:14 tools
# Delete guava-11.0.2.jar from the lib directory for compatibility with Hadoop 3.1.3
[email protected]:/opt/module/flume >rm /opt/module/flume/lib/guava-11.0.2.jar
# Configure environment variables [append the following]
[email protected]:/opt/module/flume >sudo vim /etc/profile

#flume
export FLUME_HOME=/opt/module/flume
export PATH=$PATH:$FLUME_HOME/bin
[email protected]:/opt/module/flume >
[email protected]:/opt/module/flume >
# Source /etc/profile so the changes take effect
[email protected]:/opt/module/flume >source /etc/profile
# Verify that the flume-ng command is available
[email protected]:/opt/module/flume >flume-ng version
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9
           

Flume Usage Example 1

Scenario: monitoring port data (the official example)

Requirements:

Use Flume to listen on a port, collect the data sent to that port, and print it to the console.

  1. Write a Flume configuration file that defines an agent task to continuously listen on port 44444
  2. Use the netcat tool to send text data to port 44444 with nc ip port [netcat here only simulates an application emitting data]
  3. netcat pushes data to port 44444, which Flume is listening on, simulating the real-time logs or data pushed by a business application
  4. Flume reads the data arriving on port 44444 through its source component
  5. Flume finally writes the collected data to the console through its Sink

Preparation: write the configuration

# # # Preparation # # #
[email protected]:/home/wangting >sudo yum install -y nc
[email protected]:/home/wangting >
# Check whether port 44444 is already in use
[email protected]:/home/wangting >sudo netstat -tunlp | grep 44444
[email protected]:/home/wangting >cd /opt/module/flume/
# Create a directory to hold the agent definition files
[email protected]:/opt/module/flume >mkdir datas
[email protected]:/opt/module/flume >cd datas/
# Create netcatsource_loggersink.conf in the datas directory
[email protected]:/opt/module/flume/datas >touch netcatsource_loggersink.conf
[email protected]:/opt/module/flume/datas >ls
netcatsource_loggersink.conf
[email protected]:/opt/module/flume/datas >vim netcatsource_loggersink.conf 
#bigdata is the name of the agent
#Multiple sources, channels, and sinks can be defined; separate their names with spaces

#Define the source
bigdata.sources = r1
#Define the channel
bigdata.channels = c1
#Define the sink
bigdata.sinks = k1

#Declare the source type and its settings
bigdata.sources.r1.type = netcat
bigdata.sources.r1.bind = ops01
bigdata.sources.r1.port = 44444

#Declare the channel type and its settings
bigdata.channels.c1.type = memory
#Maximum number of events the channel can hold
bigdata.channels.c1.capacity = 1000

#Declare the sink type and its settings
bigdata.sinks.k1.type = logger

#Wire up the source, sink, and channel
bigdata.sources.r1.channels = c1
#A sink can be attached to only one channel; a channel can feed multiple sinks
bigdata.sinks.k1.channel = c1

[Note]: ops01 already has an IP mapping in /etc/hosts: 11.8.37.50 ops01
           

Start the agent and simulate a transfer

# Start the agent
[email protected]:/opt/module/flume >cd /opt/module/flume
[email protected]:/opt/module/flume >flume-ng agent --name bigdata --conf conf/ --conf-file datas/netcatsource_loggersink.conf -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/opt/module/hadoop-3.1.3/bin/hadoop) for HDFS access
Info: Including Hive libraries found via (/opt/module/hive) for Hive access
+ exec /usr/jdk1.8.0_131/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/opt/module/flume/conf:/opt/module/flume/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/common/*:/opt/module/hadoop-3.1.3/share/hadoop/hdfs:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/*:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/*:/opt/module/hadoop-3.1.3/share/hadoop/yarn:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/yarn/*:/opt/module/hadoop-3.1.3/etc/hadoop:/opt/module/tez/*:/opt/module/tez/lib/*:/opt/module/hive/lib/*' -Djava.library.path=:/opt/module/hadoop-3.1.3/lib/native org.apache.flume.node.Application --name bigdata --conf-file datas/netcatsource_loggersink.conf
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/flume/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2021-04-22 16:51:44,314 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:62)] Configuration provider starting
2021-04-22 16:51:44,320 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:138)] Reloading configuration file:datas/netcatsource_loggersink.conf
2021-04-22 16:51:44,326 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:r1
2021-04-22 16:51:44,327 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:r1
2021-04-22 16:51:44,328 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:k1
2021-04-22 16:51:44,328 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:c1
2021-04-22 16:51:44,328 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:k1
2021-04-22 16:51:44,328 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:c1
2021-04-22 16:51:44,328 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:r1
2021-04-22 16:51:44,328 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:r1
2021-04-22 16:51:44,329 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1117)] Added sinks: k1 Agent: bigdata
2021-04-22 16:51:44,329 (conf-file-poller-0) [WARN - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateConfigFilterSet(FlumeConfiguration.java:623)] Agent configuration for 'bigdata' has no configfilters.
2021-04-22 16:51:44,349 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:163)] Post-validation flume configuration contains configuration for agents: [bigdata]
2021-04-22 16:51:44,349 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:151)] Creating channels
2021-04-22 16:51:44,356 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel c1 type memory
2021-04-22 16:51:44,363 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:205)] Created channel c1
2021-04-22 16:51:44,367 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source r1, type netcat
2021-04-22 16:51:44,374 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: k1, type: logger
2021-04-22 16:51:44,377 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:120)] Channel c1 connected to [r1, k1]
2021-04-22 16:51:44,380 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:162)] Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:o[email protected] counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
2021-04-22 16:51:44,382 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:169)] Starting Channel c1
2021-04-22 16:51:44,442 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2021-04-22 16:51:44,442 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2021-04-22 16:51:44,442 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1
2021-04-22 16:51:44,443 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1
2021-04-22 16:51:44,443 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:155)] Source starting
2021-04-22 16:51:44,456 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:166)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/11.8.37.50:44444]
           

Running the experiment

Open another terminal session

# Check the listening status of port 44444
[email protected]:/home/wangting >netstat -tnlpu|grep 44444
(Not all processes could be identified, non-owned process info
 will not be shown, you would have to be root to see it all.)
tcp6       0      0 11.8.37.50:44444        :::*                    LISTEN      53791/java    
# Port 44444 belongs to PID 53791, which turns out to be the Flume process
[email protected]:/home/wangting >ll /proc/53791 | grep cwd
lrwxrwxrwx  1 wangting wangting 0 Apr 22 16:52 cwd -> /opt/module/flume
[email protected]:/home/wangting >
# Use nc to send data to port 44444 on ops01 (the local host resolves to ops01), simulating a business application pushing a real-time data stream
[email protected]:/opt/module/flume/datas >nc ops01 44444
wang
OK
ting
OK
666
OK
okokok
OK
test_sk 
OK
           

Console output

# New output appears on the console where flume-ng agent is running
# Event: { headers:{} body: 77 61 6E 67                wang }
# Event: { headers:{} body: 74 69 6E 67                ting }
# Event: { headers:{} body: 36 36 36   	                666 }
# Event: { headers:{} body: 6F 6B 6F 6B 6F 6B        okokok }
# Event: { headers:{} body: 74 65 73 74 5F 73 6B    test_sk }
2021-04-22 17:08:22,500 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 77 61 6E 67                                     wang }
2021-04-22 17:08:22,501 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 74 69 6E 67                                     ting }
2021-04-22 17:08:22,501 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 36 36 36                                        666 }
2021-04-22 17:08:24,966 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 6F 6B 6F 6B 6F 6B                               okokok }
2021-04-22 17:08:39,968 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 74 65 73 74 5F 73 6B                            test_sk }
           

Conclusion: Flume listened on a port, collected the data sent to it, and printed it to the console; the test matches the scenario requirements.

Configuring the service log

[email protected]:/opt/module/flume >cd /opt/module/flume/conf
# Change the following configuration lines
[email protected]:/opt/module/flume/conf >vim log4j.properties
#flume.root.logger=DEBUG,LOGFILE
flume.root.logger=INFO,LOGFILE
flume.log.dir=/opt/module/flume/logs
flume.log.file=flume.log
[email protected]:/opt/module/flume/conf >cd ..
[email protected]:/opt/module/flume >mkdir logs
[email protected]:/opt/module/flume >touch logs/flume.log
[email protected]:/opt/module/flume >flume-ng agent --name bigdata --conf conf/ --conf-file datas/netcatsource_loggersink.conf
Info: Including Hadoop libraries found via (/opt/module/hadoop-3.1.3/bin/hadoop) for HDFS access
Info: Including Hive libraries found via (/opt/module/hive) for Hive access
+ exec /usr/jdk1.8.0_131/bin/java -Xmx20m -cp '/opt/module/flume/conf:/opt/module/flume/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/common/*:/opt/module/hadoop-3.1.3/share/hadoop/hdfs:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/*:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/*:/opt/module/hadoop-3.1.3/share/hadoop/yarn:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/yarn/*:/opt/module/hadoop-3.1.3/etc/hadoop:/opt/module/tez/*:/opt/module/tez/lib/*:/opt/module/hive/lib/*' -Djava.library.path=:/opt/module/hadoop-3.1.3/lib/native org.apache.flume.node.Application --name bigdata --conf-file datas/netcatsource_loggersink.conf
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/flume/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

           

Open another session

[email protected]:/opt/module/flume/ >
[email protected]:/opt/module/flume/ >nc ops01 44444
aaa
OK
bbb
OK
ccc
OK
           

Stop the agent and check the log file
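
The stop step itself is not shown here: because the agent was started in the foreground, pressing Ctrl+C in its terminal triggers the shutdown hook seen in the log below. A rough sketch for locating and stopping a backgrounded agent instead (the grep pattern is illustrative, and <pid> is a placeholder):

# Find the Flume agent JVM and stop it gracefully (SIGTERM runs the shutdown hook)
ps -ef | grep 'org.apache.flume.node.Application' | grep -v grep
kill <pid>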

[email protected]:/opt/module/flume/logs >cat flume.log 
22 Apr 2021 18:10:53,011 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start:62)  - Configuration provider starting
22 Apr 2021 18:10:53,017 INFO  [conf-file-poller-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:138)  - Reloading configuration file:datas/netcatsource_loggersink.conf
22 Apr 2021 18:10:53,024 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig:1203)  - Processing:r1
22 Apr 2021 18:10:53,025 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig:1203)  - Processing:r1
22 Apr 2021 18:10:53,025 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig:1203)  - Processing:k1
22 Apr 2021 18:10:53,026 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig:1203)  - Processing:c1
22 Apr 2021 18:10:53,026 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig:1203)  - Processing:k1
22 Apr 2021 18:10:53,026 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig:1203)  - Processing:c1
22 Apr 2021 18:10:53,026 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig:1203)  - Processing:r1
22 Apr 2021 18:10:53,026 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig:1203)  - Processing:r1
22 Apr 2021 18:10:53,027 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1117)  - Added sinks: k1 Agent: bigdata
22 Apr 2021 18:10:53,027 WARN  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateConfigFilterSet:623)  - Agent configuration for 'bigdata' has no configfilters.
22 Apr 2021 18:10:53,048 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration.validateConfiguration:163)  - Post-validation flume configuration contains configuration for agents: [bigdata]
22 Apr 2021 18:10:53,048 INFO  [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadChannels:151)  - Creating channels
22 Apr 2021 18:10:53,056 INFO  [conf-file-poller-0] (org.apache.flume.channel.DefaultChannelFactory.create:42)  - Creating instance of channel c1 type memory
22 Apr 2021 18:10:53,061 INFO  [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadChannels:205)  - Created channel c1
22 Apr 2021 18:10:53,064 INFO  [conf-file-poller-0] (org.apache.flume.source.DefaultSourceFactory.create:41)  - Creating instance of source r1, type netcat
22 Apr 2021 18:10:53,071 INFO  [conf-file-poller-0] (org.apache.flume.sink.DefaultSinkFactory.create:42)  - Creating instance of sink: k1, type: logger
22 Apr 2021 18:10:53,074 INFO  [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.getConfiguration:120)  - Channel c1 connected to [r1, k1]
22 Apr 2021 18:10:53,078 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:162)  - Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:o[email protected] counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
22 Apr 2021 18:10:53,080 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:169)  - Starting Channel c1
22 Apr 2021 18:10:53,134 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.register:119)  - Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
22 Apr 2021 18:10:53,135 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:95)  - Component type: CHANNEL, name: c1 started
22 Apr 2021 18:10:53,135 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:196)  - Starting Sink k1
22 Apr 2021 18:10:53,135 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:207)  - Starting Source r1
22 Apr 2021 18:10:53,136 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.source.NetcatSource.start:155)  - Source starting
22 Apr 2021 18:10:53,146 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.source.NetcatSource.start:166)  - Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/11.8.37.50:44444]
22 Apr 2021 18:11:03,355 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.LoggerSink.process:95)  - Event: { headers:{} body: 61 61 61                                        aaa }
22 Apr 2021 18:11:10,021 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.LoggerSink.process:95)  - Event: { headers:{} body: 62 62 62                                        bbb }
22 Apr 2021 18:11:11,101 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.LoggerSink.process:95)  - Event: { headers:{} body: 63 63 63                                        ccc }
22 Apr 2021 18:11:15,901 INFO  [agent-shutdown-hook] (org.apache.flume.node.Application.stopAllComponents:125)  - Shutting down configuration: { sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:r1,state:START} }} sinkRunners:{k1=SinkRunner: { policy:o[email protected] counterGroup:{ name:null counters:{runner.backoffs.consecutive=1, runner.backoffs=4} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
22 Apr 2021 18:11:15,902 INFO  [agent-shutdown-hook] (org.apache.flume.node.Application.stopAllComponents:129)  - Stopping Source r1
22 Apr 2021 18:11:15,902 INFO  [agent-shutdown-hook] (org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise:169)  - Stopping component: EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:r1,state:START} }
22 Apr 2021 18:11:15,902 INFO  [agent-shutdown-hook] (org.apache.flume.source.NetcatSource.stop:197)  - Source stopping
22 Apr 2021 18:11:16,403 INFO  [agent-shutdown-hook] (org.apache.flume.node.Application.stopAllComponents:139)  - Stopping Sink k1
22 Apr 2021 18:11:16,404 INFO  [agent-shutdown-hook] (org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise:169)  - Stopping component: SinkRunner: { policy:o[email protected] counterGroup:{ name:null counters:{runner.backoffs.consecutive=1, runner.backoffs=4} } }
22 Apr 2021 18:11:16,404 INFO  [agent-shutdown-hook] (org.apache.flume.node.Application.stopAllComponents:149)  - Stopping Channel c1
22 Apr 2021 18:11:16,404 INFO  [agent-shutdown-hook] (org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise:169)  - Stopping component: org.apache.flume.channel.MemoryChannel{name: c1}
22 Apr 2021 18:11:16,405 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:149)  - Component type: CHANNEL, name: c1 stopped
22 Apr 2021 18:11:16,405 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:155)  - Shutdown Metric for type: CHANNEL, name: c1. channel.start.time == 1619086253135
22 Apr 2021 18:11:16,405 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:161)  - Shutdown Metric for type: CHANNEL, name: c1. channel.stop.time == 1619086276405
22 Apr 2021 18:11:16,405 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:177)  - Shutdown Metric for type: CHANNEL, name: c1. channel.capacity == 1000
22 Apr 2021 18:11:16,406 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:177)  - Shutdown Metric for type: CHANNEL, name: c1. channel.current.size == 0
22 Apr 2021 18:11:16,406 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:177)  - Shutdown Metric for type: CHANNEL, name: c1. channel.event.put.attempt == 3
22 Apr 2021 18:11:16,406 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:177)  - Shutdown Metric for type: CHANNEL, name: c1. channel.event.put.success == 3
22 Apr 2021 18:11:16,406 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:177)  - Shutdown Metric for type: CHANNEL, name: c1. channel.event.take.attempt == 8
22 Apr 2021 18:11:16,407 INFO  [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:177)  - Shutdown Metric for type: CHANNEL, name: c1. channel.event.take.success == 3
22 Apr 2021 18:11:16,407 INFO  [agent-shutdown-hook] (org.apache.flume.lifecycle.LifecycleSupervisor.stop:78)  - Stopping lifecycle supervisor 12
22 Apr 2021 18:11:16,411 INFO  [agent-shutdown-hook] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider.stop:84)  - Configuration provider stopping
           

Flume Usage Example 2

Scenario: monitoring a single appended file in real time

Requirements:

Monitor the Hive application log in real time, and upload new log content to HDFS as soon as it appears.

  1. Create a Flume configuration file that meets the requirements
  2. Run flume-ng with the configuration file to start monitoring
  3. Start Hive; the Hive log file to monitor is /opt/module/hive/logs/hiveServer2.log
  4. Check and verify the data on HDFS

[Note]: the test assumes that a Hadoop cluster and the Hive service are already deployed.
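
A quick sanity check before starting the agent might look like the following sketch (the log path comes from the configuration below; the hdfs command simply confirms the cluster is reachable):

# Confirm the Hive log file exists and is growing
ls -l /opt/module/hive/logs/hiveServer2.log
# Confirm HDFS is reachable
hdfs dfs -ls /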

Preparation: write the configuration

Create the configuration file flume-file-hdfs.conf in the /opt/module/flume/datas directory

[email protected]:/opt/module/flume/datas >vim flume-file-hdfs.conf 

# Name the components on this agent
bigdata.sources = r2
bigdata.sinks = k2
bigdata.channels = c2

# Describe/configure the source
bigdata.sources.r2.type = exec
# Adjust the path and log file name to match your environment
bigdata.sources.r2.command = tail -F /opt/module/hive/logs/hiveServer2.log
bigdata.sources.r2.shell = /bin/bash -c

# Describe the sink
bigdata.sinks.k2.type = hdfs
# Adjust the HDFS URI to match your environment
bigdata.sinks.k2.hdfs.path = hdfs://ops01:8020/flume/%Y%m%d/%H
#Prefix for uploaded files
bigdata.sinks.k2.hdfs.filePrefix = logs-
#Whether to roll folders based on time
bigdata.sinks.k2.hdfs.round = true
#Number of time units before a new folder is created
bigdata.sinks.k2.hdfs.roundValue = 1
#The time unit for rounding
bigdata.sinks.k2.hdfs.roundUnit = hour
#Whether to use the local timestamp
bigdata.sinks.k2.hdfs.useLocalTimeStamp = true
#Number of events to accumulate before flushing to HDFS
bigdata.sinks.k2.hdfs.batchSize = 100
#File type; compression is supported
bigdata.sinks.k2.hdfs.fileType = DataStream
#How often (in seconds) to roll a new file
bigdata.sinks.k2.hdfs.rollInterval = 60
#Roll the file when it reaches this size (134217700 bytes, about 128 MB)
bigdata.sinks.k2.hdfs.rollSize = 134217700
#Do not roll files based on event count
bigdata.sinks.k2.hdfs.rollCount = 0

# Use a channel which buffers events in memory
bigdata.channels.c2.type = memory
bigdata.channels.c2.capacity = 1000
bigdata.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
bigdata.sources.r2.channels = c2
bigdata.sinks.k2.channel = c2
           

Start the agent

Change into the /opt/module/flume directory and start the agent

[email protected]:/opt/module/flume >flume-ng agent --name bigdata --conf datas/  --conf-file datas/flume-file-hdfs.conf -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/opt/module/hadoop-3.1.3/bin/hadoop) for HDFS access
Info: Including Hive libraries found via (/opt/module/hive) for Hive access
+ exec /usr/jdk1.8.0_131/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/opt/module/flume/datas:/opt/module/flume/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/common/*:/opt/module/hadoop-3.1.3/share/hadoop/hdfs:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/*:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/*:/opt/module/hadoop-3.1.3/share/hadoop/yarn:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/yarn/*:/opt/module/hadoop-3.1.3/etc/hadoop:/opt/module/tez/*:/opt/module/tez/lib/*:/opt/module/hive/lib/*' -Djava.library.path=:/opt/module/hadoop-3.1.3/lib/native org.apache.flume.node.Application --name bigdata --conf-file datas/flume-file-hdfs.conf
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/flume/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2021-04-23 11:18:31,268 INFO  [lifecycleSupervisor-1-0] node.PollingPropertiesFileConfigurationProvider (PollingPropertiesFileConfigurationProvider.java:start(62)) - Configuration provider starting
2021-04-23 11:18:31,275 INFO  [conf-file-poller-0] node.PollingPropertiesFileConfigurationProvider (PollingPropertiesFileConfigurationProvider.java:run(138)) - Reloading configuration file:datas/flume-file-hdfs.conf
2021-04-23 11:18:31,282 INFO  [conf-file-poller-0] conf.FlumeConfiguration (FlumeConfiguration.java:addComponentConfig(1203)) - Processing:c2
2021-04-23 11:18:31,283 INFO  [conf-file-poller-0] conf.FlumeConfiguration (FlumeConfiguration.java:addComponentConfig(1203)) - Processing:r2
2021-04-23 11:18:31,283 INFO  [conf-file-poller-0] conf.FlumeConfiguration (FlumeConfiguration.java:addComponentConfig(1203)) - Processing:k2
2021-04-23 11:18:31,284 INFO  [conf-file-poller-0] conf.FlumeConfiguration (FlumeConfiguration.java:addComponentConfig(1203)) - Processing:r2
2021-04-23 11:18:31,284 INFO  [conf-file-poller-0] conf.FlumeConfiguration (FlumeConfiguration.java:addProperty(1117)) - Added sinks: k2 Agent: bigdata
2021-04-23 11:18:31,284 INFO  [conf-file-poller-0] conf.FlumeConfiguration (FlumeConfiguration.java:addComponentConfig(1203)) - Processing:c2
2021-04-23 11:18:31,284 INFO  [conf-file-poller-0] conf.FlumeConfiguration (FlumeConfiguration.java:addComponentConfig(1203)) - Processing:k2
2021-04-23 11:18:31,284 INFO  [conf-file-poller-0] conf.FlumeConfiguration (FlumeConfiguration.java:addComponentConfig(1203)) - Processing:c2
2021-04-23 11:18:31,288 INFO  [conf-file-poller-0] conf.FlumeConfiguration (FlumeConfiguration.java:addComponentConfig(1203)) - Processing:k2
2021-04-23 11:18:31,288 INFO  [conf-file-poller-0] conf.FlumeConfiguration (FlumeConfiguration.java:addComponentConfig(1203)) - Processing:k2
2021-04-23 11:18:31,288 WARN  [conf-file-poller-0] conf.FlumeConfiguration (FlumeConfiguration.java:validateConfigFilterSet(623)) - Agent configuration for 'bigdata' has no configfilters.
2021-04-23 11:18:31,309 INFO  [conf-file-poller-0] conf.FlumeConfiguration (FlumeConfiguration.java:validateConfiguration(163)) - Post-validation flume configuration contains configuration for agents: [bigdata]
2021-04-23 11:18:31,310 INFO  [conf-file-poller-0] node.AbstractConfigurationProvider (AbstractConfigurationProvider.java:loadChannels(151)) - Creating channels
2021-04-23 11:18:31,317 INFO  [conf-file-poller-0] channel.DefaultChannelFactory (DefaultChannelFactory.java:create(42)) - Creating instance of channel c2 type memory
2021-04-23 11:18:31,324 INFO  [conf-file-poller-0] node.AbstractConfigurationProvider (AbstractConfigurationProvider.java:loadChannels(205)) - Created channel c2
2021-04-23 11:18:31,326 INFO  [conf-file-poller-0] source.DefaultSourceFactory (DefaultSourceFactory.java:create(41)) - Creating instance of source r2, type exec
2021-04-23 11:18:31,333 INFO  [conf-file-poller-0] sink.DefaultSinkFactory (DefaultSinkFactory.java:create(42)) - Creating instance of sink: k2, type: hdfs
2021-04-23 11:18:31,343 INFO  [conf-file-poller-0] node.AbstractConfigurationProvider (AbstractConfigurationProvider.java:getConfiguration(120)) - Channel c2 connected to [r2, k2]
2021-04-23 11:18:31,346 INFO  [conf-file-poller-0] node.Application (Application.java:startAllComponents(162)) - Starting new configuration:{ sourceRunners:{r2=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:r2,state:IDLE} }} sinkRunners:{k2=SinkRunner: { policy:o[email protected] counterGroup:{ name:null counters:{} } }} channels:{c2=org.apache.flume.channel.MemoryChannel{name: c2}} }
2021-04-23 11:18:31,348 INFO  [conf-file-poller-0] node.Application (Application.java:startAllComponents(169)) - Starting Channel c2
2021-04-23 11:18:31,406 INFO  [lifecycleSupervisor-1-0] instrumentation.MonitoredCounterGroup (MonitoredCounterGroup.java:register(119)) - Monitored counter group for type: CHANNEL, name: c2: Successfully registered new MBean.
2021-04-23 11:18:31,406 INFO  [lifecycleSupervisor-1-0] instrumentation.MonitoredCounterGroup (MonitoredCounterGroup.java:start(95)) - Component type: CHANNEL, name: c2 started
2021-04-23 11:18:31,406 INFO  [conf-file-poller-0] node.Application (Application.java:startAllComponents(196)) - Starting Sink k2
2021-04-23 11:18:31,407 INFO  [conf-file-poller-0] node.Application (Application.java:startAllComponents(207)) - Starting Source r2
2021-04-23 11:18:31,408 INFO  [lifecycleSupervisor-1-1] source.ExecSource (ExecSource.java:start(170)) - Exec source starting with command: tail -F /opt/module/hive/logs/hiveServer2.log
2021-04-23 11:18:31,408 INFO  [lifecycleSupervisor-1-0] instrumentation.MonitoredCounterGroup (MonitoredCounterGroup.java:register(119)) - Monitored counter group for type: SINK, name: k2: Successfully registered new MBean.
2021-04-23 11:18:31,408 INFO  [lifecycleSupervisor-1-0] instrumentation.MonitoredCounterGroup (MonitoredCounterGroup.java:start(95)) - Component type: SINK, name: k2 started
2021-04-23 11:18:31,409 INFO  [lifecycleSupervisor-1-1] instrumentation.MonitoredCounterGroup (MonitoredCounterGroup.java:register(119)) - Monitored counter group for type: SOURCE, name: r2: Successfully registered new MBean.
2021-04-23 11:18:31,409 INFO  [lifecycleSupervisor-1-1] instrumentation.MonitoredCounterGroup (MonitoredCounterGroup.java:start(95)) - Component type: SOURCE, name: r2 started
2021-04-23 11:18:35,425 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] hdfs.HDFSDataStream (HDFSDataStream.java:configure(57)) - Serializer = TEXT, UseRawLocalFileSystem = false
2021-04-23 11:18:35,536 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] hdfs.BucketWriter (BucketWriter.java:open(246)) - Creating hdfs://ops01:8020/flume/20210423/11/logs-.1619147915426.tmp
2021-04-23 11:18:35,873 INFO  [hdfs-k2-call-runner-0] Configuration.deprecation (Configuration.java:logDeprecation(1395)) - No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
2021-04-23 11:18:39,736 INFO  [Thread-9] sasl.SaslDataTransferClient (SaslDataTransferClient.java:checkTrustAndSend(239)) - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2021-04-23 11:19:36,698 INFO  [hdfs-k2-roll-timer-0] hdfs.HDFSEventSink (HDFSEventSink.java:run(393)) - Writer callback called.
2021-04-23 11:19:36,698 INFO  [hdfs-k2-roll-timer-0] hdfs.BucketWriter (BucketWriter.java:doClose(438)) - Closing hdfs://ops01:8020/flume/20210423/11/logs-.1619147915426.tmp
2021-04-23 11:19:36,722 INFO  [hdfs-k2-call-runner-8] hdfs.BucketWriter (BucketWriter.java:call(681)) - Renaming hdfs://ops01:8020/flume/20210423/11/logs-.1619147915426.tmp to hdfs://ops01:8020/flume/20210423/11/logs-.1619147915426
2021-04-23 11:20:03,947 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] hdfs.HDFSDataStream (HDFSDataStream.java:configure(57)) - Serializer = TEXT, UseRawLocalFileSystem = false
2021-04-23 11:20:03,963 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] hdfs.BucketWriter (BucketWriter.java:open(246)) - Creating hdfs://ops01:8020/flume/20210423/11/logs-.1619148003947.tmp
2021-04-23 11:20:06,991 INFO  [Thread-15] sasl.SaslDataTransferClient (SaslDataTransferClient.java:checkTrustAndSend(239)) - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2021-04-23 11:21:03,984 INFO  [hdfs-k2-roll-timer-0] hdfs.HDFSEventSink (HDFSEventSink.java:run(393)) - Writer callback called.
2021-04-23 11:21:03,985 INFO  [hdfs-k2-roll-timer-0] hdfs.BucketWriter (BucketWriter.java:doClose(438)) - Closing hdfs://ops01:8020/flume/20210423/11/logs-.1619148003947.tmp
2021-04-23 11:21:03,998 INFO  [hdfs-k2-call-runner-2] hdfs.BucketWriter (BucketWriter.java:call(681)) - Renaming hdfs://ops01:8020/flume/20210423/11/logs-.1619148003947.tmp to hdfs://ops01:8020/flume/20210423/11/logs-.1619148003947
           

[Note]: HDFS, Hive, YARN and the other cluster services are assumed to be running already; their deployment is not covered here.

Running the experiment

Log in to the Hive interactive command line

# Log in to Hive
[email protected]:/opt/module/hive >beeline -u jdbc:hive2://ops01:10000 -n wangting
# Run a valid command
0: jdbc:hive2://ops01:10000> show tables;
INFO  : Compiling command(queryId=wangting_20210423111858_a9428a9d-ee27-48b7-8235-3b0ed75982b4): show tables
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Semantic Analysis Completed (retrial = false)
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:tab_name, type:string, comment:from deserializer)], properties:null)
INFO  : Completed compiling command(queryId=wangting_20210423111858_a9428a9d-ee27-48b7-8235-3b0ed75982b4); Time taken: 0.02 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=wangting_20210423111858_a9428a9d-ee27-48b7-8235-3b0ed75982b4): show tables
INFO  : Starting task [Stage-0:DDL] in serial mode
INFO  : Completed executing command(queryId=wangting_20210423111858_a9428a9d-ee27-48b7-8235-3b0ed75982b4); Time taken: 0.004 seconds
INFO  : OK
INFO  : Concurrency mode is disabled, not creating a lock manager
+-------------------------------------+
|              tab_name               |
+-------------------------------------+
| dept                                |
| emp                                 |
| f_dmcp_n013_judicative_doc_content  |
| stu_partition                       |
| test                                |
| test2                               |
+-------------------------------------+
6 rows selected (0.037 seconds)
# Run an invalid command; it throws: cannot recognize input near 'show' 'tablesssssss'
0: jdbc:hive2://ops01:10000> show tablesssssss;
Error: Error while compiling statement: FAILED: ParseException line 1:5 cannot recognize input near 'show' 'tablesssssss' '<EOF>' in ddl statement (state=42000,code=40000)
# Run one more example command
0: jdbc:hive2://ops01:10000> select count(*) from emp;
INFO  : Compiling command(queryId=wangting_20210423112003_794bd7fd-f4bc-4179-ad34-06e64aee66ea): select count(*) from emp
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Semantic Analysis Completed (retrial = false)
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:_c0, type:bigint, comment:null)], properties:null)
INFO  : Completed compiling command(queryId=wangting_20210423112003_794bd7fd-f4bc-4179-ad34-06e64aee66ea); Time taken: 0.119 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=wangting_20210423112003_794bd7fd-f4bc-4179-ad34-06e64aee66ea): select count(*) from emp
WARN  : Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
INFO  : Query ID = wangting_20210423112003_794bd7fd-f4bc-4179-ad34-06e64aee66ea
INFO  : Total jobs = 1
INFO  : Launching Job 1 out of 1
INFO  : Starting task [Stage-1:MAPRED] in serial mode
INFO  : Number of reduce tasks determined at compile time: 1
INFO  : In order to change the average load for a reducer (in bytes):
INFO  :   set hive.exec.reducers.bytes.per.reducer=<number>
INFO  : In order to limit the maximum number of reducers:
INFO  :   set hive.exec.reducers.max=<number>
INFO  : In order to set a constant number of reducers:
INFO  :   set mapreduce.job.reduces=<number>
INFO  : number of splits:1
INFO  : Submitting tokens for job: job_1615531413182_0098
INFO  : Executing with tokens: []
INFO  : The url to track the job: http://ops02:8088/proxy/application_1615531413182_0098/
INFO  : Starting Job = job_1615531413182_0098, Tracking URL = http://ops02:8088/proxy/application_1615531413182_0098/
INFO  : Kill Command = /opt/module/hadoop-3.1.3/bin/mapred job  -kill job_1615531413182_0098
INFO  : Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
INFO  : 2021-04-23 11:20:12,466 Stage-1 map = 0%,  reduce = 0%
INFO  : 2021-04-23 11:20:20,663 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.53 sec
INFO  : 2021-04-23 11:20:28,849 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 5.4 sec
INFO  : MapReduce Total cumulative CPU time: 5 seconds 400 msec
INFO  : Ended Job = job_1615531413182_0098
INFO  : MapReduce Jobs Launched: 
INFO  : Stage-Stage-1: Map: 1  Reduce: 1   Cumulative CPU: 5.4 sec   HDFS Read: 14007 HDFS Write: 102 SUCCESS
INFO  : Total MapReduce CPU Time Spent: 5 seconds 400 msec
INFO  : Completed executing command(queryId=wangting_20210423112003_794bd7fd-f4bc-4179-ad34-06e64aee66ea); Time taken: 25.956 seconds
INFO  : OK
INFO  : Concurrency mode is disabled, not creating a lock manager
+------+
| _c0  |
+------+
| 14   |
+------+
1 row selected (26.095 seconds)
# Run another invalid command; it throws: Table not found 'empaaaaa'
0: jdbc:hive2://ops01:10000> select count(*) from empaaaaa;
Error: Error while compiling statement: FAILED: SemanticException [Error 10001]: Line 1:21 Table not found 'empaaaaa' (state=42S02,code=10001)
0: jdbc:hive2://ops01:10000> 
# Exit with Ctrl+C
           

If the steps above ran without problems, check HDFS to verify that the expected log files are there.

[email protected]:/home/wangting >
# Check whether the flume directory exists under the HDFS root
[email protected]:/home/wangting >hdfs dfs -ls /
2021-04-23 11:24:55,647 INFO  [main] Configuration.deprecation (Configuration.java:logDeprecation(1395)) - No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
Found 9 items
drwxr-xr-x   - wangting supergroup          0 2021-03-17 11:44 /20210317
drwxr-xr-x   - wangting supergroup          0 2021-03-19 10:51 /20210319
drwxr-xr-x   - wangting supergroup          0 2021-04-23 11:18 /flume
-rw-r--r--   3 wangting supergroup  338075860 2021-03-12 11:50 /hadoop-3.1.3.tar.gz
drwxr-xr-x   - wangting supergroup          0 2021-04-04 11:07 /test.db
drwxr-xr-x   - wangting supergroup          0 2021-03-19 11:14 /testgetmerge
drwxr-xr-x   - wangting supergroup          0 2021-04-10 16:23 /tez
drwx------   - wangting supergroup          0 2021-04-02 15:14 /tmp
drwxr-xr-x   - wangting supergroup          0 2021-04-02 15:25 /user
# Check that /flume contains the date and hour directories defined in flume-file-hdfs.conf
[email protected]:/home/wangting >hdfs dfs -ls /flume
2021-04-23 11:25:05,199 INFO  [main] Configuration.deprecation (Configuration.java:logDeprecation(1395)) - No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
Found 1 items
drwxr-xr-x   - wangting supergroup          0 2021-04-23 11:18 /flume/20210423
[email protected]:/home/wangting >hdfs dfs -ls /flume/20210423/
2021-04-23 11:25:14,685 INFO  [main] Configuration.deprecation (Configuration.java:logDeprecation(1395)) - No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
Found 1 items
drwxr-xr-x   - wangting supergroup          0 2021-04-23 11:21 /flume/20210423/11
[email protected]:/home/wangting >hdfs dfs -ls /flume/20210423/11
2021-04-23 11:25:19,814 INFO  [main] Configuration.deprecation (Configuration.java:logDeprecation(1395)) - No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
Found 2 items
-rw-r--r--   3 wangting supergroup       4949 2021-04-23 11:19 /flume/20210423/11/logs-.1619147915426
-rw-r--r--   3 wangting supergroup       1297 2021-04-23 11:21 /flume/20210423/11/logs-.1619148003947
# View log file logs-.1619147915426 under hour directory 11; it contains the cannot recognize input near 'show' 'tablesssssss' error
[email protected]:/home/wangting >hdfs dfs -cat /flume/20210423/11/logs-.1619147915426
2021-04-23 11:25:37,024 INFO  [main] Configuration.deprecation (Configuration.java:logDeprecation(1395)) - No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
2021-04-23 11:25:37,749 INFO  [main] sasl.SaslDataTransferClient (SaslDataTransferClient.java:checkTrustAndSend(239)) - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
	at org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1542)
	at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
	at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
	at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:56)
	at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
FAILED: ParseException line 1:5 cannot recognize input near 'show' 'tablessss' '<EOF>' in ddl statement
OK
OK
NoViableAltException(24@[917:1: ddlStatement : ( createDatabaseStatement | switchDatabaseStatement | dropDatabaseStatement | createTableStatement | dropTableStatement | truncateTableStatement | alterStatement | descStatement | showStatement | metastoreCheck | createViewStatement | createMaterializedViewStatement | dropViewStatement | dropMaterializedViewStatement | createFunctionStatement | createMacroStatement | dropFunctionStatement | reloadFunctionStatement | dropMacroStatement | analyzeStatement | lockStatement | unlockStatement | lockDatabase | unlockDatabase | createRoleStatement | dropRoleStatement | ( grantPrivileges )=> grantPrivileges | ( revokePrivileges )=> revokePrivileges | showGrants | showRoleGrants | showRolePrincipals | showRoles | grantRole | revokeRole | setRole | showCurrentRole | abortTransactionStatement | killQueryStatement | resourcePlanDdlStatements );])
	at org.antlr.runtime.DFA.noViableAlt(DFA.java:158)
	at org.antlr.runtime.DFA.predict(DFA.java:144)
	at org.apache.hadoop.hive.ql.parse.HiveParser.ddlStatement(HiveParser.java:4244)
	at org.apache.hadoop.hive.ql.parse.HiveParser.execStatement(HiveParser.java:2494)
	at org.apache.hadoop.hive.ql.parse.HiveParser.statement(HiveParser.java:1420)
	at org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:220)
	at org.apache.hadoop.hive.ql.parse.ParseUtils.parse(ParseUtils.java:74)
	at org.apache.hadoop.hive.ql.parse.ParseUtils.parse(ParseUtils.java:67)
	at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:616)
	at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1826)
	at org.apache.hadoop.hive.ql.Driver.compileAndRespond(Driver.java:1773)
	at org.apache.hadoop.hive.ql.Driver.compileAndRespond(Driver.java:1768)
	at org.apache.hadoop.hive.ql.reexec.ReExecDriver.compileAndRespond(ReExecDriver.java:126)
	at org.apache.hive.service.cli.operation.SQLOperation.prepare(SQLOperation.java:197)
	at org.apache.hive.service.cli.operation.SQLOperation.runInternal(SQLOperation.java:260)
	at org.apache.hive.service.cli.operation.Operation.run(Operation.java:247)
	at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:541)
	at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementAsync(HiveSessionImpl.java:527)
	at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:78)
	at org.apache.hive.service.cli.session.HiveSessionProxy.access$000(HiveSessionProxy.java:36)
	at org.apache.hive.service.cli.session.HiveSessionProxy$1.run(HiveSessionProxy.java:63)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
	at org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:59)
	at com.sun.proxy.$Proxy37.executeStatementAsync(Unknown Source)
	at org.apache.hive.service.cli.CLIService.executeStatementAsync(CLIService.java:312)
	at org.apache.hive.service.cli.thrift.ThriftCLIService.ExecuteStatement(ThriftCLIService.java:562)
	at org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1557)
	at org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1542)
	at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
	at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
	at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:56)
	at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
FAILED: ParseException line 1:5 cannot recognize input near 'show' 'tablesssssss' '<EOF>' in ddl statement
# View log file logs-.1619148003947 under hour directory 11; it contains the Table not found 'empaaaaa' error
[email protected]:/home/wangting >hdfs dfs -cat /flume/20210423/11/logs-.1619148003947
2021-04-23 11:25:50,566 INFO  [main] Configuration.deprecation (Configuration.java:logDeprecation(1395)) - No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
2021-04-23 11:25:51,293 INFO  [main] sasl.SaslDataTransferClient (SaslDataTransferClient.java:checkTrustAndSend(239)) - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
Query ID = wangting_20210423112003_794bd7fd-f4bc-4179-ad34-06e64aee66ea
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1615531413182_0098, Tracking URL = http://ops02:8088/proxy/application_1615531413182_0098/
Kill Command = /opt/module/hadoop-3.1.3/bin/mapred job  -kill job_1615531413182_0098
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2021-04-23 11:20:12,466 Stage-1 map = 0%,  reduce = 0%
2021-04-23 11:20:20,663 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.53 sec
2021-04-23 11:20:28,849 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 5.4 sec
MapReduce Total cumulative CPU time: 5 seconds 400 msec
Ended Job = job_1615531413182_0098
MapReduce Jobs Launched: 
Stage-Stage-1: Map: 1  Reduce: 1   Cumulative CPU: 5.4 sec   HDFS Read: 14007 HDFS Write: 102 SUCCESS
Total MapReduce CPU Time Spent: 5 seconds 400 msec
OK
FAILED: SemanticException [Error 10001]: Line 1:21 Table not found 'empaaaaa'

           

Conclusion: the Hive application log was monitored in real time and new content was uploaded to HDFS as it appeared; the test matches the scenario requirements.

Flume Usage Example 3

Scenario: monitoring a directory for multiple new files in real time

Example 2 used the exec source for a single file; example 3 uses the spooldir source for multiple files. The sketch below shows the difference.
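
The difference comes down to the source declaration (a sketch only; r2 and r3 follow the component names used in these examples):

# Example 2: follow one file as it is appended to
bigdata.sources.r2.type = exec
bigdata.sources.r2.command = tail -F /opt/module/hive/logs/hiveServer2.log
# Example 3: watch a whole directory for newly added files
bigdata.sources.r3.type = spooldir
bigdata.sources.r3.spoolDir = /opt/module/flume/upload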

Requirements:

Use Flume to watch an entire directory on the server for file changes and upload the files to HDFS.

  1. Create a Flume configuration file that meets the requirements
  2. Run flume-ng with the configuration file to start monitoring
  3. Add files to the upload directory; the monitored directory is /opt/module/flume/upload/
  4. Check and verify the data on HDFS
  5. Check that the files uploaded from /opt/module/flume/upload have been renamed with a .COMPLETED suffix; files ending in .tmp are not uploaded.

Preparation: write the configuration

Create the configuration file flume-dir-hdfs.conf in the /opt/module/flume/datas directory

[email protected]:/opt/module/flume >ls
bin  CHANGELOG  conf  datas  DEVNOTES  doap_Flume.rdf  docs  lib  LICENSE  logs  NOTICE  README.md  RELEASE-NOTES  tools
[email protected]:/opt/module/flume >mkdir upload
[email protected]:/opt/module/flume >cd datas/
[email protected]:/opt/module/flume/datas >ls
flume-file-hdfs.conf  netcatsource_loggersink.conf
[email protected]:/opt/module/flume/datas >vim flume-dir-hdfs.conf
# source/channel/sink
bigdata.sources = r3
bigdata.sinks = k3
bigdata.channels = c3

# Describe/configure the source
bigdata.sources.r3.type = spooldir
bigdata.sources.r3.spoolDir = /opt/module/flume/upload
bigdata.sources.r3.fileSuffix = .COMPLETED
bigdata.sources.r3.fileHeader = true
#Ignore all files ending in .tmp; do not upload them
bigdata.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
bigdata.sinks.k3.type = hdfs
bigdata.sinks.k3.hdfs.path = hdfs://ops01:8020/flume/upload/%Y%m%d/%H
#Prefix for uploaded files
bigdata.sinks.k3.hdfs.filePrefix = upload-
#Whether to roll folders based on time
bigdata.sinks.k3.hdfs.round = true
#Number of time units before a new folder is created
bigdata.sinks.k3.hdfs.roundValue = 1
#The time unit for rounding
bigdata.sinks.k3.hdfs.roundUnit = hour
#Whether to use the local timestamp
bigdata.sinks.k3.hdfs.useLocalTimeStamp = true
#Number of events to accumulate before flushing to HDFS
bigdata.sinks.k3.hdfs.batchSize = 100
#File type; compression is supported
bigdata.sinks.k3.hdfs.fileType = DataStream
#How often (in seconds) to roll a new file
bigdata.sinks.k3.hdfs.rollInterval = 60
#Roll the file when it reaches roughly 128 MB (134217700 bytes)
bigdata.sinks.k3.hdfs.rollSize = 134217700
#Do not roll files based on event count
bigdata.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
bigdata.channels.c3.type = memory
bigdata.channels.c3.capacity = 1000
bigdata.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
bigdata.sources.r3.channels = c3
bigdata.sinks.k3.channel = c3
           

Start the agent

[email protected]:/opt/module/flume >ll upload/
total 0
[email protected]:/opt/module/flume >flume-ng agent -c conf/ -n bigdata -f datas/flume-dir-hdfs.conf 
Info: Including Hadoop libraries found via (/opt/module/hadoop-3.1.3/bin/hadoop) for HDFS access
Info: Including Hive libraries found via (/opt/module/hive) for Hive access
+ exec /usr/jdk1.8.0_131/bin/java -Xmx20m -cp '/opt/module/flume/conf:/opt/module/flume/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/common/*:/opt/module/hadoop-3.1.3/share/hadoop/hdfs:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/*:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/*:/opt/module/hadoop-3.1.3/share/hadoop/yarn:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/yarn/*:/opt/module/hadoop-3.1.3/etc/hadoop:/opt/module/tez/*:/opt/module/tez/lib/*:/opt/module/hive/lib/*' -Djava.library.path=:/opt/module/hadoop-3.1.3/lib/native org.apache.flume.node.Application -n bigdata -f datas/flume-dir-hdfs.conf
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/flume/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
           

[Note 1]

flume-ng agent -c conf/ -n bigdata -f datas/flume-dir-hdfs.conf

-c is short for --conf

-f is short for --conf-file

-n is short for --name

so the command is equivalent to flume-ng agent --conf conf/ --name bigdata --conf-file datas/flume-dir-hdfs.conf

[Note 2]

The monitoring here is at the directory level and applies to new files only; in other words, it watches for files being added to the monitored directory. When using the Spooling Directory Source, do not keep modifying a file after it has been created in the monitored upload directory. Files that have been fully ingested are renamed with a .COMPLETED suffix, and the monitored directory is scanned for changes every 500 milliseconds.
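
In practice this means a file should be written somewhere else first and only moved into the monitored directory once it is complete (a rename within the same filesystem is atomic). A rough sketch with illustrative file names:

# Write the file outside the monitored directory first ...
echo "some log line" > /opt/module/flume/staging_app.log
# ... then move it into /opt/module/flume/upload in a single step
mv /opt/module/flume/staging_app.log /opt/module/flume/upload/app.log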

Running the experiment

[email protected]:/home/wangting >cd /opt/module/flume/upload/
# The directory is currently empty
[email protected]:/opt/module/flume/upload >ll
total 0
# Simulate a file ending in .txt
[email protected]:/opt/module/flume/upload >touch wang.txt
# Simulate a file ending in .tmp
[email protected]:/opt/module/flume/upload >touch ting.tmp
# Simulate a file ending in .log
[email protected]:/opt/module/flume/upload >touch ting.log
# Simulate a file that contains .tmp but ends with something else
[email protected]:/opt/module/flume/upload >touch bigdata.tmp_bak
# After creating the files, verify with ls -l
# The configuration ignores all files ending in .tmp and does not upload them: bigdata.sources.r3.ignorePattern = ([^ ]*\.tmp)
[email protected]:/opt/module/flume/upload >ll
total 0
-rw-rw-r-- 1 wangting wangting 0 Apr 24 14:11 bigdata.tmp_bak.COMPLETED
-rw-rw-r-- 1 wangting wangting 0 Apr 24 14:11 ting.log.COMPLETED
-rw-rw-r-- 1 wangting wangting 0 Apr 24 14:11 ting.tmp
-rw-rw-r-- 1 wangting wangting 0 Apr 24 14:11 wang.txt.COMPLETED

# As expected, files ending in .tmp are not read while the others are; check the log
[email protected]:/opt/module/flume/upload >cd /opt/module/flume/logs/
[email protected]:/opt/module/flume/logs >ll
total 20
-rw-rw-r-- 1 wangting wangting 19333 Apr 24 14:12 flume.log
[email protected]:/opt/module/flume/logs >tail -f flume.log 
24 Apr 2021 14:11:05,980 INFO  [pool-5-thread-1] (org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile:497)  - Preparing to move file /opt/module/flume/upload/wang.txt to /opt/module/flume/upload/wang.txt.COMPLETED
24 Apr 2021 14:11:07,984 INFO  [pool-5-thread-1] (org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents:384)  - Last read took us just up to a file boundary. Rolling to the next file, if there is one.
24 Apr 2021 14:11:07,985 INFO  [pool-5-thread-1] (org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile:497)  - Preparing to move file /opt/module/flume/upload/bigdata.tmp_bak to /opt/module/flume/upload/bigdata.tmp_bak.COMPLETED
24 Apr 2021 14:11:10,677 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSDataStream.configure:57)  - Serializer = TEXT, UseRawLocalFileSystem = false
24 Apr 2021 14:11:10,860 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.BucketWriter.open:246)  - Creating hdfs://ops01:8020/flume/upload/20210424/14/upload-.1619244670678.tmp
24 Apr 2021 14:11:11,200 INFO  [hdfs-k3-call-runner-0] (org.apache.hadoop.conf.Configuration.logDeprecation:1395)  - No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
24 Apr 2021 14:11:15,019 INFO  [Thread-8] (org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.checkTrustAndSend:239)  - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
24 Apr 2021 14:12:11,989 INFO  [hdfs-k3-roll-timer-0] (org.apache.flume.sink.hdfs.HDFSEventSink$1.run:393)  - Writer callback called.
24 Apr 2021 14:12:11,990 INFO  [hdfs-k3-roll-timer-0] (org.apache.flume.sink.hdfs.BucketWriter.doClose:438)  - Closing hdfs://ops01:8020/flume/upload/20210424/14/upload-.1619244670678.tmp
24 Apr 2021 14:12:12,015 INFO  [hdfs-k3-call-runner-6] (org.apache.flume.sink.hdfs.BucketWriter$7.call:681)  - Renaming hdfs://ops01:8020/flume/upload/20210424/14/upload-.1619244670678.tmp to hdfs://ops01:8020/flume/upload/20210424/14/upload-.1619244670678
# The log shows the resulting HDFS file: hdfs://ops01:8020/flume/upload/20210424/14/upload-.1619244670678
           

Check the HDFS output

[email protected]:/opt/module/flume/upload >hdfs dfs -ls /flume/upload/20210424/
2021-04-24 14:13:20,594 INFO  [main] Configuration.deprecation (Configuration.java:logDeprecation(1395)) - No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
Found 1 items
drwxr-xr-x   - wangting supergroup          0 2021-04-24 14:12 /flume/upload/20210424/14
[email protected]:/opt/module/flume/upload >hdfs dfs -ls /flume/upload/20210424/14
2021-04-24 14:13:27,463 INFO  [main] Configuration.deprecation (Configuration.java:logDeprecation(1395)) - No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
Found 1 items
-rw-r--r--   3 wangting supergroup          3 2021-04-24 14:12 /flume/upload/20210424/14/upload-.1619244670678
[email protected]:/opt/module/flume/upload >
           

Flume usage example 4

Scenario: monitor multiple appended files under a directory in real time

Requirements

  1. Create a Flume configuration file that meets the requirements
  2. Run the configuration file and start an agent that monitors file changes in the directory
  3. Append content to the monitored files

    echo wang >> download/file1.txt

    echo ting >> download/file2.txt

  4. Monitored directory: /opt/module/flume/download
  5. Check the data on HDFS

Preparation: write the configuration

Create the configuration file flume-taildir-hdfs.conf in the /opt/module/flume/datas directory

[email protected]:/opt/module/flume >mkdir download
[email protected]:/opt/module/flume >cd datas/
[email protected]:/opt/module/flume/datas >ll
total 12
-rw-rw-r-- 1 wangting wangting 1533 Apr 24 14:05 flume-dir-hdfs.conf
-rw-rw-r-- 1 wangting wangting 1405 Apr 23 11:13 flume-file-hdfs.conf
-rw-rw-r-- 1 wangting wangting  787 Apr 17 15:58 netcatsource_loggersink.conf
[email protected]:/opt/module/flume/datas >vim flume-taildir-hdfs.conf

bigdata.sources = r3
bigdata.sinks = k3
bigdata.channels = c3

# Describe/configure the source
bigdata.sources.r3.type = TAILDIR
bigdata.sources.r3.positionFile = /opt/module/flume/tail_dir.json
bigdata.sources.r3.filegroups = f1 f2
bigdata.sources.r3.filegroups.f1 = /opt/module/flume/download/.*file.*
bigdata.sources.r3.filegroups.f2 = /opt/module/flume/download/.*log.*

# Describe the sink
bigdata.sinks.k3.type = hdfs
bigdata.sinks.k3.hdfs.path = hdfs://ops01:8020/flume/download/%Y%m%d/%H
# Prefix of the files uploaded to HDFS
bigdata.sinks.k3.hdfs.filePrefix = upload-
# Whether to roll folders based on time
bigdata.sinks.k3.hdfs.round = true
# How many time units before creating a new folder
bigdata.sinks.k3.hdfs.roundValue = 1
# The time unit used above
bigdata.sinks.k3.hdfs.roundUnit = hour
# Whether to use the local timestamp
bigdata.sinks.k3.hdfs.useLocalTimeStamp = true
# How many events to accumulate before flushing to HDFS
bigdata.sinks.k3.hdfs.batchSize = 100
# File type; compression is supported
bigdata.sinks.k3.hdfs.fileType = DataStream
# How many seconds before rolling to a new file
bigdata.sinks.k3.hdfs.rollInterval = 60
# Roll the file at roughly 128 MB
bigdata.sinks.k3.hdfs.rollSize = 134217700
# File rolling is independent of the number of events
bigdata.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
bigdata.channels.c3.type = memory
bigdata.channels.c3.capacity = 1000
bigdata.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
bigdata.sources.r3.channels = c3
bigdata.sinks.k3.channel = c3
           

Start the agent

[email protected]:/opt/module/flume >flume-ng agent -c conf/ -n bigdata -f datas/flume-taildir-hdfs.conf 
Info: Including Hadoop libraries found via (/opt/module/hadoop-3.1.3/bin/hadoop) for HDFS access
Info: Including Hive libraries found via (/opt/module/hive) for Hive access
+ exec /usr/jdk1.8.0_131/bin/java -Xmx20m -cp '/opt/module/flume/conf:/opt/module/flume/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/common/*:/opt/module/hadoop-3.1.3/share/hadoop/hdfs:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/*:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/*:/opt/module/hadoop-3.1.3/share/hadoop/yarn:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/yarn/*:/opt/module/hadoop-3.1.3/etc/hadoop:/opt/module/tez/*:/opt/module/tez/lib/*:/opt/module/hive/lib/*' -Djava.library.path=:/opt/module/hadoop-3.1.3/lib/native org.apache.flume.node.Application -n bigdata -f datas/flume-taildir-hdfs.conf
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/flume/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

           

Scenario test

[email protected]:/opt/module/flume >ll
total 188
drwxr-xr-x  2 wangting wangting  4096 Apr 17 14:14 bin
-rw-rw-r--  1 wangting wangting 85602 Nov 29  2018 CHANGELOG
drwxr-xr-x  2 wangting wangting  4096 Apr 22 18:18 conf
drwxrwxr-x  2 wangting wangting  4096 Apr 24 14:59 datas
-rw-r--r--  1 wangting wangting  5681 Nov 16  2017 DEVNOTES
-rw-r--r--  1 wangting wangting  2873 Nov 16  2017 doap_Flume.rdf
drwxrwxr-x 12 wangting wangting  4096 Dec 18  2018 docs
drwxrwxr-x  2 wangting wangting  4096 Apr 24 14:56 download
drwxr-xr-x  2 wangting wangting  4096 Apr 17 14:15 lib
-rw-rw-r--  1 wangting wangting 43405 Dec 10  2018 LICENSE
drwxrwxr-x  2 wangting wangting  4096 Apr 22 18:11 logs
-rw-r--r--  1 wangting wangting   249 Nov 29  2018 NOTICE
-rw-r--r--  1 wangting wangting  2483 Nov 16  2017 README.md
-rw-rw-r--  1 wangting wangting  1958 Dec 10  2018 RELEASE-NOTES
-rw-rw-r--  1 wangting wangting     0 Apr 24 15:02 tail_dir.json
drwxr-xr-x  2 wangting wangting  4096 Apr 17 14:14 tools
drwxrwxr-x  3 wangting wangting  4096 Apr 24 14:11 upload
[email protected]:/opt/module/flume >pwd
/opt/module/flume
[email protected]:/opt/module/flume >echo wang >> download/file1.txt
[email protected]:/opt/module/flume >echo ting >> download/file2.txt
[email protected]:/opt/module/flume >ll download/
total 8
-rw-rw-r-- 1 wangting wangting 5 Apr 24 15:02 file1.txt
-rw-rw-r-- 1 wangting wangting 5 Apr 24 15:02 file2.txt
[email protected]:/opt/module/flume >ll
total 192
drwxr-xr-x  2 wangting wangting  4096 Apr 17 14:14 bin
-rw-rw-r--  1 wangting wangting 85602 Nov 29  2018 CHANGELOG
drwxr-xr-x  2 wangting wangting  4096 Apr 22 18:18 conf
drwxrwxr-x  2 wangting wangting  4096 Apr 24 14:59 datas
-rw-r--r--  1 wangting wangting  5681 Nov 16  2017 DEVNOTES
-rw-r--r--  1 wangting wangting  2873 Nov 16  2017 doap_Flume.rdf
drwxrwxr-x 12 wangting wangting  4096 Dec 18  2018 docs
drwxrwxr-x  2 wangting wangting  4096 Apr 24 15:02 download
drwxr-xr-x  2 wangting wangting  4096 Apr 17 14:15 lib
-rw-rw-r--  1 wangting wangting 43405 Dec 10  2018 LICENSE
drwxrwxr-x  2 wangting wangting  4096 Apr 22 18:11 logs
-rw-r--r--  1 wangting wangting   249 Nov 29  2018 NOTICE
-rw-r--r--  1 wangting wangting  2483 Nov 16  2017 README.md
-rw-rw-r--  1 wangting wangting  1958 Dec 10  2018 RELEASE-NOTES
-rw-rw-r--  1 wangting wangting   145 Apr 24 15:03 tail_dir.json
drwxr-xr-x  2 wangting wangting  4096 Apr 17 14:14 tools
drwxrwxr-x  3 wangting wangting  4096 Apr 24 14:11 upload
[email protected]:/opt/module/flume >cat tail_dir.json
[{"inode":4203350,"pos":5,"file":"/opt/module/flume/download/file1.txt"},{"inode":4203351,"pos":5,"file":"/opt/module/flume/download/file2.txt"}]
[email protected]:/opt/module/flume >echo wang222 >> download/file1.txt
[email protected]:/opt/module/flume >echo ting222 >> download/file2.txt
[email protected]:/opt/module/flume >
[email protected]:/opt/module/flume >cat tail_dir.json
[{"inode":4203350,"pos":13,"file":"/opt/module/flume/download/file1.txt"},{"inode":4203351,"pos":13,"file":"/opt/module/flume/download/file2.txt"}]
[email protected]:/opt/module/flume >
# Note how the pos value changes; it acts as a pointer recording the read position

# Check the log
[email protected]:/opt/module/flume >
[email protected]:/opt/module/flume >tail -f /opt/module/flume/logs/flume.log 
24 Apr 2021 15:03:00,395 INFO  [Thread-9] (org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.checkTrustAndSend:239)  - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
24 Apr 2021 15:03:57,359 INFO  [hdfs-k3-roll-timer-0] (org.apache.flume.sink.hdfs.HDFSEventSink$1.run:393)  - Writer callback called.
24 Apr 2021 15:03:57,360 INFO  [hdfs-k3-roll-timer-0] (org.apache.flume.sink.hdfs.BucketWriter.doClose:438)  - Closing hdfs://ops01:8020/flume/download/20210424/15/upload-.1619247776033.tmp
24 Apr 2021 15:03:57,381 INFO  [hdfs-k3-call-runner-5] (org.apache.flume.sink.hdfs.BucketWriter$7.call:681)  - Renaming hdfs://ops01:8020/flume/download/20210424/15/upload-.1619247776033.tmp to hdfs://ops01:8020/flume/download/20210424/15/upload-.1619247776033
24 Apr 2021 15:04:26,502 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSDataStream.configure:57)  - Serializer = TEXT, UseRawLocalFileSystem = false
24 Apr 2021 15:04:26,515 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.BucketWriter.open:246)  - Creating hdfs://ops01:8020/flume/download/20210424/15/upload-.1619247866503.tmp
24 Apr 2021 15:04:29,545 INFO  [Thread-15] (org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.checkTrustAndSend:239)  - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
24 Apr 2021 15:05:26,536 INFO  [hdfs-k3-roll-timer-0] (org.apache.flume.sink.hdfs.HDFSEventSink$1.run:393)  - Writer callback called.
24 Apr 2021 15:05:26,536 INFO  [hdfs-k3-roll-timer-0] (org.apache.flume.sink.hdfs.BucketWriter.doClose:438)  - Closing hdfs://ops01:8020/flume/download/20210424/15/upload-.1619247866503.tmp
24 Apr 2021 15:05:26,550 INFO  [hdfs-k3-call-runner-2] (org.apache.flume.sink.hdfs.BucketWriter$7.call:681)  - Renaming hdfs://ops01:8020/flume/download/20210424/15/upload-.1619247866503.tmp to hdfs://ops01:8020/flume/download/20210424/15/upload-.1619247866503

# Check the HDFS output
[email protected]:/opt/module/flume >hdfs dfs -ls /flume/download/20210424/15/
2021-04-24 15:07:19,138 INFO  [main] Configuration.deprecation (Configuration.java:logDeprecation(1395)) - No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
Found 2 items
-rw-r--r--   3 wangting supergroup         10 2021-04-24 15:03 /flume/download/20210424/15/upload-.1619247776033
-rw-r--r--   3 wangting supergroup         16 2021-04-24 15:05 /flume/download/20210424/15/upload-.1619247866503
[email protected]:/opt/module/flume >hdfs dfs -cat /flume/download/20210424/15/upload-.1619247776033
2021-04-24 15:07:37,749 INFO  [main] Configuration.deprecation (Configuration.java:logDeprecation(1395)) - No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
2021-04-24 15:07:38,472 INFO  [main] sasl.SaslDataTransferClient (SaslDataTransferClient.java:checkTrustAndSend(239)) - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
wang
ting
[email protected]:/opt/module/flume >hdfs dfs -cat /flume/download/20210424/15/upload-.1619247866503
2021-04-24 15:07:51,807 INFO  [main] Configuration.deprecation (Configuration.java:logDeprecation(1395)) - No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
2021-04-24 15:07:52,533 INFO  [main] sasl.SaslDataTransferClient (SaslDataTransferClient.java:checkTrustAndSend(239)) - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
wang222
ting222
           

[Note 1]

The Taildir Source maintains a position file in JSON format and periodically writes the latest read position of each file into it, which is what makes resumable (break-point) transfers possible. The position file looks like this:

[{"inode":4203350,"pos":13,"file":"/opt/module/flume/download/file1.txt"},
{"inode":4203351,"pos":13,"file":"/opt/module/flume/download/file2.txt"}]

[Note 2]

In Linux, the region that stores file metadata is called the inode. Every inode has a number, and the operating system identifies files by inode number; internally, Unix/Linux does not use file names at all. The inode + pos pair therefore pinpoints the read position, which is then associated with the file name in the file field.
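
A quick cross-check (not part of the original session; assumes the commands are run on the same host): the inode numbers printed by stat or ls -i for the monitored files should match the inode values recorded in tail_dir.json above (4203350 and 4203351).

# print "<inode> <name>" for each monitored file
stat -c '%i %n' /opt/module/flume/download/file1.txt /opt/module/flume/download/file2.txt
# or simply:
ls -i /opt/module/flume/download/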

Flume advanced topics

Flume transactions


Put transaction flow

doPut: write the batch of data into the temporary buffer putList

doCommit: check whether the channel's in-memory queue has enough space to take the batch

doRollback: if the channel's in-memory queue does not have enough space, roll the data back

Take transaction flow

doTake: pull the data into the temporary buffer takeList and send it to HDFS

doCommit: if all of the data is sent successfully, clear the temporary buffer takeList

doRollback: if an exception occurs while sending, roll back by returning the data in takeList to the channel's in-memory queue

Flume Agent internals


Key components:

ChannelSelector

The ChannelSelector decides which Channel(s) an Event will be sent to. There are two types: Replicating and Multiplexing.

The ReplicatingSelector sends the same Event to every Channel, while the Multiplexing selector routes different Events to different Channels according to the configured rules.

SinkProcessor

There are three types of SinkProcessor: DefaultSinkProcessor, LoadBalancingSinkProcessor, and FailoverSinkProcessor.

DefaultSinkProcessor works with a single Sink, whereas LoadBalancingSinkProcessor and FailoverSinkProcessor work with a Sink Group: LoadBalancingSinkProcessor provides load balancing and FailoverSinkProcessor provides failover (error recovery).
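
For reference only (hypothetical agent name a1, not one of the examples in this article), the two mechanisms are configured roughly like this: the multiplexing selector routes events by the value of an Event header, and the load_balance processor spreads events across a sink group.

# multiplexing channel selector: route by the value of the "type" header
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.error = c1
a1.sources.r1.selector.mapping.info = c2
a1.sources.r1.selector.default = c2

# load-balancing sink processor over a sink group
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = round_robin

The replicating selector and the failover processor are demonstrated concretely in usage examples 5 and 6 below.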

Flume topologies

Simple chaining

In this mode, multiple Flume agents are connected in sequence, from the first source all the way to the storage system that the final sink delivers to. Chaining too many Flume agents is not recommended: it reduces throughput, and if any agent in the chain goes down, the whole pipeline is affected.


Replication and multiplexing

Flume can fan an event stream out to one or more destinations. In this mode, the same data can be replicated into multiple channels, or different data can be routed to different channels, and each sink can then deliver to a different destination.


Load balancing and failover

Flume can group multiple sinks into one logical sink group; combined with the appropriate SinkProcessor, the group provides load balancing or failover.


Aggregation

This is the most common and most practical mode. Web applications are typically spread across hundreds, sometimes thousands or tens of thousands, of servers, and the logs they produce are painful to handle individually. This Flume topology solves the problem neatly: every server runs a Flume agent that collects its local logs and forwards them to a central collecting Flume agent, which then uploads everything to HDFS, Hive, HBase, etc. for analysis.

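A minimal sketch of this aggregation pattern (hypothetical agent and host names: web-server agents a1 and a collector agent a2 on a host called "collector"; none of these belong to the environment used in this article):

# on every web server: forward the locally collected events over avro
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = collector
a1.sinks.k1.port = 4141

# on the central collector: one avro source receives from all web servers
a2.sources.r1.type = avro
a2.sources.r1.bind = collector
a2.sources.r1.port = 4141
# and a single sink (for example hdfs) writes out the aggregated stream
a2.sinks.k1.type = hdfs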

Flume usage example 5

Scenario: replication and multiplexing

Requirements

Flume-1 monitors file changes and passes the changes to Flume-2, which stores them on HDFS; at the same time Flume-1 passes the changes to Flume-3, which writes them to the local file system. In total, three agents work together.


Preparation: write the configurations

Write the configuration /opt/module/flume/datas/05-flume-file-flume.conf

[Note] Adjust the monitored log path /opt/module/hive/logs/hiveServer2.log to your own environment.

[email protected]:/opt/module/flume/datas >vim 05-flume-file-flume.conf
# Name the components on this agent
bigdata01.sources = r1
bigdata01.sinks = k1 k2
bigdata01.channels = c1 c2
# 将資料流複制給所有channel
bigdata01.sources.r1.selector.type = replicating

# Describe/configure the source
bigdata01.sources.r1.type = exec
bigdata01.sources.r1.command = tail -F /opt/module/hive/logs/hiveServer2.log
bigdata01.sources.r1.shell = /bin/bash -c

# Describe the sink
# On the sink side, avro acts as a data sender
bigdata01.sinks.k1.type = avro
bigdata01.sinks.k1.hostname = ops01 
bigdata01.sinks.k1.port = 44441

bigdata01.sinks.k2.type = avro
bigdata01.sinks.k2.hostname = ops01
bigdata01.sinks.k2.port = 44442

# Describe the channel
bigdata01.channels.c1.type = memory
bigdata01.channels.c1.capacity = 1000
bigdata01.channels.c1.transactionCapacity = 100

bigdata01.channels.c2.type = memory
bigdata01.channels.c2.capacity = 1000
bigdata01.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
bigdata01.sources.r1.channels = c1 c2
bigdata01.sinks.k1.channel = c1
bigdata01.sinks.k2.channel = c2
           

Write the configuration /opt/module/flume/datas/05-flume-flume-hdfs.conf

[Note] bigdata02.sources.r1.port = 44441 must match bigdata01.sinks.k1.port = 44441 above.

# Name the components on this agent
bigdata02.sources = r1
bigdata02.sinks = k1
bigdata02.channels = c1

# Describe/configure the source
# On the source side, avro acts as a data receiving service
bigdata02.sources.r1.type = avro
bigdata02.sources.r1.bind = ops01
bigdata02.sources.r1.port = 44441

# Describe the sink
bigdata02.sinks.k1.type = hdfs
bigdata02.sinks.k1.hdfs.path = hdfs://ops01:8020/flume/%Y%m%d/%H
# Prefix of the files uploaded to HDFS
bigdata02.sinks.k1.hdfs.filePrefix = flume-
# Whether to roll folders based on time
bigdata02.sinks.k1.hdfs.round = true
# How many time units before creating a new folder
bigdata02.sinks.k1.hdfs.roundValue = 1
# The time unit used above
bigdata02.sinks.k1.hdfs.roundUnit = hour
# Whether to use the local timestamp
bigdata02.sinks.k1.hdfs.useLocalTimeStamp = true
# How many events to accumulate before flushing to HDFS
bigdata02.sinks.k1.hdfs.batchSize = 100
# File type; compression is supported
bigdata02.sinks.k1.hdfs.fileType = DataStream
# How many seconds before rolling to a new file
bigdata02.sinks.k1.hdfs.rollInterval = 600
# Roll the file at roughly 128 MB
bigdata02.sinks.k1.hdfs.rollSize = 134217700
# File rolling is independent of the number of events
bigdata02.sinks.k1.hdfs.rollCount = 0

# Describe the channel
bigdata02.channels.c1.type = memory
bigdata02.channels.c1.capacity = 1000
bigdata02.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
bigdata02.sources.r1.channels = c1
bigdata02.sinks.k1.channel = c1
           

Write the configuration /opt/module/flume/datas/05-flume-flume-dir.conf

[Note] bigdata03.sources.r1.port = 44442 must match bigdata01.sinks.k2.port = 44442 above.

[Note] The /opt/module/flume/job directory must be created with mkdir beforehand; the agent task will not automatically create the directory referenced in the configuration.
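
For example, create it once before starting the agents:

mkdir -p /opt/module/flume/job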

# Name the components on this agent
bigdata03.sources = r1
bigdata03.sinks = k1
bigdata03.channels = c2

# Describe/configure the source
bigdata03.sources.r1.type = avro
bigdata03.sources.r1.bind = ops01
bigdata03.sources.r1.port = 44442

# Describe the sink
bigdata03.sinks.k1.type = file_roll
bigdata03.sinks.k1.sink.directory = /opt/module/flume/job

# Describe the channel
bigdata03.channels.c2.type = memory
bigdata03.channels.c2.capacity = 1000
bigdata03.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
bigdata03.sources.r1.channels = c2
bigdata03.sinks.k1.channel = c2
           

Start the agents

[Note] Several agents need to be started: open multiple terminal sessions and keep every agent running.

agent-1

flume-ng agent --conf conf/ --name bigdata03 --conf-file datas/05-flume-flume-dir.conf

[email protected]:/opt/module/flume >flume-ng agent --conf conf/ --name bigdata03 --conf-file datas/05-flume-flume-dir.conf
Info: Including Hadoop libraries found via (/opt/module/hadoop-3.1.3/bin/hadoop) for HDFS access
Info: Including Hive libraries found via (/opt/module/hive) for Hive access
+ exec /usr/jdk1.8.0_131/bin/java -Xmx20m -cp '/opt/module/flume/conf:/opt/module/flume/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/common/*:/opt/module/hadoop-3.1.3/share/hadoop/hdfs:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/*:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/*:/opt/module/hadoop-3.1.3/share/hadoop/yarn:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/yarn/*:/opt/module/hadoop-3.1.3/etc/hadoop:/opt/module/tez/*:/opt/module/tez/lib/*:/opt/module/hive/lib/*' -Djava.library.path=:/opt/module/hadoop-3.1.3/lib/native org.apache.flume.node.Application --name bigdata03 --conf-file datas/05-flume-flume-dir.conf
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/flume/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
           

agent-2

flume-ng agent --conf conf/ --name bigdata02 --conf-file datas/05-flume-flume-hdfs.conf

[email protected]:/opt/module/flume >flume-ng agent --conf conf/ --name bigdata02 --conf-file datas/05-flume-flume-hdfs.conf
Info: Including Hadoop libraries found via (/opt/module/hadoop-3.1.3/bin/hadoop) for HDFS access
Info: Including Hive libraries found via (/opt/module/hive) for Hive access
+ exec /usr/jdk1.8.0_131/bin/java -Xmx20m -cp '/opt/module/flume/conf:/opt/module/flume/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/common/*:/opt/module/hadoop-3.1.3/share/hadoop/hdfs:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/*:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/*:/opt/module/hadoop-3.1.3/share/hadoop/yarn:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/yarn/*:/opt/module/hadoop-3.1.3/etc/hadoop:/opt/module/tez/*:/opt/module/tez/lib/*:/opt/module/hive/lib/*' -Djava.library.path=:/opt/module/hadoop-3.1.3/lib/native org.apache.flume.node.Application --name bigdata02 --conf-file datas/05-flume-flume-hdfs.conf
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/flume/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
           

agent-3

flume-ng agent --conf conf/ --name bigdata01 --conf-file datas/05-flume-file-flume.conf

[email protected]:/opt/module/flume >flume-ng agent --conf conf/ --name bigdata01 --conf-file datas/05-flume-file-flume.conf
Info: Including Hadoop libraries found via (/opt/module/hadoop-3.1.3/bin/hadoop) for HDFS access
Info: Including Hive libraries found via (/opt/module/hive) for Hive access
+ exec /usr/jdk1.8.0_131/bin/java -Xmx20m -cp '/opt/module/flume/conf:/opt/module/flume/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/common/*:/opt/module/hadoop-3.1.3/share/hadoop/hdfs:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/*:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/*:/opt/module/hadoop-3.1.3/share/hadoop/yarn:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/yarn/*:/opt/module/hadoop-3.1.3/etc/hadoop:/opt/module/tez/*:/opt/module/tez/lib/*:/opt/module/hive/lib/*' -Djava.library.path=:/opt/module/hadoop-3.1.3/lib/native org.apache.flume.node.Application --name bigdata01 --conf-file datas/05-flume-file-flume.conf
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/flume/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
           
# List the configurations created so far
[email protected]:/opt/module/flume/datas >ll
total 28
-rw-rw-r-- 1 wangting wangting 1057 Apr 24 17:01 05-flume-file-flume.conf
-rw-rw-r-- 1 wangting wangting  609 Apr 24 17:03 05-flume-flume-dir.conf
-rw-rw-r-- 1 wangting wangting 1437 Apr 24 17:02 05-flume-flume-hdfs.conf
-rw-rw-r-- 1 wangting wangting 1533 Apr 24 14:05 flume-dir-hdfs.conf
-rw-rw-r-- 1 wangting wangting 1405 Apr 23 11:13 flume-file-hdfs.conf
-rw-rw-r-- 1 wangting wangting 1526 Apr 24 14:59 flume-taildir-hdfs.conf
-rw-rw-r-- 1 wangting wangting  787 Apr 17 15:58 netcatsource_loggersink.conf
# Check that the /opt/module/flume/job directory has been created
[email protected]:/opt/module/flume/datas >ll /opt/module/flume/job
total 0
           

Scenario test

Run some operations in Hive to generate log output

# Enter the Hive command line
[email protected]:/opt/module/hive/conf >beeline -u jdbc:hive2://ops01:10000 -n wangting
Connecting to jdbc:hive2://ops01:10000
Connected to: Apache Hive (version 3.1.2)
Driver: Hive JDBC (version 3.1.2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 3.1.2 by Apache Hive
# Query an arbitrary table
0: jdbc:hive2://ops01:10000> select * from emp;
INFO  : Compiling command(queryId=wangting_20210424170631_54655d42-8542-49fb-ac6f-03ed302e4c02): select * from emp
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Semantic Analysis Completed (retrial = false)
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:emp.empno, type:int, comment:null), FieldSchema(name:emp.ename, type:string, comment:null), FieldSchema(name:emp.job, type:string, comment:null), FieldSchema(name:emp.mgr, type:int, comment:null), FieldSchema(name:emp.hiredate, type:string, comment:null), FieldSchema(name:emp.sal, type:double, comment:null), FieldSchema(name:emp.comm, type:double, comment:null), FieldSchema(name:emp.deptno, type:int, comment:null)], properties:null)
INFO  : Completed compiling command(queryId=wangting_20210424170631_54655d42-8542-49fb-ac6f-03ed302e4c02); Time taken: 0.134 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=wangting_20210424170631_54655d42-8542-49fb-ac6f-03ed302e4c02): select * from emp
INFO  : Completed executing command(queryId=wangting_20210424170631_54655d42-8542-49fb-ac6f-03ed302e4c02); Time taken: 0.0 seconds
INFO  : OK
INFO  : Concurrency mode is disabled, not creating a lock manager
+------------+------------+------------+----------+---------------+----------+-----------+-------------+
| emp.empno  | emp.ename  |  emp.job   | emp.mgr  | emp.hiredate  | emp.sal  | emp.comm  | emp.deptno  |
+------------+------------+------------+----------+---------------+----------+-----------+-------------+
| 7369       | SMITH      | CLERK      | 7902     | 1980-12-17    | 800.0    | NULL      | 20          |
| 7499       | ALLEN      | SALESMAN   | 7698     | 1981-2-20     | 1600.0   | 300.0     | 30          |
| 7521       | WARD       | SALESMAN   | 7698     | 1981-2-22     | 1250.0   | 500.0     | 30          |
| 7566       | JONES      | MANAGER    | 7839     | 1981-4-2      | 2975.0   | NULL      | 20          |
| 7654       | MARTIN     | SALESMAN   | 7698     | 1981-9-28     | 1250.0   | 1400.0    | 30          |
| 7698       | BLAKE      | MANAGER    | 7839     | 1981-5-1      | 2850.0   | NULL      | 30          |
| 7782       | CLARK      | MANAGER    | 7839     | 1981-6-9      | 2450.0   | NULL      | 10          |
| 7788       | SCOTT      | ANALYST    | 7566     | 1987-4-19     | 3000.0   | NULL      | 20          |
| 7839       | KING       | PRESIDENT  | NULL     | 1981-11-17    | 5000.0   | NULL      | 10          |
| 7844       | TURNER     | SALESMAN   | 7698     | 1981-9-8      | 1500.0   | 0.0       | 30          |
| 7876       | ADAMS      | CLERK      | 7788     | 1987-5-23     | 1100.0   | NULL      | 20          |
| 7900       | JAMES      | CLERK      | 7698     | 1981-12-3     | 950.0    | NULL      | 30          |
| 7902       | FORD       | ANALYST    | 7566     | 1981-12-3     | 3000.0   | NULL      | 20          |
| 7934       | MILLER     | CLERK      | 7782     | 1982-1-23     | 1300.0   | NULL      | 10          |
+------------+------------+------------+----------+---------------+----------+-----------+-------------+
14 rows selected (0.247 seconds)
# Run a query that triggers MapReduce
0: jdbc:hive2://ops01:10000> select count(*) from emp;
INFO  : Compiling command(queryId=wangting_20210424170653_1addfc8b-e2ed-4ba5-bb06-22b2573d30c6): select count(*) from emp
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Semantic Analysis Completed (retrial = false)
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:_c0, type:bigint, comment:null)], properties:null)
INFO  : Completed compiling command(queryId=wangting_20210424170653_1addfc8b-e2ed-4ba5-bb06-22b2573d30c6); Time taken: 0.117 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=wangting_20210424170653_1addfc8b-e2ed-4ba5-bb06-22b2573d30c6): select count(*) from emp
WARN  : Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
INFO  : Query ID = wangting_20210424170653_1addfc8b-e2ed-4ba5-bb06-22b2573d30c6
INFO  : Total jobs = 1
INFO  : Launching Job 1 out of 1
INFO  : Starting task [Stage-1:MAPRED] in serial mode
INFO  : Number of reduce tasks determined at compile time: 1
INFO  : In order to change the average load for a reducer (in bytes):
INFO  :   set hive.exec.reducers.bytes.per.reducer=<number>
INFO  : In order to limit the maximum number of reducers:
INFO  :   set hive.exec.reducers.max=<number>
INFO  : In order to set a constant number of reducers:
INFO  :   set mapreduce.job.reduces=<number>
INFO  : number of splits:1
INFO  : Submitting tokens for job: job_1615531413182_0102
INFO  : Executing with tokens: []
INFO  : The url to track the job: http://ops02:8088/proxy/application_1615531413182_0102/
INFO  : Starting Job = job_1615531413182_0102, Tracking URL = http://ops02:8088/proxy/application_1615531413182_0102/
INFO  : Kill Command = /opt/module/hadoop-3.1.3/bin/mapred job  -kill job_1615531413182_0102
INFO  : Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
INFO  : 2021-04-24 17:07:01,947 Stage-1 map = 0%,  reduce = 0%
INFO  : 2021-04-24 17:07:09,112 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.22 sec
INFO  : 2021-04-24 17:07:16,256 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 5.26 sec
INFO  : MapReduce Total cumulative CPU time: 5 seconds 260 msec
INFO  : Ended Job = job_1615531413182_0102
INFO  : MapReduce Jobs Launched: 
INFO  : Stage-Stage-1: Map: 1  Reduce: 1   Cumulative CPU: 5.26 sec   HDFS Read: 14007 HDFS Write: 102 SUCCESS
INFO  : Total MapReduce CPU Time Spent: 5 seconds 260 msec
INFO  : Completed executing command(queryId=wangting_20210424170653_1addfc8b-e2ed-4ba5-bb06-22b2573d30c6); Time taken: 23.796 seconds
INFO  : OK
INFO  : Concurrency mode is disabled, not creating a lock manager
+------+
| _c0  |
+------+
| 14   |
+------+
1 row selected (23.933 seconds)
# Query a non-existent table to trigger an error
0: jdbc:hive2://ops01:10000> select count(*) from emppppppppppppppp;
Error: Error while compiling statement: FAILED: SemanticException [Error 10001]: Line 1:21 Table not found 'emppppppppppppppp' (state=42S02,code=10001)
0: jdbc:hive2://ops01:10000> select count(*) from emppppppppppppppp;
Error: Error while compiling statement: FAILED: SemanticException [Error 10001]: Line 1:21 Table not found 'emppppppppppppppp' (state=42S02,code=10001)

           

Verify the local-file path (05-flume-flume-dir.conf): the corresponding files are generated in the /opt/module/flume/job directory. Note that the file_roll sink rolls to a new local file every 30 seconds by default, which is why some of the generated files are empty or very small.

[email protected]:/opt/module/flume/job >ll
total 16
-rw-rw-r-- 1 wangting wangting   0 Apr 24 17:05 1619255120463-1
-rw-rw-r-- 1 wangting wangting 575 Apr 24 17:05 1619255120463-2
-rw-rw-r-- 1 wangting wangting   3 Apr 24 17:06 1619255120463-3
-rw-rw-r-- 1 wangting wangting 863 Apr 24 17:07 1619255120463-4
-rw-rw-r-- 1 wangting wangting 445 Apr 24 17:07 1619255120463-5

[email protected]:/opt/module/flume/job >cat 1619255120463-2
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2021-04-24 16:18:20,663 Stage-1 map = 0%,  reduce = 0%
2021-04-24 16:18:25,827 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 3.14 sec
2021-04-24 16:18:31,975 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 6.19 sec
MapReduce Total cumulative CPU time: 6 seconds 190 msec
Ended Job = job_1615531413182_0101
MapReduce Jobs Launched: 
Stage-Stage-1: Map: 1  Reduce: 1   Cumulative CPU: 6.19 sec   HDFS Read: 14007 HDFS Write: 102 SUCCESS
Total MapReduce CPU Time Spent: 6 seconds 190 msec
OK
[email protected]:/opt/module/flume/job >cat 1619255120463-3
OK
[email protected]:/opt/module/flume/job >cat 1619255120463-4
Query ID = wangting_20210424170653_1addfc8b-e2ed-4ba5-bb06-22b2573d30c6
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1615531413182_0102, Tracking URL = http://ops02:8088/proxy/application_1615531413182_0102/
Kill Command = /opt/module/hadoop-3.1.3/bin/mapred job  -kill job_1615531413182_0102
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2021-04-24 17:07:01,947 Stage-1 map = 0%,  reduce = 0%
2021-04-24 17:07:09,112 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.22 sec
[email protected]:/opt/module/flume/job >cat 1619255120463-5
2021-04-24 17:07:16,256 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 5.26 sec
MapReduce Total cumulative CPU time: 5 seconds 260 msec
Ended Job = job_1615531413182_0102
MapReduce Jobs Launched: 
Stage-Stage-1: Map: 1  Reduce: 1   Cumulative CPU: 5.26 sec   HDFS Read: 14007 HDFS Write: 102 SUCCESS
Total MapReduce CPU Time Spent: 5 seconds 260 msec
OK
FAILED: SemanticException [Error 10001]: Line 1:21 Table not found 'emppppppppppppppp'
FAILED: SemanticException [Error 10001]: Line 1:21 Table not found 'emppppppppppppppp'
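
If fewer, larger local files are preferred, the file_roll sink's roll interval (30 seconds by default) can be raised in 05-flume-flume-dir.conf, for example:

# roll to a new local file every 600 seconds instead of the default 30
bigdata03.sinks.k1.sink.rollInterval = 600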

           

Verify the HDFS path: the corresponding Hive log content has also been written to HDFS.

# List the corresponding directory
[email protected]:/opt/module/flume/job >hdfs dfs -ls /flume/20210424/17
2021-04-24 17:12:46,127 INFO  [main] Configuration.deprecation (Configuration.java:logDeprecation(1395)) - No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
Found 1 items
-rw-r--r--   3 wangting supergroup        575 2021-04-24 17:05 /flume/20210424/17/flume-.1619255148002.tmp
# View the content written to the generated file
[email protected]:/opt/module/flume/job >hdfs dfs -cat /flume/20210424/17/flume-.1619255148002.tmp
2021-04-24 17:13:17,766 INFO  [main] Configuration.deprecation (Configuration.java:logDeprecation(1395)) - No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
2021-04-24 17:13:18,648 INFO  [main] sasl.SaslDataTransferClient (SaslDataTransferClient.java:checkTrustAndSend(239)) - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2021-04-24 16:18:20,663 Stage-1 map = 0%,  reduce = 0%
2021-04-24 16:18:25,827 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 3.14 sec
2021-04-24 16:18:31,975 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 6.19 sec
MapReduce Total cumulative CPU time: 6 seconds 190 msec
Ended Job = job_1615531413182_0101
MapReduce Jobs Launched: 
Stage-Stage-1: Map: 1  Reduce: 1   Cumulative CPU: 6.19 sec   HDFS Read: 14007 HDFS Write: 102 SUCCESS
Total MapReduce CPU Time Spent: 6 seconds 190 msec
OK
OK
Query ID = wangting_20210424170653_1addfc8b-e2ed-4ba5-bb06-22b2573d30c6
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1615531413182_0102, Tracking URL = http://ops02:8088/proxy/application_1615531413182_0102/
Kill Command = /opt/module/hadoop-3.1.3/bin/mapred job  -kill job_1615531413182_0102
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2021-04-24 17:07:01,947 Stage-1 map = 0%,  reduce = 0%
2021-04-24 17:07:09,112 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.22 sec
2021-04-24 17:07:16,256 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 5.26 sec
MapReduce Total cumulative CPU time: 5 seconds 260 msec
Ended Job = job_1615531413182_0102
MapReduce Jobs Launched: 
Stage-Stage-1: Map: 1  Reduce: 1   Cumulative CPU: 5.26 sec   HDFS Read: 14007 HDFS Write: 102 SUCCESS
Total MapReduce CPU Time Spent: 5 seconds 260 msec
OK
FAILED: SemanticException [Error 10001]: Line 1:21 Table not found 'emppppppppppppppp'
FAILED: SemanticException [Error 10001]: Line 1:21 Table not found 'emppppppppppppppp'

           

Flume usage example 6

Scenario: load balancing and failover

Requirements

Flume1 monitors a port; the sinks in its sink group connect to Flume2 and Flume3 respectively. Using the FailoverSinkProcessor, implement failover.


Preparation: write the configurations

Three configuration files are needed.

Write the configuration /opt/module/flume/datas/06-flume-netcat-flume.conf

[Note]:

The source uses netcat to simulate the input, which is intuitive and easy to follow.

Two sinks are defined; the sources of the two downstream agents will receive their data.

[email protected]:/opt/module/flume/datas >vim 06-flume-netcat-flume.conf
# Name the components on this agent
bigdata01.sources = r1
bigdata01.channels = c1
bigdata01.sinkgroups = g1
bigdata01.sinks = k1 k2

# Describe/configure the source
bigdata01.sources.r1.type = netcat
bigdata01.sources.r1.bind = localhost
bigdata01.sources.r1.port = 44444

bigdata01.sinkgroups.g1.processor.type = failover
bigdata01.sinkgroups.g1.processor.priority.k1 = 5
bigdata01.sinkgroups.g1.processor.priority.k2 = 10
bigdata01.sinkgroups.g1.processor.maxpenalty = 10000

# Describe the sink
bigdata01.sinks.k1.type = avro
bigdata01.sinks.k1.hostname = ops01
bigdata01.sinks.k1.port = 44441

bigdata01.sinks.k2.type = avro
bigdata01.sinks.k2.hostname = ops01
bigdata01.sinks.k2.port = 44442

# Describe the channel
bigdata01.channels.c1.type = memory
bigdata01.channels.c1.capacity = 1000
bigdata01.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
bigdata01.sources.r1.channels = c1
bigdata01.sinkgroups.g1.sinks = k1 k2
bigdata01.sinks.k1.channel = c1
bigdata01.sinks.k2.channel = c1
           

Write the configuration /opt/module/flume/datas/06-flume-flume-console1.conf

[email protected]:/opt/module/flume/datas >vim 06-flume-flume-console1.conf
# Name the components on this agent
bigdata02.sources = r1
bigdata02.sinks = k1
bigdata02.channels = c1

# Describe/configure the source
bigdata02.sources.r1.type = avro
bigdata02.sources.r1.bind = ops01
bigdata02.sources.r1.port = 44441

# Describe the sink
bigdata02.sinks.k1.type = logger

# Describe the channel
bigdata02.channels.c1.type = memory
bigdata02.channels.c1.capacity = 1000
bigdata02.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
bigdata02.sources.r1.channels = c1
bigdata02.sinks.k1.channel = c1
           

Write the configuration /opt/module/flume/datas/06-flume-flume-console2.conf

[email protected]:/opt/module/flume/datas >vim 06-flume-flume-console2.conf
# Name the components on this agent
bigdata03.sources = r1
bigdata03.sinks = k1
bigdata03.channels = c2

# Describe/configure the source
bigdata03.sources.r1.type = avro
bigdata03.sources.r1.bind = ops01
bigdata03.sources.r1.port = 44442

# Describe the sink
bigdata03.sinks.k1.type = logger

# Describe the channel
bigdata03.channels.c2.type = memory
bigdata03.channels.c2.capacity = 1000
bigdata03.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
bigdata03.sources.r1.channels = c2
bigdata03.sinks.k1.channel = c2
           

[Note]:

bigdata01 -> listens on port 44444 for the data sent by netcat;

bigdata02 -> listens on port 44441 for the data sent by bigdata01

bigdata03 -> listens on port 44442 for the data sent by bigdata01

Only one of bigdata02 and bigdata03 receives the data at a time; only when the receiving agent fails does the other take over. Because k2 (port 44442, i.e. bigdata03) is configured with the higher priority (10 vs 5), bigdata03 receives first. The goal is to test whether delivery automatically fails over to the other port and continues to be received.

Start the agents

[Note]: open multiple terminal sessions and keep every agent running.

agent-bigdata03

flume-ng agent --conf conf/ --name bigdata03 --conf-file datas/06-flume-flume-console2.conf -Dflume.root.logger=INFO,console

[email protected]:/opt/module/flume >flume-ng agent --conf conf/ --name bigdata03 --conf-file datas/06-flume-flume-console2.conf -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/opt/module/hadoop-3.1.3/bin/hadoop) for HDFS access
Info: Including Hive libraries found via (/opt/module/hive) for Hive access
+ exec /usr/jdk1.8.0_131/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/opt/module/flume/conf:/opt/module/flume/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/common/*:/opt/module/hadoop-3.1.3/share/hadoop/hdfs:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/*:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/*:/opt/module/hadoop-3.1.3/share/hadoop/yarn:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/yarn/*:/opt/module/hadoop-3.1.3/etc/hadoop:/opt/module/tez/*:/opt/module/tez/lib/*:/opt/module/hive/lib/*' -Djava.library.path=:/opt/module/hadoop-3.1.3/lib/native org.apache.flume.node.Application --name bigdata03 --conf-file datas/06-flume-flume-console2.conf
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/flume/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2021-04-25 09:42:05,310 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:62)] Configuration provider starting
2021-04-25 09:42:05,315 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:138)] Reloading configuration file:datas/06-flume-flume-console2.conf
2021-04-25 09:42:05,322 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:r1
2021-04-25 09:42:05,323 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:r1
2021-04-25 09:42:05,323 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:r1
2021-04-25 09:42:05,323 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:k1
2021-04-25 09:42:05,324 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1117)] Added sinks: k1 Agent: bigdata03
2021-04-25 09:42:05,324 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:k1
2021-04-25 09:42:05,324 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:c2
2021-04-25 09:42:05,324 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:c2
2021-04-25 09:42:05,324 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:c2
2021-04-25 09:42:05,324 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:r1
2021-04-25 09:42:05,325 (conf-file-poller-0) [WARN - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateConfigFilterSet(FlumeConfiguration.java:623)] Agent configuration for 'bigdata03' has no configfilters.
2021-04-25 09:42:05,345 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:163)] Post-validation flume configuration contains configuration for agents: [bigdata03]
2021-04-25 09:42:05,345 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:151)] Creating channels
2021-04-25 09:42:05,352 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel c2 type memory
2021-04-25 09:42:05,360 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:205)] Created channel c2
2021-04-25 09:42:05,362 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source r1, type avro
2021-04-25 09:42:05,374 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: k1, type: logger
2021-04-25 09:42:05,377 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:120)] Channel c2 connected to [r1, k1]
2021-04-25 09:42:05,381 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:162)] Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:Avro source r1: { bindAddress: ops01, port: 44442 } }} sinkRunners:{k1=SinkRunner: { policy:o[email protected] counterGroup:{ name:null counters:{} } }} channels:{c2=org.apache.flume.channel.MemoryChannel{name: c2}} }
2021-04-25 09:42:05,383 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:169)] Starting Channel c2
2021-04-25 09:42:05,441 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c2: Successfully registered new MBean.
2021-04-25 09:42:05,441 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c2 started
2021-04-25 09:42:05,441 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1
2021-04-25 09:42:05,442 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1
2021-04-25 09:42:05,442 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:193)] Starting Avro source r1: { bindAddress: ops01, port: 44442 }...
2021-04-25 09:42:05,776 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2021-04-25 09:42:05,776 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r1 started
2021-04-25 09:42:05,778 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:219)] Avro source r1 started.
2021-04-25 09:43:33,977 (New I/O server boss #17) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x168ca9eb, /11.8.37.50:50586 => /11.8.37.50:44442] OPEN
2021-04-25 09:43:33,978 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x168ca9eb, /11.8.37.50:50586 => /11.8.37.50:44442] BOUND: /11.8.37.50:44442
2021-04-25 09:43:33,978 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x168ca9eb, /11.8.37.50:50586 => /11.8.37.50:44442] CONNECTED: /11.8.37.50:50586
           

agent-bigdata02

flume-ng agent --conf conf/ --name bigdata02 --conf-file datas/06-flume-flume-console1.conf -Dflume.root.logger=INFO,console

[email protected]:/opt/module/flume >flume-ng agent --conf conf/ --name bigdata02 --conf-file datas/06-flume-flume-console1.conf -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/opt/module/hadoop-3.1.3/bin/hadoop) for HDFS access
Info: Including Hive libraries found via (/opt/module/hive) for Hive access
+ exec /usr/jdk1.8.0_131/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/opt/module/flume/conf:/opt/module/flume/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/common/*:/opt/module/hadoop-3.1.3/share/hadoop/hdfs:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/*:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/*:/opt/module/hadoop-3.1.3/share/hadoop/yarn:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/yarn/*:/opt/module/hadoop-3.1.3/etc/hadoop:/opt/module/tez/*:/opt/module/tez/lib/*:/opt/module/hive/lib/*' -Djava.library.path=:/opt/module/hadoop-3.1.3/lib/native org.apache.flume.node.Application --name bigdata02 --conf-file datas/06-flume-flume-console1.conf
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/flume/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2021-04-25 09:42:28,068 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:62)] Configuration provider starting
2021-04-25 09:42:28,074 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:138)] Reloading configuration file:datas/06-flume-flume-console1.conf
2021-04-25 09:42:28,081 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:r1
2021-04-25 09:42:28,082 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:r1
2021-04-25 09:42:28,082 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:c1
2021-04-25 09:42:28,082 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:k1
2021-04-25 09:42:28,082 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:c1
2021-04-25 09:42:28,082 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:c1
2021-04-25 09:42:28,083 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:r1
2021-04-25 09:42:28,083 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:k1
2021-04-25 09:42:28,083 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1117)] Added sinks: k1 Agent: bigdata02
2021-04-25 09:42:28,083 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:r1
2021-04-25 09:42:28,083 (conf-file-poller-0) [WARN - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateConfigFilterSet(FlumeConfiguration.java:623)] Agent configuration for 'bigdata02' has no configfilters.
2021-04-25 09:42:28,104 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:163)] Post-validation flume configuration contains configuration for agents: [bigdata02]
2021-04-25 09:42:28,105 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:151)] Creating channels
2021-04-25 09:42:28,113 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel c1 type memory
2021-04-25 09:42:28,121 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:205)] Created channel c1
2021-04-25 09:42:28,123 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source r1, type avro
2021-04-25 09:42:28,135 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: k1, type: logger
2021-04-25 09:42:28,138 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:120)] Channel c1 connected to [r1, k1]
2021-04-25 09:42:28,142 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:162)] Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:Avro source r1: { bindAddress: ops01, port: 44441 } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSin[email protected] counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
2021-04-25 09:42:28,143 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:169)] Starting Channel c1
2021-04-25 09:42:28,201 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2021-04-25 09:42:28,202 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2021-04-25 09:42:28,202 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1
2021-04-25 09:42:28,202 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1
2021-04-25 09:42:28,203 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:193)] Starting Avro source r1: { bindAddress: ops01, port: 44441 }...
2021-04-25 09:42:28,544 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2021-04-25 09:42:28,544 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r1 started
2021-04-25 09:42:28,546 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:219)] Avro source r1 started.
2021-04-25 09:43:33,777 (New I/O server boss #17) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xc58d6c06, /11.8.37.50:45068 => /11.8.37.50:44441] OPEN
2021-04-25 09:43:33,778 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xc58d6c06, /11.8.37.50:45068 => /11.8.37.50:44441] BOUND: /11.8.37.50:44441
2021-04-25 09:43:33,778 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xc58d6c06, /11.8.37.50:45068 => /11.8.37.50:44441] CONNECTED: /11.8.37.50:45068
           

agent-bigdata01

flume-ng agent --conf conf/ --name bigdata01 --conf-file datas/06-flume-netcat-flume.conf

[email protected]:/opt/module/flume >flume-ng agent --conf conf/ --name bigdata01 --conf-file datas/06-flume-netcat-flume.conf
Info: Including Hadoop libraries found via (/opt/module/hadoop-3.1.3/bin/hadoop) for HDFS access
Info: Including Hive libraries found via (/opt/module/hive) for Hive access
+ exec /usr/jdk1.8.0_131/bin/java -Xmx20m -cp '/opt/module/flume/conf:/opt/module/flume/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/common/*:/opt/module/hadoop-3.1.3/share/hadoop/hdfs:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/*:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/*:/opt/module/hadoop-3.1.3/share/hadoop/yarn:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/yarn/*:/opt/module/hadoop-3.1.3/etc/hadoop:/opt/module/tez/*:/opt/module/tez/lib/*:/opt/module/hive/lib/*' -Djava.library.path=:/opt/module/hadoop-3.1.3/lib/native org.apache.flume.node.Application --name bigdata01 --conf-file datas/06-flume-netcat-flume.conf
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/flume/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
           

Scenario experiment

# Check that port 44444 is listening, i.e. the bigdata01 agent is up
[email protected]:/opt/module/flume >netstat -tnlpu|grep 44444
(Not all processes could be identified, non-owned process info
 will not be shown, you would have to be root to see it all.)
tcp6       0      0 127.0.0.1:44444         :::*                    LISTEN      6498/java           
[email protected]:/opt/module/flume >
[email protected]:/opt/module/flume >nc localhost 44444
wang
OK						# After OK appears, check the consoles to see which bigdata agent received the data; here bigdata03 received the Event
Only one rev
OK						# Keep sending messages; bigdata03 still receives the Events
now is 44442
OK						# While the bigdata03 agent runs normally it keeps receiving
44442 bigdata03			
OK						# After sending this "44442 bigdata03" message, stop the bigdata03 agent
new info
OK						# Check whether "new info" is received by bigdata02
now is 44441
OK						# Verified: the failover succeeded
44441 bigdata02
OK

           

bigdata03 (port 44442) console:

2021-04-25 09:45:23,459 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 77 61 6E 67                                     wang }
2021-04-25 09:47:54,107 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 4F 6E 6C 79 20 6F 6E 65 20 72 65 76             Only one rev }
2021-04-25 09:48:40,111 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 6E 6F 77 20 69 73 20 34 34 34 34 32             now is 44442 }
2021-04-25 09:49:57,121 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 34 34 34 34 32 20 62 69 67 64 61 74 61 30 33    44442 bigdata03 }
stopping		# stop bigdata03
[email protected]:/opt/module/flume >netstat -tnlpu|grep 44442
(Not all processes could be identified, non-owned process info
 will not be shown, you would have to be root to see it all.)
[email protected]:/opt/module/flume >netstat -tnlpu|grep 44441
(Not all processes could be identified, non-owned process info
 will not be shown, you would have to be root to see it all.)
tcp6       0      0 11.8.37.50:44441        :::*                    LISTEN      5988/java           
[email protected]:/opt/module/flume >
# Now only port 44441 is still listening
           

bigdata02 (port 44441) console:

2021-04-25 09:51:30,235 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 6E 65 77 20 69 6E 66 6F                         new info }
2021-04-25 09:51:46,004 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 6E 6F 77 20 69 73 20 34 34 34 34 31             now is 44441 }
2021-04-25 09:52:24,006 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 34 34 34 34 31 20 62 69 67 64 61 74 61 30 32    44441 bigdata02 }
           

In the end, even with bigdata03 down, bigdata02 receives the data normally, so the failover works as expected.
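To watch the failover with something more precise than the console logs, Flume's built-in JSON reporting can be enabled on the agent that owns the sink group, and each sink's EventDrainSuccessCount can then be polled. This is only a sketch: it assumes port 34545 is free on that machine, and the sink names are whatever the sink group defines.

# Restart the sink-group agent with HTTP JSON monitoring enabled (sketch)
flume-ng agent --conf conf/ --name bigdata01 \
  --conf-file datas/06-flume-netcat-flume.conf \
  -Dflume.monitoring.type=http -Dflume.monitoring.port=34545

# Poll the metrics endpoint; the SINK entries show which sink is actually draining events
curl -s http://localhost:34545/metrics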

Flume use case 7

Scenario: aggregation

Background requirements

  1. Flume-1 on ops01 monitors changes to the file /opt/module/group.log.
  2. Flume-2 on ops02 monitors the data stream on port 44444.
  3. Flume-1 and Flume-2 both send their data to Flume-3 on ops03, which prints the final data to the console.

[Note 1]:

Three machines are needed, all with Flume already deployed; ops01 / ops02 / ops03 refer to the three servers on which Flume has been installed.

[Note 2]:

After Flume is deployed, hostname resolution for all three machines must be configured in /etc/hosts on each of them.

For example: tail -5 /etc/hosts

11.8.37.50 ops01

11.8.36.63 ops02

11.8.36.76 ops03
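As a quick sanity check (a minimal sketch; it assumes getent is available, which it is on most Linux distributions), the following can be run on each of the three servers to confirm all names resolve:

# Verify that all three hostnames resolve on this machine
for h in ops01 ops02 ops03; do
  getent hosts $h || echo "$h does not resolve, check /etc/hosts"
done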

[Note 3]:

Unlike the earlier cases, the configuration files are no longer all on one machine: whichever server an agent will be started on must have its configuration written under the corresponding path on that server, e.g. copied over as sketched below.
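For example, if the three files are drafted on ops01 first, they can be pushed to the other two servers like this (a sketch only; it assumes the same account and working SSH access between the hosts, and that /opt/module/flume/datas already exists on ops02 and ops03):

# Copy each agent's config to the server it will actually run on
scp /opt/module/flume/datas/07-flume2-netcat-flume.conf ops02:/opt/module/flume/datas/
scp /opt/module/flume/datas/07-flume3-flume-logger.conf ops03:/opt/module/flume/datas/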

Preparation: write the configuration files

ops01 /opt/module/flume/datas/

vim 07-flume1-logger-flume.conf

# Name the components on this agent
bigdata01.sources = r1
bigdata01.sinks = k1
bigdata01.channels = c1

# Describe/configure the source
bigdata01.sources.r1.type = exec
bigdata01.sources.r1.command = tail -F /opt/module/group.log
bigdata01.sources.r1.shell = /bin/bash -c

# Describe the sink
bigdata01.sinks.k1.type = avro
bigdata01.sinks.k1.hostname = ops03
bigdata01.sinks.k1.port = 44441

# Describe the channel
bigdata01.channels.c1.type = memory
bigdata01.channels.c1.capacity = 1000
bigdata01.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
bigdata01.sources.r1.channels = c1
bigdata01.sinks.k1.channel = c1
           

ops02 /opt/module/flume/datas/

vim 07-flume2-netcat-flume.conf

# Name the components on this agent
bigdata02.sources = r1
bigdata02.sinks = k1
bigdata02.channels = c1

# Describe/configure the source
bigdata02.sources.r1.type = netcat
bigdata02.sources.r1.bind = ops02
bigdata02.sources.r1.port = 44444

# Describe the sink
bigdata02.sinks.k1.type = avro
bigdata02.sinks.k1.hostname = ops03
bigdata02.sinks.k1.port = 44441

# Use a channel which buffers events in memory
bigdata02.channels.c1.type = memory
bigdata02.channels.c1.capacity = 1000
bigdata02.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
bigdata02.sources.r1.channels = c1
bigdata02.sinks.k1.channel = c1
           

ops03 /opt/module/flume/datas/

vim 07-flume3-flume-logger.conf

# Name the components on this agent
bigdata03.sources = r1
bigdata03.sinks = k1
bigdata03.channels = c1

# Describe/configure the source
bigdata03.sources.r1.type = avro
bigdata03.sources.r1.bind = ops03
bigdata03.sources.r1.port = 44441

# Describe the sink
bigdata03.sinks.k1.type = logger

# Describe the channel
bigdata03.channels.c1.type = memory
bigdata03.channels.c1.capacity = 1000
bigdata03.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
bigdata03.sources.r1.channels = c1
bigdata03.sinks.k1.channel = c1
           

Start the agents

Agents are generally started from downstream to upstream, i.e. the receiving side first; since there are quite a few examples, only the commands are listed here and the INFO output is omitted.

# ops03 server, bigdata03; to see the final output on the console, add -Dflume.root.logger=INFO,console
[email protected]:/opt/module/flume >flume-ng agent --conf conf/ --name bigdata03 --conf-file datas/07-flume3-flume-logger.conf -Dflume.root.logger=INFO,console
# ops01 server, bigdata01; monitors content changes of the file under the test path
[email protected]:/opt/module/flume >flume-ng agent --conf conf/ --name bigdata01 --conf-file datas/07-flume1-logger-flume.conf
# ops02 server, bigdata02; monitors the data stream on the test port
[email protected]:/opt/module/flume >flume-ng agent --conf conf/ --name bigdata02 --conf-file datas/07-flume2-netcat-flume.conf
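If keeping three foreground sessions open is inconvenient, the two upstream agents can also be launched in the background; this is just a sketch, assuming the existing logs/ directory is used for the redirected output (the same pattern applies on ops02):

# Run the ops01 agent in the background and keep its output under logs/
nohup flume-ng agent --conf conf/ --name bigdata01 \
  --conf-file datas/07-flume1-logger-flume.conf > logs/bigdata01-agent.out 2>&1 &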
           

Scenario experiment

# On ops01, open another session window
[email protected]:/opt/module >cd /opt/module
[email protected]:/opt/module >echo 'wangt' >> group.log
[email protected]:/opt/module >echo 'wangt' >> group.log
[email protected]:/opt/module >echo 'wangt' >> group.log
[email protected]:/opt/module >echo 'wangt' >> group.log
[email protected]:/opt/module >echo 'wangt' >> group.log

# Corresponding output on the ops03 agent console
2021-04-25 11:33:28,745 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 77 61 6E 67 74                                  wangt }
2021-04-25 11:33:32,746 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 77 61 6E 67 74                                  wangt }
2021-04-25 11:33:32,746 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 77 61 6E 67 74                                  wangt }
2021-04-25 11:33:32,746 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 77 61 6E 67 74                                  wangt }
2021-04-25 11:33:32,747 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 77 61 6E 67 74                                  wangt }

# On ops02, open another session window
[email protected]:/opt/module >netstat -tnlpu|grep 44444
(Not all processes could be identified, non-owned process info
 will not be shown, you would have to be root to see it all.)
tcp6       0      0 11.8.36.63:44444        :::*                    LISTEN      62200/java          
[email protected]:/opt/module >telnet ops02 44444
Trying 11.8.36.63...
Connected to ops02.
Escape character is '^]'.
wangt
OK
ops02
OK
66666 
OK
telnet test
OK

# Corresponding output on the ops03 agent console
2021-04-25 11:34:19,491 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 77 61 6E 67 74 0D                               wangt. }
2021-04-25 11:34:26,318 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 6F 70 73 30 32 0D                               ops02. }
2021-04-25 11:34:35,320 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 36 36 36 36 36 0D                               66666. }
2021-04-25 11:34:46,523 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 74 65 6C 6E 65 74 20 74 65 73 74 0D             telnet test. }
           

In the end, the data from both sources is successfully aggregated on bigdata03, as defined in 07-flume3-flume-logger.conf. (The trailing 0D byte on the events coming from ops02 is simply the carriage return that telnet appends to each line; the netcat source strips only the newline.)