天天看點

flume學習(五):Flume Channel Selectors使用

前幾篇文章隻有一個項目的日志,現在我們考慮多個項目的日志的收集,我拷貝了一份flumedemo項目,重命名為flumedemo2,添加了一個writelog2.java類,稍微改動了一下json字元串的輸出,将以前requesturl中的”reporter-api”改為了”image-api”,以便和writelog類的輸出稍微區分開來,如下:

package com.besttone.flume;  

import java.util.date;  

import org.apache.commons.logging.log;  

import org.apache.commons.logging.logfactory;  

public class writelog2 {  

    protected static final log logger = logfactory.getlog(writelog2.class);  

    /** 

     * @param args 

     * @throws interruptedexception 

     */  

    public static void main(string[] args) throws interruptedexception {  

        // todo auto-generated method stub  

        while (true) {  

            logger.info(new date().gettime());  

            logger.info(“{\”requesttime\”:”  

                    + system.currenttimemillis()  

                    + “,\”requestparams\”:{\”timestamp\”:1405499314238,\”phone\”:\”02038824941\”,\”cardname\”:\”測試商家名稱\”,\”provincecode\”:\”440000\”,\”citycode\”:\”440106\”},\”requesturl\”:\”/image-api/reporter/reporter12/init.do\”}”);  

            thread.sleep(2000);  

        }  

    }  

}  

現在有這麼一個需求描述:要求flumedemo的項目的log4j日志輸出到hdfs,而flumedemo2項目的log4j日志輸出到agent的log日志中。

我們還是采用log4jappender來配置log4j輸出給flume的souce,現在的需求明顯是有兩個sink了,一個sink為hdfs,一個sink為logger。于是現在的拓撲結構應該是這樣的:

flume學習(五):Flume Channel Selectors使用

需要實作這麼一個拓撲接口,就需要使用到channel selectors,讓不同的項目日志通過不同的channel到不同的sink中去。

官方文檔上channel selectors 有兩種類型:

replicating channel selector (default)

multiplexing channel selector

這兩種selector的差別是:replicating 會将source過來的events發往所有channel,而multiplexing 可以選擇該發往哪些channel。對于上面的例子來說,如果采用replicating ,那麼demo和demo2的日志會同時發往channel1和channel2,這顯然是和需求不符的,需求隻是讓demo的日志發往channel1,而demo2的日志發往channel2。

綜上所述,我們選擇multiplexing channel selector。這裡我們有遇到一個棘手的問題,multiplexing 需要判斷header裡指定key的值來決定分發到某個具體的channel,我們現在demo和demo2同時運作在同一個伺服器上,如果在不同的伺服器上運作,我們可以在 source1上加上一個 host 攔截器(上一篇有介紹過),這樣可以通過header中的host來判斷event該分發給哪個channel,而這裡是在同一個伺服器上,由host是區分不出來日志的來源的,我們必須想辦法在header中添加一個key來區分日志的來源。

設想一下,如果header中有一個key:flume.client.log4j.logger.source,我們通過設定這個key的值,demo設為app1,demo2設為app2,這樣我們就能通過設定:

tier1.sources.source1.channels=channel1 channel2

tier1.sources.source1.selector.type=multiplexing

tier1.sources.source1.selector.header=flume.client.log4j.logger.source

tier1.sources.source1.selector.mapping.app1=channel1

tier1.sources.source1.selector.mapping.app2=channel2

來将不同項目的的日志輸出到不同的channel了。

我們按照這個思路繼續下去,遇到了困難,log4jappender沒有這樣的參數來讓你設定。怎麼辦?翻看了一下log4jappender的源碼,發現可以很容易的實作擴充參數,于是我複制了一份log4jappender代碼,新加了一個類叫log4jextappender.java,裡面擴充了一個參數叫:source,代碼如下:

import java.io.bytearrayoutputstream;  

import java.io.ioexception;  

import java.nio.charset.charset;  

import java.util.hashmap;  

import java.util.map;  

import java.util.properties;  

import org.apache.avro.schema;  

import org.apache.avro.generic.genericrecord;  

import org.apache.avro.io.binaryencoder;  

import org.apache.avro.io.datumwriter;  

import org.apache.avro.io.encoderfactory;  

import org.apache.avro.reflect.reflectdata;  

import org.apache.avro.reflect.reflectdatumwriter;  

import org.apache.avro.specific.specificrecord;  

import org.apache.flume.event;  

import org.apache.flume.eventdeliveryexception;  

import org.apache.flume.flumeexception;  

import org.apache.flume.api.rpcclient;  

import org.apache.flume.api.rpcclientconfigurationconstants;  

import org.apache.flume.api.rpcclientfactory;  

import org.apache.flume.clients.log4jappender.log4javroheaders;  

import org.apache.flume.event.eventbuilder;  

import org.apache.log4j.appenderskeleton;  

import org.apache.log4j.helpers.loglog;  

import org.apache.log4j.spi.loggingevent;  

/** 

 *  

 * appends log4j events to an external flume client which is decribed by the 

 * log4j configuration file. the appender takes two required parameters: 

 * <p> 

 * <strong>hostname</strong> : this is the hostname of the first hop at which 

 * flume (through an avrosource) is listening for events. 

 * </p> 

 * <strong>port</strong> : this the port on the above host where the flume 

 * source is listening for events. 

 * a sample log4j properties file which appends to a source would look like: 

 * <pre> 

 * log4j.appender.out2 = org.apache.flume.clients.log4jappender.log4jappender 

 * log4j.appender.out2.port = 25430 

 * log4j.appender.out2.hostname = foobarflumesource.com 

 * log4j.logger.org.apache.flume.clients.log4jappender = debug,out2</p> 

 * </pre> 

 * <i>note: change the last line to the package of the class(es), that will do 

 * the appending.for example if classes from the package com.bar.foo are 

 * appending, the last line would be:</i> 

 * <p>log4j.logger.com.bar.foo = debug,out2</p> 

 */  

public class log4jextappender extends appenderskeleton {  

    private string hostname;  

    private int port;  

    private string source;  

    public string getsource() {  

        return source;  

    public void setsource(string source) {  

        this.source = source;  

    private boolean unsafemode = false;  

    private long timeout = rpcclientconfigurationconstants.default_request_timeout_millis;  

    private boolean avroreflectionenabled;  

    private string avroschemaurl;  

    rpcclient rpcclient = null;  

     * if this constructor is used programmatically rather than from a log4j 

     * conf you must set the <tt>port</tt> and <tt>hostname</tt> and then call 

     * <tt>activateoptions()</tt> before calling <tt>append()</tt>. 

    public log4jextappender() {  

     * sets the hostname and port. even if these are passed the 

     * <tt>activateoptions()</tt> function must be called before calling 

     * <tt>append()</tt>, else <tt>append()</tt> will throw an exception. 

     *  

     * @param hostname 

     *            the first hop where the client should connect to. 

     * @param port 

     *            the port to connect on the host. 

    public log4jextappender(string hostname, int port, string source) {  

        this.hostname = hostname;  

        this.port = port;  

     * append the loggingevent, to send to the first flume hop. 

     * @param event 

     *            the loggingevent to be appended to the flume. 

     * @throws flumeexception 

     *             if the appender was closed, or the hostname and port were not 

     *             setup, there was a timeout, or there was a connection error. 

    @override  

    public synchronized void append(loggingevent event) throws flumeexception {  

        // if rpcclient is null, it means either this appender object was never  

        // setup by setting hostname and port and then calling activateoptions  

        // or this appender object was closed by calling close(), so we throw an  

        // exception to show the appender is no longer accessible.  

        if (rpcclient == null) {  

            string errormsg = “cannot append to appender! appender either closed or”  

                    + ” not setup correctly!”;  

            loglog.error(errormsg);  

            if (unsafemode) {  

                return;  

            }  

            throw new flumeexception(errormsg);  

        if (!rpcclient.isactive()) {  

            reconnect();  

        // client created first time append is called.  

        map<string, string> hdrs = new hashmap<string, string>();  

        hdrs.put(log4javroheaders.logger_name.tostring(), event.getloggername());  

        hdrs.put(log4javroheaders.timestamp.tostring(),  

                string.valueof(event.timestamp));  

        // 添加日志來源  

        if (this.source == null || this.source.equals(“”)) {  

            this.source = “unknown”;  

        hdrs.put(“flume.client.log4j.logger.source”, this.source);  

        // to get the level back simply use  

        // loggerevent.tolevel(hdrs.get(integer.parseint(  

        // log4javroheaders.log_level.tostring()))  

        hdrs.put(log4javroheaders.log_level.tostring(),  

                string.valueof(event.getlevel().toint()));  

        event flumeevent;  

        object message = event.getmessage();  

        if (message instanceof genericrecord) {  

            genericrecord record = (genericrecord) message;  

            populateavroheaders(hdrs, record.getschema(), message);  

            flumeevent = eventbuilder.withbody(  

                    serialize(record, record.getschema()), hdrs);  

        } else if (message instanceof specificrecord || avroreflectionenabled) {  

            schema schema = reflectdata.get().getschema(message.getclass());  

            populateavroheaders(hdrs, schema, message);  

            flumeevent = eventbuilder  

                    .withbody(serialize(message, schema), hdrs);  

        } else {  

            hdrs.put(log4javroheaders.message_encoding.tostring(), “utf8″);  

            string msg = layout != null ? layout.format(event) : message  

                    .tostring();  

            flumeevent = eventbuilder.withbody(msg, charset.forname(“utf8″),  

                    hdrs);  

        try {  

            rpcclient.append(flumeevent);  

        } catch (eventdeliveryexception e) {  

            string msg = “flume append() failed.”;  

            loglog.error(msg);  

            throw new flumeexception(msg + ” exception follows.”, e);  

    private schema schema;  

    private bytearrayoutputstream out;  

    private datumwriter<object> writer;  

    private binaryencoder encoder;  

    protected void populateavroheaders(map<string, string> hdrs, schema schema,  

            object message) {  

        if (avroschemaurl != null) {  

            hdrs.put(log4javroheaders.avro_schema_url.tostring(), avroschemaurl);  

            return;  

        loglog.warn(“cannot find id for schema. adding header for schema, “  

                + “which may be inefficient. consider setting up an avro schema cache.”);  

        hdrs.put(log4javroheaders.avro_schema_literal.tostring(),  

                schema.tostring());  

    private byte[] serialize(object datum, schema datumschema)  

            throws flumeexception {  

        if (schema == null || !datumschema.equals(schema)) {  

            schema = datumschema;  

            out = new bytearrayoutputstream();  

            writer = new reflectdatumwriter<object>(schema);  

            encoder = encoderfactory.get().binaryencoder(out, null);  

        out.reset();  

            writer.write(datum, encoder);  

            encoder.flush();  

            return out.tobytearray();  

        } catch (ioexception e) {  

            throw new flumeexception(e);  

    // this function should be synchronized to make sure one thread  

    // does not close an appender another thread is using, and hence risking  

    // a null pointer exception.  

     * closes underlying client. if <tt>append()</tt> is called after this 

     * function is called, it will throw an exception. 

     *             if errors occur during close 

    public synchronized void close() throws flumeexception {  

        // any append calls after this will result in an exception.  

        if (rpcclient != null) {  

            try {  

                rpcclient.close();  

            } catch (flumeexception ex) {  

                loglog.error(“error while trying to close rpcclient.”, ex);  

                if (unsafemode) {  

                    return;  

                }  

                throw ex;  

            } finally {  

                rpcclient = null;  

            string errormsg = “flume log4jappender already closed!”;  

    public boolean requireslayout() {  

        // this method is named quite incorrectly in the interface. it should  

        // probably be called canuselayout or something. according to the docs,  

        // even if the appender can work without a layout, if it can work with  

        // one,  

        // this method must return true.  

        return true;  

     * set the first flume hop hostname. 

    public void sethostname(string hostname) {  

     * set the port on the hostname to connect to. 

    public void setport(int port) {  

    public void setunsafemode(boolean unsafemode) {  

        this.unsafemode = unsafemode;  

    public boolean getunsafemode() {  

        return unsafemode;  

    public void settimeout(long timeout) {  

        this.timeout = timeout;  

    public long gettimeout() {  

        return this.timeout;  

    public void setavroreflectionenabled(boolean avroreflectionenabled) {  

        this.avroreflectionenabled = avroreflectionenabled;  

    public void setavroschemaurl(string avroschemaurl) {  

        this.avroschemaurl = avroschemaurl;  

     * activate the options set using <tt>setport()</tt> and 

     * <tt>sethostname()</tt> 

     *             if the <tt>hostname</tt> and <tt>port</tt> combination is 

     *             invalid. 

    public void activateoptions() throws flumeexception {  

        properties props = new properties();  

        props.setproperty(rpcclientconfigurationconstants.config_hosts, “h1″);  

        props.setproperty(rpcclientconfigurationconstants.config_hosts_prefix  

                + “h1″, hostname + “:” + port);  

        props.setproperty(  

                rpcclientconfigurationconstants.config_connect_timeout,  

                string.valueof(timeout));  

                rpcclientconfigurationconstants.config_request_timeout,  

            rpcclient = rpcclientfactory.getinstance(props);  

            if (layout != null) {  

                layout.activateoptions();  

        } catch (flumeexception e) {  

            string errormsg = “rpc client creation failed! “ + e.getmessage();  

            throw e;  

     * make it easy to reconnect on failure 

    private void reconnect() throws flumeexception {  

        close();  

        activateoptions();  

然後然後将這個類打了一個jar包:log4jextappender.jar,扔在了flumedemo和flumedemo2的lib目錄下。

這時候flumedemo的log4j.properties如下:

log4j.rootlogger=info  

log4j.category.com.besttone=info,flume,console,logfile  

#log4j.appender.flume = org.apache.flume.clients.log4jappender.log4jextappender  

log4j.appender.flume = com.besttone.flume.log4jextappender  

log4j.appender.flume.hostname = localhost  

log4j.appender.flume.port = 44444  

log4j.appender.flume.unsafemode = false  

log4j.appender.flume.source = app1  

log4j.appender.console= org.apache.log4j.consoleappender  

log4j.appender.console.target= system.out  

log4j.appender.console.layout= org.apache.log4j.patternlayout  

log4j.appender.console.layout.conversionpattern= %d{yyyy-mm-dd hh:mm:ss} %5p %c{1}: %l – %m%n  

log4j.appender.logfile= org.apache.log4j.dailyrollingfileappender  

log4j.appender.logfile.file= logs/app.log  

log4j.appender.logfile.maxfilesize=10kb  

log4j.appender.logfile.append= true  

log4j.appender.logfile.threshold= debug  

log4j.appender.logfile.layout= org.apache.log4j.patternlayout  

log4j.appender.logfile.layout.conversionpattern= %-d{yyyy-mm-dd hh:mm:ss} [%t:%r] – [%5p] %m%n  

flumedemo2的如下:

log4j.appender.flume.source = app2  

将原來的log4j.appender.flume 由org.apache.flume.clients.log4jappender.log4jextappender改為了我重新實作添加了source參數的com.besttone.flume.log4jextappender

然後flumedemo的log4j.appender.flume.source = app1,flumedemo2的log4j.appender.flume.source = app2。

運作flumedemo的writelog類,和和flumedemo2的writelog2類,分别去hdfs上和agent的log檔案中看看内容,發現hdfs上都是app1的日志,log檔案中都是app2的日志,功能實作。

完整的flume.conf如下:

tier1.sources=source1  

tier1.channels=channel1 channel2  

tier1.sinks=sink1 sink2  

tier1.sources.source1.type=avro  

tier1.sources.source1.bind=0.0.0.0  

tier1.sources.source1.port=44444  

tier1.sources.source1.channels=channel1 channel2  

tier1.sources.source1.selector.type=multiplexing  

tier1.sources.source1.selector.header=flume.client.log4j.logger.source  

tier1.sources.source1.selector.mapping.app1=channel1  

tier1.sources.source1.selector.mapping.app2=channel2  

tier1.sources.source1.interceptors=i1 i2  

tier1.sources.source1.interceptors.i1.type=regex_filter  

tier1.sources.source1.interceptors.i1.regex=\\{.*\\}  

tier1.sources.source1.interceptors.i2.type=timestamp  

tier1.channels.channel1.type=memory  

tier1.channels.channel1.capacity=10000  

tier1.channels.channel1.transactioncapacity=1000  

tier1.channels.channel1.keep-alive=30  

tier1.channels.channel2.type=memory  

tier1.channels.channel2.capacity=10000  

tier1.channels.channel2.transactioncapacity=1000  

tier1.channels.channel2.keep-alive=30  

tier1.sinks.sink1.type=hdfs  

tier1.sinks.sink1.channel=channel1  

tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events/%y-%m-%d  

tier1.sinks.sink1.hdfs.round=true  

tier1.sinks.sink1.hdfs.roundvalue=10  

tier1.sinks.sink1.hdfs.roundunit=minute  

tier1.sinks.sink1.hdfs.filetype=datastream  

tier1.sinks.sink1.hdfs.writeformat=text  

tier1.sinks.sink1.hdfs.rollinterval=0  

tier1.sinks.sink1.hdfs.rollsize=10240  

tier1.sinks.sink1.hdfs.rollcount=0  

tier1.sinks.sink1.hdfs.idletimeout=60  

tier1.sinks.sink2.type=logger  

tier1.sinks.sink2.channel=channel2