一、簡介
Flume是Cloudera提供的一個高可用的,高可靠的,分布式的海量日志采集、聚合和傳輸的系統,Flume支援在日志系統中定制各類資料發送方,用于收集資料;同時,Flume提供對資料進行簡單處理,并寫到各種資料接受方(可定制)的能力。
二、Flume下載下傳
官網:http://flume.apache.org
下載下傳:http://www.apache.org/dist/flume
#遠端下載下傳 版本1.8
wget http://www.apache.org/dist/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
三、Flume安裝
1、解壓壓縮包
tar -zxvf apache-flume-1.8.0-bin.tar.gz
2、配置環境變量
#配置目前使用者環境變量
vi ~/.bash_profile
#在檔案中添加如下指令,記得切換自己檔案路徑
export FLUME_HOME=/home/app/apache-flume-1.8.0
export PATH=$FLUME_HOME/bin:$PATH
#立即生效
source ~/.bash_profile
四、偵聽網絡端口資料
1、新增flume-port.conf檔案
vi flume-port.conf
2、編輯flume-port.conf檔案
#####################################################################
#定義了目前agent的名字叫做a1
##定了該agent中的sources元件叫做r1
a1.sources = r1
##定了該agent中的sinks元件叫做k1
a1.sinks = k1
##定了該agent中的channels元件叫做c1
a1.channels = c1
#####################################################################
# 監聽資料源的方式,這裡采用監聽網絡端口
#source的類型為網絡位元組流
a1.sources.r1.type = netcat
#source監聽的網絡的hostname
a1.sources.r1.bind = woniu2
#source監聽的網絡的port
a1.sources.r1.port = 9527
#####################################################################
# 采集的資料的下沉(落地)方式 通過日志
#sink的類型為logger日志方式,log4j的級别有INFO、Console、file
a1.sinks.k1.type = logger
#####################################################################
# 描述channel的部分,使用記憶體做資料的臨時存儲
#channel的類型使用記憶體進行資料緩存,這是最常見的一種channel
a1.channels.c1.type = memory
#定義了channel對的容量
a1.channels.c1.capacity = 1000
#定義channel的最大的事務容量
a1.channels.c1.transactionCapacity = 100
#####################################################################
# 使用channel将source和sink連接配接起來
# 需要将source和sink使用channel連接配接起來,組成一個類似流水管道
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3、啟動flume
./flume-ng agent -c /home/app/apache-flume-1.8.0/conf -n a1 \
-f /home/app/apache-flume-1.8.0/conf/flume-port.conf -Dflume.root.logger=INFO,console
4、安裝telnet
#安裝telnet-server
yum install -y telnet-server.x86_64
#安裝telnet
yum install -y telnet.x86_64
5、發送資料
#連接配接
telnet woniu2 9527
6、測試案例
五、偵聽新增檔案
1、新增flume-file.conf檔案
vi flume-file.conf
2、編輯flume-file.conf檔案
#####################################################################
#定義了目前agent的名字叫做a1
##定了該agent中的sources元件叫做r1
a1.sources = r1
##定了該agent中的sinks元件叫做k1
a1.sinks = k1
##定了該agent中的channels元件叫做c1
a1.channels = c1
#####################################################################
# 監聽資料源的方式,這裡采用監聽目錄中的新增檔案
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/data/flume
a1.sources.r1.fileSuffix = .woniu
# a1.sources.r1.deletePolicy = immediate
a1.sources.r1.deletePolicy = never
a1.sources.r1.fileHeader = true
#####################################################################
# 采集的資料的下沉(落地)方式 通過日志
#sink的類型為logger日志方式,log4j的級别有INFO、Console、file
a1.sinks.k1.type = logger
#####################################################################
# 描述channel的部分,使用記憶體做資料的臨時存儲
#channel的類型使用記憶體進行資料緩存,這是最常見的一種channel
a1.channels.c1.type = memory
#定義了channel對的容量
a1.channels.c1.capacity = 1000
#定義channel的最大的事務容量
a1.channels.c1.transactionCapacity = 100
#####################################################################
# 使用channel将source和sink連接配接起來
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3、啟動flume
./flume-ng agent -c /home/app/apache-flume-1.8.0/conf -n a1 \
-f /home/app/apache-flume-1.8.0/conf/flume-file.conf -Dflume.root.logger=INFO,console
4、測試案例
六、監聽檔案中的新增資料
1、建立目錄
存放檢查點資料:/home/data/flume/checkpoint
存放channel的資料:/home/data/flume/data
2、新增flume-content.conf檔案
vi flume-content.conf
3、編輯flume-file.conf檔案
#####################################################################
#定義了目前agent的名字叫做a1
##定了該agent中的sources元件叫做r1
a1.sources = r1
##定了該agent中的sinks元件叫做k1
a1.sinks = k1
##定了該agent中的channels元件叫做c1
a1.channels = c1
#####################################################################
# 監聽資料源的方式,這裡監聽檔案中的新增資料
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/data/flume/woniu.log
#####################################################################
# 采集的資料的下沉(落地)方式 通過日志
#sink的類型為logger日志方式,log4j的級别有INFO、Console、file
a1.sinks.k1.type = logger
#####################################################################
# 描述channel的部分,使用記憶體做資料的臨時存儲
a1.channels.c1.type = file
# 存放檢查點資料
a1.channels.c1.checkpointDir = /home/data/flume/checkpoint
#定義channel的最大的事務容量
a1.channels.c1.transactionCapacity = 1000000
# 存放channel的資料
a1.channels.c1.dataDirs = /home/data/flume/data
#####################################################################
# 使用channel将source和sink連接配接起來
# 需要将source和sink使用channel連接配接起來,組成一個類似流水管道
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
4、啟動flume
./flume-ng agent -c /home/app/apache-flume-1.8.0/conf -n a1 \
-f /home/app/apache-flume-1.8.0/conf/flume-content.conf -Dflume.root.logger=INFO,console
5、寫資料
echo hello woniu >> woniu.log
6、測試案例
七、flume資料儲存到HDFS
1、安裝hadoop
https://blog.csdn.net/u011374856/article/details/103310627
2、建立目錄
存放檢查點資料:/home/data/flume/checkpoint
存放channel的資料:/home/data/flume/data
3、新增flume-hdfs.conf檔案
vi flume-hdfs.conf
4、編輯flume-hdfs.conf檔案
#####################################################################
#定義了目前agent的名字叫做a1
##定了該agent中的sources元件叫做r1
a1.sources = r1
##定了該agent中的sinks元件叫做k1
a1.sinks = k1
##定了該agent中的channels元件叫做c1
a1.channels = c1
#####################################################################
# 監聽資料源的方式,這裡監聽檔案中的新增資料
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/data/flume/woniu.log
#####################################################################
# 采集的資料的下沉(落地)方式 存儲到hdfs
a1.sinks.k1.type = hdfs
#存儲路徑
a1.sinks.k1.hdfs.path = hdfs://10.0.7.62:6001/flume/%Y%m%d
# 檔案生成後的字首
a1.sinks.k1.hdfs.filePrefix = ttpark
# 檔案生成後的字尾
a1.sinks.k1.hdfs.fileSuffix = .log
# 檔案使用時的字首
a1.sinks.k1.hdfs.inUsePrefix = woniu.
# 檔案使用時的字尾
a1.sinks.k1.hdfs.inUseSuffix = .txt
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 預設為SequenceFile,檢視hdfs上的檔案時為序列化的
a1.sinks.k1.hdfs.fileType = DataStream
# 上面的要配置,這個也要配置,寫入的資料格式為文本内容
a1.sinks.k1.hdfs.writeFormat = Text
# 下面這個配置選項不加,那麼rollInterval rollSize rollCount是不會生效的
a1.sinks.k1.hdfs.minBlockReplicas = 1
#####################################################################
# 描述channel的部分,使用記憶體做資料的臨時存儲
a1.channels.c1.type = file
# 存放檢查點資料
a1.channels.c1.checkpointDir = /home/data/flume/checkpoint
#定義channel的最大的事務容量
a1.channels.c1.transactionCapacity = 1000000
# 存放channel的資料
a1.channels.c1.dataDirs = /home/data/flume/data
#####################################################################
# 使用channel将source和sink連接配接起來
# 需要将source和sink使用channel連接配接起來,組成一個類似流水管道
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
5、拷貝hadoop的jar包
#hadoop-common
cp /home/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/hadoop-common-2.6.0-cdh5.15.1.jar $FLUME_HOME/lib/
#woodstox-core
#stax2-api
cp /home/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/stax-api-1.0-2.jar $FLUME_HOME/lib/
#commons-configuration
cp /home/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-configuration-1.6.jar $FLUME_HOME/lib/
#hadoop-auth
cp /home/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/hadoop-auth-2.6.0-cdh5.15.1.jar $FLUME_HOME/lib/
#htrace-core4
cp /home/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/htrace-core4-4.0.1-incubating.jar $FLUME_HOME/lib/
#hadoop-hdfs
cp /home/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/hadoop-hdfs-2.6.0-cdh5.15.1.jar $FLUME_HOME/lib/
6、啟動flume
./flume-ng agent -c /home/app/apache-flume-1.8.0/conf -n a1 \
-f /home/app/apache-flume-1.8.0/conf/flume-hdfs.conf -Dflume.root.logger=INFO,console
7、寫資料
echo hello woniu >> woniu.log
八、flume資料發送到Kafka
1、安裝kafka
https://blog.csdn.net/u011374856/article/details/103471001
2、新增flume-kafka.conf檔案
vi flume-kafka.conf
3、編輯flume-kafka.conf檔案
#####################################################################
#定義了目前agent的名字叫做a1
##定了該agent中的sources元件叫做r1
a1.sources = r1
##定了該agent中的sinks元件叫做k1
a1.sinks = k1
##定了該agent中的channels元件叫做c1
a1.channels = c1
#####################################################################
# 監聽資料源的方式,這裡監聽檔案中的新增資料
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/data/flume/woniu.log
#####################################################################
# 采集的資料的下沉(落地)方式 存儲到hdfs
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
#kafka server的位址
a1.sinks.k1.brokerList = 10.0.7.62:9092
#topic
a1.sinks.k1.topic = woniu_topic_yao
a1.sinks.k1.batchSize = 5
a1.sinks.k1.requiredAcks = 1
#####################################################################
# 描述channel的部分,使用記憶體做資料的臨時存儲
a1.channels.c1.type = file
# 存放檢查點資料
a1.channels.c1.checkpointDir = /home/data/flume/checkpoint
#定義channel的最大的事務容量
a1.channels.c1.transactionCapacity = 1000000
# 存放channel的資料
a1.channels.c1.dataDirs = /home/data/flume/data
#####################################################################
# 使用channel将source和sink連接配接起來
# 需要将source和sink使用channel連接配接起來,組成一個類似流水管道
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
4、啟動flume
./flume-ng agent -c /home/app/apache-flume-1.8.0/conf -n a1 \
-f /home/app/apache-flume-1.8.0/conf/flume-kafka.conf -Dflume.root.logger=INFO,console
5、寫資料
echo hello woniu >> woniu.log