天天看點

flume 采集寫入hbase速度越來越慢_Flume入門(上)

flume 采集寫入hbase速度越來越慢_Flume入門(上)

Flume概述

Flume是一種基于流式架構的日志采集,聚合和傳輸系統。可以用來把Java EE平台的日志資料(比如實時推薦)傳輸給HDFS 主要目的:實時讀取伺服器日志,寫到HDFS中

架構

flume 采集寫入hbase速度越來越慢_Flume入門(上)

Agent:JVM程序。以事件(event)的形式把資料傳送到目的地,主要由三部分組成:source,channel,sink

:herb: source: 負責生産事件(接收資料)。Source元件可以處理各種類型的日志資料,包括avro, thrift, exec, jms netcat等等

:herb:channel:是source和sink中間的緩沖區,這樣兩端讀寫速度可以不同。同時channel是線程安全 ,可以對接多個source和sink。channel分為:Memory channel(記憶體存儲)或者File channel(磁盤中),Kafka channel

:herb: sink: 負責消費事件。不斷輪詢channel中的資料并且批量清除資料,并将這些資料寫入到存儲或者另一個flume。可以寫入hdfs,logger控制台,avro,file等等。

:herb: event :flume傳輸資料的形式,由 header(kv結構) + body(位元組數組)組成

Avro source: 對接多個flume Exec source

flume安裝

Flume官網位址

文檔檢視位址

下載下傳位址

将apache-flume-1.7.0-bin.tar.gz解壓到你的目錄下,為了友善可以改名成flume

修改flume/conf/flume-env.sh.template配置檔案

mv flume-env.sh.template flume-env.sh

然後修改檔案中的Java路徑變成自己的

export JAVA_HOME=/opt/module/jdk1.8.0_144

入門案例

case 1 監控端口

用flume從端口接收資料,列印到控制台 首先安裝下netcat

sudo yum install -y nc

在flume目錄下建立job檔案專門存配置檔案夾并進入job檔案夾,自定義一個配置檔案

vim flume-netcat-logger.conf

在檔案中寫入

# Name the components on this agent
# a1(agent)的source,sink, channel名稱
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
#a1輸入源是端口
a1.sources.r1.type = netcat
#a1監聽主機
a1.sources.r1.bind = localhost
#a1端口号(你自己随便指定一個沒有被占用的)
a1.sources.r1.port = 44444

# Describe the sink
#輸出目的地是控制台logger類型
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
#channel設定成再記憶體中存儲
a1.channels.c1.type = memory
#channel最大容量1000個event
a1.channels.c1.capacity = 1000
#channel收集到100條再送出事務
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
#把source 和 sink 連接配接到channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
           

然後還要做兩件事:啟動flume和向4444端口發送資料 啟動flume

bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

或者

bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

這裡說明下

  • --conf/-c : 配置檔案存儲位置
  • -name/-n : Agent的名字
  • --conf-file/-f : 配置檔案的地方
  • -Dflume.root.logger : flume動态修改flume.root.logger參數屬性值,并将控制台日志列印級别設定為INFO級别。日志級别包括:log、info、warn、error。

向4444端口發送資料

nc localhost 4444 hello

Flume監聽頁面收到了消息

case 2 監控單個追加檔案

目标:監控Hive日志檔案,并上傳到HDFS

Flume要想将資料輸出到HDFS,須持有Hadoop相關jar包 (在resource下的flume-hadoop-jar.rar),解壓後放在flume/lib檔案夾下。

在job下建立配置檔案

vim flume-file-hdfs.conf

然後添加

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
#定義source類型為exec可執行指令
a2.sources.r2.type = exec
#指令
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c 執行shell腳本的路徑

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
#上傳檔案的字首
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照時間滾動檔案夾
a2.sinks.k2.hdfs.round = true
#多少時間機關建立一個新的檔案夾
a2.sinks.k2.hdfs.roundValue = 1
#重新定義時間機關
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地時間戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#積攢多少個Event才flush到HDFS一次
a2.sinks.k2.hdfs.batchSize = 1000
#設定檔案類型,可支援壓縮
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一個新的檔案(秒)
a2.sinks.k2.hdfs.rollInterval = 30
#設定每個檔案的滾動大小
a2.sinks.k2.hdfs.rollSize = 134217700
#檔案的滾動與Event數量無關
a2.sinks.k2.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
           

打開flume

bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf

把叢集、Hive打開

在hive上随便進行些操作後檢視hdfs下面的flume檔案

flume 采集寫入hbase速度越來越慢_Flume入門(上)

case 3 實時監控目錄下多個新檔案

監聽整個目錄的檔案變化,上傳到HDFS

建立配置檔案

vim flume-dir-hdfs.conf

然後添加如下内容

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = spooldir
#監控目錄
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp結尾的檔案,不上傳。上傳完成的檔案會以.COMPLETED結尾
a3.sources.r3.ignorePattern = ([^ ]*.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
#上傳到HDFS的路徑
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
#上傳檔案的字首
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照時間滾動檔案夾
a3.sinks.k3.hdfs.round = true
#多少時間機關建立一個新的檔案夾
a3.sinks.k3.hdfs.roundValue = 1
#重新定義時間機關
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地時間戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#積攢多少個Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#設定檔案類型,可支援壓縮
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一個新的檔案
a3.sinks.k3.hdfs.rollInterval = 60
#設定每個檔案的滾動大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
#檔案的滾動與Event數量無關
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
           

case 4 追加檔案監控

case 3中監控的是檔案名字,如果你向一個檔案中追加内容是不會被記錄下來的。如果想記錄追加檔案監控,需要使用taildir類别的source

用positionfile 記錄位置檔案,flume中途關機,再啟動也會記錄

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = TAILDIR
# positionfile的位置
a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json
a3.sources.r3.filegroups = f1
# 監控的檔案目錄
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/file.*

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
#上傳檔案的字首
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照時間滾動檔案夾
a3.sinks.k3.hdfs.round = true
#多少時間機關建立一個新的檔案夾
a3.sinks.k3.hdfs.roundValue = 1
#重新定義時間機關
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地時間戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#積攢多少個Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#設定檔案類型,可支援壓縮
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一個新的檔案
a3.sinks.k3.hdfs.rollInterval = 60
#設定每個檔案的滾動大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
#檔案的滾動與Event數量無關
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
           

hdfs.rollInterval 滾動生成新檔案 預設30秒 hdfs.rollsize hdfs.rollcount hdfs.round 檔案夾滾動