天天看點

flume學習(四):Flume Interceptors的使用

對于flume攔截器,我的了解是:在app(應用程式日志)和 source 之間的,對app日志進行攔截處理的。也即在日志進入到source之前,對日志進行一些包裝、清新過濾等等動作。

官方上提供的已有的攔截器有:

像很多java的開源項目如springmvc中的攔截器一樣,flume的攔截器也是chain形式的,可以對一個source指定多個攔截器,按先後順序依次處理。

下面舉例說明這些攔截器的用法,首先我們調整一下第一篇文章中的那個writelog類:

public class writelog {  

    protected static final log logger = logfactory.getlog(writelog.class);  

    /** 

     * @param args 

     * @throws interruptedexception 

     */  

    public static void main(string[] args) throws interruptedexception {  

        // todo auto-generated method stub  

        while (true) {  

            logger.info(new date().gettime());  

            logger.info(“{\”requesttime\”:”  

                    + system.currenttimemillis()  

                    + “,\”requestparams\”:{\”timestamp\”:1405499314238,\”phone\”:\”02038824941\”,\”cardname\”:\”測試商家名稱\”,\”provincecode\”:\”440000\”,\”citycode\”:\”440106\”},\”requesturl\”:\”/reporter-api/reporter/reporter12/init.do\”}”);  

            thread.sleep(2000);  

        }  

    }  

}  

又多輸出了一行日志資訊,現在每次循環都會輸出兩行日志資訊,第一行是一個時間戳資訊,第二行是一行json格式的字元串資訊。

接下來我們用regex_filter和 timestamp這兩個攔截器來實作這樣一個功能:

1 過濾掉log4j輸出的第一行那個時間戳日志資訊,隻收集json格式的日志資訊

2 将收集的日志資訊儲存到hdfs上,每天的日志儲存到以該天命名的目錄下面,如2014-7-25号的日志,儲存到/flume/events/14-07-25目錄下面。

修改後的flume.conf如下:

tier1.sources=source1  

tier1.channels=channel1  

tier1.sinks=sink1  

tier1.sources.source1.type=avro  

tier1.sources.source1.bind=0.0.0.0  

tier1.sources.source1.port=44444  

tier1.sources.source1.channels=channel1  

tier1.sources.source1.interceptors=i1 i2  

tier1.sources.source1.interceptors.i1.type=regex_filter  

tier1.sources.source1.interceptors.i1.regex=\\{.*\\}  

tier1.sources.source1.interceptors.i2.type=timestamp  

tier1.channels.channel1.type=memory  

tier1.channels.channel1.capacity=10000  

tier1.channels.channel1.transactioncapacity=1000  

tier1.channels.channel1.keep-alive=30  

tier1.sinks.sink1.type=hdfs  

tier1.sinks.sink1.channel=channel1  

tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events/%y-%m-%d  

tier1.sinks.sink1.hdfs.filetype=datastream  

tier1.sinks.sink1.hdfs.writeformat=text  

tier1.sinks.sink1.hdfs.rollinterval=0  

tier1.sinks.sink1.hdfs.rollsize=10240  

tier1.sinks.sink1.hdfs.rollcount=0  

tier1.sinks.sink1.hdfs.idletimeout=60  

我們對source1添加了兩個攔截器i1和i2,i1為regex_filter,過濾的正則為\\{.*\\},注意正則的寫法用到了轉義字元,不然source1無法啟動,會報錯。

i2為timestamp,在header中添加了一個timestamp的key,然後我們修改了sink1.hdfs.path在後面加上了/%y-%m-%d這一串字元,這一串字元要求event的header中必須有timestamp這個key,這就是為什麼我們需要添加一個timestamp攔截器的原因,如果不添加這個攔截器,無法使用這樣的占位符,會報錯。還有很多占位符,請參考官方文檔。

然後運作writelog,去hdfs上檢視對應目錄下面的檔案,會發現内容隻有json字元串的日志,與我們的功能描述一緻。