對于flume攔截器,我的了解是:在app(應用程式日志)和 source 之間的,對app日志進行攔截處理的。也即在日志進入到source之前,對日志進行一些包裝、清新過濾等等動作。
官方上提供的已有的攔截器有:
像很多java的開源項目如springmvc中的攔截器一樣,flume的攔截器也是chain形式的,可以對一個source指定多個攔截器,按先後順序依次處理。
下面舉例說明這些攔截器的用法,首先我們調整一下第一篇文章中的那個writelog類:
public class writelog {
protected static final log logger = logfactory.getlog(writelog.class);
/**
* @param args
* @throws interruptedexception
*/
public static void main(string[] args) throws interruptedexception {
// todo auto-generated method stub
while (true) {
logger.info(new date().gettime());
logger.info(“{\”requesttime\”:”
+ system.currenttimemillis()
+ “,\”requestparams\”:{\”timestamp\”:1405499314238,\”phone\”:\”02038824941\”,\”cardname\”:\”測試商家名稱\”,\”provincecode\”:\”440000\”,\”citycode\”:\”440106\”},\”requesturl\”:\”/reporter-api/reporter/reporter12/init.do\”}”);
thread.sleep(2000);
}
}
}
又多輸出了一行日志資訊,現在每次循環都會輸出兩行日志資訊,第一行是一個時間戳資訊,第二行是一行json格式的字元串資訊。
接下來我們用regex_filter和 timestamp這兩個攔截器來實作這樣一個功能:
1 過濾掉log4j輸出的第一行那個時間戳日志資訊,隻收集json格式的日志資訊
2 将收集的日志資訊儲存到hdfs上,每天的日志儲存到以該天命名的目錄下面,如2014-7-25号的日志,儲存到/flume/events/14-07-25目錄下面。
修改後的flume.conf如下:
tier1.sources=source1
tier1.channels=channel1
tier1.sinks=sink1
tier1.sources.source1.type=avro
tier1.sources.source1.bind=0.0.0.0
tier1.sources.source1.port=44444
tier1.sources.source1.channels=channel1
tier1.sources.source1.interceptors=i1 i2
tier1.sources.source1.interceptors.i1.type=regex_filter
tier1.sources.source1.interceptors.i1.regex=\\{.*\\}
tier1.sources.source1.interceptors.i2.type=timestamp
tier1.channels.channel1.type=memory
tier1.channels.channel1.capacity=10000
tier1.channels.channel1.transactioncapacity=1000
tier1.channels.channel1.keep-alive=30
tier1.sinks.sink1.type=hdfs
tier1.sinks.sink1.channel=channel1
tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events/%y-%m-%d
tier1.sinks.sink1.hdfs.filetype=datastream
tier1.sinks.sink1.hdfs.writeformat=text
tier1.sinks.sink1.hdfs.rollinterval=0
tier1.sinks.sink1.hdfs.rollsize=10240
tier1.sinks.sink1.hdfs.rollcount=0
tier1.sinks.sink1.hdfs.idletimeout=60
我們對source1添加了兩個攔截器i1和i2,i1為regex_filter,過濾的正則為\\{.*\\},注意正則的寫法用到了轉義字元,不然source1無法啟動,會報錯。
i2為timestamp,在header中添加了一個timestamp的key,然後我們修改了sink1.hdfs.path在後面加上了/%y-%m-%d這一串字元,這一串字元要求event的header中必須有timestamp這個key,這就是為什麼我們需要添加一個timestamp攔截器的原因,如果不添加這個攔截器,無法使用這樣的占位符,會報錯。還有很多占位符,請參考官方文檔。
然後運作writelog,去hdfs上檢視對應目錄下面的檔案,會發現内容隻有json字元串的日志,與我們的功能描述一緻。