flume寫hdfs的操作在HDFSEventSink.process方法中,路徑建立由BucketPath完成
分析其源碼(參考:http://caiguangguang.blog.51cto.com/1652935/1619539)
可以使用%{}變量替換的形式實作,隻需要擷取event中時間字段(nginx日志的local time)傳入hdfs.path即可
具體實作如下:
1.在KafkaSource的process方法中增加:
增加兩個頭部,分别用來記錄日志的day和hour
2.KafkaSourceUtil中的方法
因為我們的消息body是json的,是以用得了java的json-lib包,比如取消息的day:
2.hdfs.path設定頭即可
最終日志: