天天看點

flume按照日志時間寫hdfs實作

flume寫hdfs的操作在HDFSEventSink.process方法中,路徑建立由BucketPath完成

分析其源碼(參考:http://caiguangguang.blog.51cto.com/1652935/1619539)

可以使用%{}變量替換的形式實作,隻需要擷取event中時間字段(nginx日志的local time)傳入hdfs.path即可

具體實作如下:

1.在KafkaSource的process方法中增加:

增加兩個頭部,分别用來記錄日志的day和hour

2.KafkaSourceUtil中的方法

因為我們的消息body是json的,是以用得了java的json-lib包,比如取消息的day:

2.hdfs.path設定頭即可

最終日志: