天天看點

Flume 日志收集系統 Spooldir-Source HDFS-sink

日志即log,記錄發生的事件。以Nginx為例,有error_log和access_log 2個日志。access_log是通路日志,每條通路記錄會産生幾百位元組的資料,随着通路量增加,日志檔案會越來越大,必須定期清理日志。

現在資料越來越重要,是以不能簡單丢棄,要儲存這些資料做更多資料分析。可以将資料儲存到HDFS系統上,Flume是一個資料搬運軟體,它擴充了很多功能,支援很多資料源。不編寫代碼利用Flume就可以搭建一個将log儲存到HDFS的可靠系統。

一、Flume 元件

  • Source 采集資訊源
  • Channel 消息緩存隊列
  • Sink 從緩存隊列中拉取消息,并處理。

消息 Record,Source封裝Event(事件)成為Record對象,并儲存到Channel中,Sink拉取Record并儲存到目标系統中。

Sink處理完成之後,會向Channel發送确認消息,提供消息處理的可靠性。

因為Flume是一個大資料元件,在剛接觸的時候犯了思維慣性錯誤,以為Source、Channel、Sink是部署在不同主機上的。如圖一個Agent包括了三個元件,運作在一台主機上,準确的說一個JVM程序。常見的Source是agent可監聽的檔案夾、檔案,Sink是hdfs。

二、配置檔案

LogAgent.sources = mysource
LogAgent.channels = mychannel
LogAgent.sinks = mysink
LogAgent.sources.mysource.type = spooldir
LogAgent.sources.mysource.channels = mychannel
LogAgent.sources.mysource.spoolDir =/Users/wangsen/hadoop/apache-flume-1.7.0-bin/conf_copy
LogAgent.sinks.mysink.channel = mychannel
LogAgent.sinks.mysink.type = hdfs
LogAgent.sinks.mysink.hdfs.path = hdfs://namenode:9000/data/logs2/
LogAgent.sinks.mysink.hdfs.rollInterval = 30
LogAgent.sinks.mysink.hdfs.batchSize = 10000
LogAgent.sinks.mysink.hdfs.rollSize = 0
LogAgent.sinks.mysink.hdfs.rollCount = 10000
LogAgent.sinks.mysink.hdfs.fileType = DataStream
LogAgent.sinks.mysink.hdfs.useLocalTimeStamp = true
LogAgent.channels.mychannel.type = memory
LogAgent.channels.mychannel.capacity = 10000
LogAgent.channels.mychannel.transactionCapacity = 10000
           

運作flume

bin/flume-ng agent --conf conf --conf-file conf/logagent.properties --name LogAgent -Dflume.root.logger=DEBUG,console

三、注意事項

1. sinks.mysink.hdfs.batchSize 和channels.mychannel.transactionCapacity

process failed

org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 100 full, consider committing more frequently, increasing capacity, or in creasing thread count

如果sink.batchSize 大于 transactionCapacity:channel的處理能力被占滿,得不到sink的确認消息,因為沒有達到sink批處理數。

2.spooldir 監聽目錄中的檔案

spooldir監聽檔案目錄,當出現新檔案時,将新檔案轉化成事件。預設deseriallizer的值是LINE,檔案的每行封裝成一個Event。是以,在sink端也是按代表一行的Record進行處理。

3.hdfs sink 配置

hdfs.fileType = DataStream ##儲存檔案時不用壓縮

hdfs.rollCount = 10000 ##每個檔案記錄10000條Record,超過10000條分割檔案

hdfs.rollSize = 0 ## 不以檔案的大小分割

hdfs.batchSize = 10000 ## 批處理數,沒達到時儲存在.tmp檔案中

hdfs.rollInterval = 30 ##批處理逾時時間,将tmp檔案寫入到正式檔案,并送出确認。

四、實驗結果

源檔案夾:

drwxr-xr-x 2 wangsen staff 64 8 23 09:50 .flumespool

-rw-r--r-- 1 wangsen staff 1661 9 26 2016 flume-conf.properties.template.COMPLETED

-rw-r--r-- 1 wangsen staff 1455 9 26 2016 flume-env.ps1.template.COMPLETED

-rw-r--r-- 1 wangsen staff 1565 9 26 2016 flume-env.sh.template.COMPLETED

-rw-r--r-- 1 wangsen staff 3107 9 26 2016 log4j.properties.COMPLETED

-rw-r--r--@ 1 wangsen staff 778 8 23 09:49

logagent.properties.COMPLETED

處理成功資料,添加字尾.COMPLETED,此字尾可以在.properties檔案中設定。

HDFS:

-rw-r--r-- 3 root supergroup 8567 2018-08-23 09:50 /data/logs2/FlumeData.1534989021404

生成一個檔案,沒有超過10000行就儲存在一個檔案。檔案名稱可以在.properties檔案中配置。

總結

本文是Flume基本實驗,TailDir是一種更強大的目錄源Source,支援檔案級的監聽。通過設定Decoder可以檔案作為事件(不以Line為Event),實作檔案夾的同步。通過級聯方式,實作多個主機之間高可靠檔案/檔案夾同步。