日志即log,記錄發生的事件。以Nginx為例,有error_log和access_log 2個日志。access_log是通路日志,每條通路記錄會産生幾百位元組的資料,随着通路量增加,日志檔案會越來越大,必須定期清理日志。
現在資料越來越重要,是以不能簡單丢棄,要儲存這些資料做更多資料分析。可以将資料儲存到HDFS系統上,Flume是一個資料搬運軟體,它擴充了很多功能,支援很多資料源。不編寫代碼利用Flume就可以搭建一個将log儲存到HDFS的可靠系統。
一、Flume 元件
- Source 采集資訊源
- Channel 消息緩存隊列
- Sink 從緩存隊列中拉取消息,并處理。
消息 Record,Source封裝Event(事件)成為Record對象,并儲存到Channel中,Sink拉取Record并儲存到目标系統中。
Sink處理完成之後,會向Channel發送确認消息,提供消息處理的可靠性。
因為Flume是一個大資料元件,在剛接觸的時候犯了思維慣性錯誤,以為Source、Channel、Sink是部署在不同主機上的。如圖一個Agent包括了三個元件,運作在一台主機上,準确的說一個JVM程序。常見的Source是agent可監聽的檔案夾、檔案,Sink是hdfs。
二、配置檔案
LogAgent.sources = mysource
LogAgent.channels = mychannel
LogAgent.sinks = mysink
LogAgent.sources.mysource.type = spooldir
LogAgent.sources.mysource.channels = mychannel
LogAgent.sources.mysource.spoolDir =/Users/wangsen/hadoop/apache-flume-1.7.0-bin/conf_copy
LogAgent.sinks.mysink.channel = mychannel
LogAgent.sinks.mysink.type = hdfs
LogAgent.sinks.mysink.hdfs.path = hdfs://namenode:9000/data/logs2/
LogAgent.sinks.mysink.hdfs.rollInterval = 30
LogAgent.sinks.mysink.hdfs.batchSize = 10000
LogAgent.sinks.mysink.hdfs.rollSize = 0
LogAgent.sinks.mysink.hdfs.rollCount = 10000
LogAgent.sinks.mysink.hdfs.fileType = DataStream
LogAgent.sinks.mysink.hdfs.useLocalTimeStamp = true
LogAgent.channels.mychannel.type = memory
LogAgent.channels.mychannel.capacity = 10000
LogAgent.channels.mychannel.transactionCapacity = 10000
運作flume
bin/flume-ng agent --conf conf --conf-file conf/logagent.properties --name LogAgent -Dflume.root.logger=DEBUG,console
三、注意事項
1. sinks.mysink.hdfs.batchSize 和channels.mychannel.transactionCapacity
process failed
org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 100 full, consider committing more frequently, increasing capacity, or in creasing thread count
如果sink.batchSize 大于 transactionCapacity:channel的處理能力被占滿,得不到sink的确認消息,因為沒有達到sink批處理數。
2.spooldir 監聽目錄中的檔案
spooldir監聽檔案目錄,當出現新檔案時,将新檔案轉化成事件。預設deseriallizer的值是LINE,檔案的每行封裝成一個Event。是以,在sink端也是按代表一行的Record進行處理。
3.hdfs sink 配置
hdfs.fileType = DataStream ##儲存檔案時不用壓縮
hdfs.rollCount = 10000 ##每個檔案記錄10000條Record,超過10000條分割檔案
hdfs.rollSize = 0 ## 不以檔案的大小分割
hdfs.batchSize = 10000 ## 批處理數,沒達到時儲存在.tmp檔案中
hdfs.rollInterval = 30 ##批處理逾時時間,将tmp檔案寫入到正式檔案,并送出确認。
四、實驗結果
源檔案夾:
drwxr-xr-x 2 wangsen staff 64 8 23 09:50 .flumespool
-rw-r--r-- 1 wangsen staff 1661 9 26 2016 flume-conf.properties.template.COMPLETED
-rw-r--r-- 1 wangsen staff 1455 9 26 2016 flume-env.ps1.template.COMPLETED
-rw-r--r-- 1 wangsen staff 1565 9 26 2016 flume-env.sh.template.COMPLETED
-rw-r--r-- 1 wangsen staff 3107 9 26 2016 log4j.properties.COMPLETED
-rw-r--r--@ 1 wangsen staff 778 8 23 09:49
logagent.properties.COMPLETED
處理成功資料,添加字尾.COMPLETED,此字尾可以在.properties檔案中設定。
HDFS:
-rw-r--r-- 3 root supergroup 8567 2018-08-23 09:50 /data/logs2/FlumeData.1534989021404
生成一個檔案,沒有超過10000行就儲存在一個檔案。檔案名稱可以在.properties檔案中配置。
總結
本文是Flume基本實驗,TailDir是一種更強大的目錄源Source,支援檔案級的監聽。通過設定Decoder可以檔案作為事件(不以Line為Event),實作檔案夾的同步。通過級聯方式,實作多個主機之間高可靠檔案/檔案夾同步。