The previous posts collected logs from only a single project; now let's consider collecting logs from multiple projects. I copied the flumedemo project, renamed the copy flumedemo2, and added a WriteLog2.java class whose JSON output differs slightly: the "reporter-api" in the old requestUrl is changed to "image-api", so its output can be told apart from that of the WriteLog class. The code is as follows:
package com.besttone.flume;

import java.util.Date;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class WriteLog2 {
    protected static final Log logger = LogFactory.getLog(WriteLog2.class);

    /**
     * @param args
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException {
        // TODO Auto-generated method stub
        while (true) {
            logger.info(new Date().getTime());
            logger.info("{\"requestTime\":"
                    + System.currentTimeMillis()
                    + ",\"requestParams\":{\"timestamp\":1405499314238,\"phone\":\"02038824941\",\"cardName\":\"測試商家名稱\",\"provinceCode\":\"440000\",\"cityCode\":\"440106\"},\"requestUrl\":\"/image-api/reporter/reporter12/init.do\"}");
            Thread.sleep(2000);
        }
    }
}
Now we have the following requirement: the log4j logs of the flumedemo project must be written to HDFS, while the log4j logs of the flumedemo2 project must go to the agent's own log.
We still use Log4jAppender to hand the log4j output to Flume's source. The requirement now clearly calls for two sinks, one HDFS sink and one logger sink, so the topology becomes: one source feeding two channels, each channel feeding its own sink.

To implement this topology we need channel selectors, which let the logs of different projects travel through different channels to different sinks.
The official documentation lists two types of channel selectors:
Replicating Channel Selector (default)
Multiplexing Channel Selector
The difference between the two: a replicating selector sends every event arriving from the source to all channels, whereas a multiplexing selector can choose which channels to send to. For our example, with replicating, the logs of demo and demo2 would both be sent to channel1 and channel2 at the same time, which clearly does not match the requirement: demo's logs should go only to channel1, and demo2's logs only to channel2.
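For contrast, a replicating setup on the same topology would look like the minimal sketch below (using the same tier1/source1/channel names as the rest of this post); since replicating is the default selector type, the selector.type line could even be omitted:

tier1.sources.source1.channels=channel1 channel2
tier1.sources.source1.selector.type=replicating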
So we choose the multiplexing channel selector. Here we run into a thorny problem: multiplexing decides which channel to dispatch an event to by inspecting the value of a specified key in the event headers, and demo and demo2 are running on the same server. If they ran on different servers, we could add a host interceptor on source1 (introduced in the previous post) and route each event by the host value in its header; but on the same server, host cannot distinguish where a log came from, so we must find a way to add a key to the header that identifies the log's origin.
Suppose the header carried a key flume.client.log4j.logger.source, and we set its value to app1 for demo and app2 for demo2. Then the following configuration:
tier1.sources.source1.channels=channel1 channel2
tier1.sources.source1.selector.type=multiplexing
tier1.sources.source1.selector.header=flume.client.log4j.logger.source
tier1.sources.source1.selector.mapping.app1=channel1
tier1.sources.source1.selector.mapping.app2=channel2
would route the logs of different projects into different channels.
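Note that the multiplexing selector also accepts a default mapping for events whose header value matches neither app1 nor app2. For example, routing such strays to channel1 (the choice of channel here is just an assumption) would be:

tier1.sources.source1.selector.default=channel1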
Following this line of thought, we hit a snag: Log4jAppender has no such parameter for you to set. What to do? After reading the Log4jAppender source, I found it is easy to add an extra parameter, so I copied the Log4jAppender code into a new class called Log4jExtAppender.java and extended it with a parameter named source. The code is as follows:
package com.besttone.flume;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientConfigurationConstants;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.clients.log4jappender.Log4jAvroHeaders;
import org.apache.flume.event.EventBuilder;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LoggingEvent;

/**
 * Appends Log4j events to an external Flume client which is described by the
 * Log4j configuration file. The appender takes two required parameters:
 * <p>
 * <strong>Hostname</strong>: the hostname of the first hop at which Flume
 * (through an AvroSource) is listening for events.
 * </p>
 * <p>
 * <strong>Port</strong>: the port on the above host where the Flume source is
 * listening for events.
 * </p>
 * A sample log4j properties file which appends to a source would look like:
 * <pre>
 * log4j.appender.out2 = org.apache.flume.clients.log4jappender.Log4jAppender
 * log4j.appender.out2.Port = 25430
 * log4j.appender.out2.Hostname = foobarflumesource.com
 * log4j.logger.org.apache.flume.clients.log4jappender = DEBUG,out2
 * </pre>
 * <i>Note: change the last line to the package of the class(es) that will do
 * the appending. For example, if classes from the package com.bar.foo are
 * appending, the last line would be:</i>
 * <p>log4j.logger.com.bar.foo = DEBUG,out2</p>
 */
public class Log4jExtAppender extends AppenderSkeleton {

    private String hostname;
    private int port;
    // The extended parameter: identifies which application the log came from.
    private String source;

    public String getSource() {
        return source;
    }

    public void setSource(String source) {
        this.source = source;
    }

    private boolean unsafeMode = false;
    private long timeout = RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS;
    private boolean avroReflectionEnabled;
    private String avroSchemaUrl;

    RpcClient rpcClient = null;

    /**
     * If this constructor is used programmatically rather than from a log4j
     * conf you must set the <tt>port</tt> and <tt>hostname</tt> and then call
     * <tt>activateOptions()</tt> before calling <tt>append()</tt>.
     */
    public Log4jExtAppender() {
    }

    /**
     * Sets the hostname and port. Even if these are passed the
     * <tt>activateOptions()</tt> function must be called before calling
     * <tt>append()</tt>, else <tt>append()</tt> will throw an Exception.
     *
     * @param hostname
     *            the first hop where the client should connect to.
     * @param port
     *            the port to connect on the host.
     */
    public Log4jExtAppender(String hostname, int port, String source) {
        this.hostname = hostname;
        this.port = port;
        this.source = source;
    }

    /**
     * Append the LoggingEvent, to send to the first Flume hop.
     *
     * @param event
     *            the LoggingEvent to be appended to the flume.
     * @throws FlumeException
     *             if the appender was closed, or the hostname and port were
     *             not setup, there was a timeout, or there was a connection
     *             error.
     */
    @Override
    public synchronized void append(LoggingEvent event) throws FlumeException {
        // If rpcClient is null, it means either this appender object was never
        // setup by setting hostname and port and then calling activateOptions
        // or this appender object was closed by calling close(), so we throw
        // an exception to show the appender is no longer accessible.
        if (rpcClient == null) {
            String errorMsg = "Cannot Append to Appender! Appender either closed or"
                    + " not setup correctly!";
            LogLog.error(errorMsg);
            if (unsafeMode) {
                return;
            }
            throw new FlumeException(errorMsg);
        }

        if (!rpcClient.isActive()) {
            reconnect();
        }

        // Client created first time append is called.
        Map<String, String> hdrs = new HashMap<String, String>();
        hdrs.put(Log4jAvroHeaders.LOGGER_NAME.toString(), event.getLoggerName());
        hdrs.put(Log4jAvroHeaders.TIMESTAMP.toString(),
                String.valueOf(event.timeStamp));

        // Add the log source so the multiplexing channel selector can route on it.
        if (this.source == null || this.source.equals("")) {
            this.source = "unknown";
        }
        hdrs.put("flume.client.log4j.logger.source", this.source);

        // To get the level back simply use
        // LoggerEvent.toLevel(hdrs.get(Integer.parseInt(
        // Log4jAvroHeaders.LOG_LEVEL.toString()))
        hdrs.put(Log4jAvroHeaders.LOG_LEVEL.toString(),
                String.valueOf(event.getLevel().toInt()));

        Event flumeEvent;
        Object message = event.getMessage();
        if (message instanceof GenericRecord) {
            GenericRecord record = (GenericRecord) message;
            populateAvroHeaders(hdrs, record.getSchema(), message);
            flumeEvent = EventBuilder.withBody(
                    serialize(record, record.getSchema()), hdrs);
        } else if (message instanceof SpecificRecord || avroReflectionEnabled) {
            Schema schema = ReflectData.get().getSchema(message.getClass());
            populateAvroHeaders(hdrs, schema, message);
            flumeEvent = EventBuilder
                    .withBody(serialize(message, schema), hdrs);
        } else {
            hdrs.put(Log4jAvroHeaders.MESSAGE_ENCODING.toString(), "UTF8");
            String msg = layout != null ? layout.format(event) : message
                    .toString();
            flumeEvent = EventBuilder.withBody(msg, Charset.forName("UTF8"),
                    hdrs);
        }

        try {
            rpcClient.append(flumeEvent);
        } catch (EventDeliveryException e) {
            String msg = "Flume append() failed.";
            LogLog.error(msg);
            throw new FlumeException(msg + " Exception follows.", e);
        }
    }

    private Schema schema;
    private ByteArrayOutputStream out;
    private DatumWriter<Object> writer;
    private BinaryEncoder encoder;

    protected void populateAvroHeaders(Map<String, String> hdrs, Schema schema,
            Object message) {
        if (avroSchemaUrl != null) {
            hdrs.put(Log4jAvroHeaders.AVRO_SCHEMA_URL.toString(), avroSchemaUrl);
            return;
        }
        LogLog.warn("Cannot find ID for schema. Adding header for schema, "
                + "which may be inefficient. Consider setting up an Avro Schema Cache.");
        hdrs.put(Log4jAvroHeaders.AVRO_SCHEMA_LITERAL.toString(),
                schema.toString());
    }

    private byte[] serialize(Object datum, Schema datumSchema)
            throws FlumeException {
        if (schema == null || !datumSchema.equals(schema)) {
            schema = datumSchema;
            out = new ByteArrayOutputStream();
            writer = new ReflectDatumWriter<Object>(schema);
            encoder = EncoderFactory.get().binaryEncoder(out, null);
        }
        out.reset();
        try {
            writer.write(datum, encoder);
            encoder.flush();
            return out.toByteArray();
        } catch (IOException e) {
            throw new FlumeException(e);
        }
    }

    // This function should be synchronized to make sure one thread
    // does not close an appender another thread is using, and hence risking
    // a null pointer exception.
    /**
     * Closes underlying client. If <tt>append()</tt> is called after this
     * function is called, it will throw an exception.
     *
     * @throws FlumeException
     *             if errors occur during close
     */
    @Override
    public synchronized void close() throws FlumeException {
        // Any append calls after this will result in an Exception.
        if (rpcClient != null) {
            try {
                rpcClient.close();
            } catch (FlumeException ex) {
                LogLog.error("Error while trying to close RpcClient.", ex);
                if (unsafeMode) {
                    return;
                }
                throw ex;
            } finally {
                rpcClient = null;
            }
        } else {
            String errorMsg = "Flume log4jappender already closed!";
            LogLog.error(errorMsg);
            if (unsafeMode) {
                return;
            }
            throw new FlumeException(errorMsg);
        }
    }

    @Override
    public boolean requiresLayout() {
        // This method is named quite incorrectly in the interface. It should
        // probably be called canUseLayout or something. According to the docs,
        // even if the appender can work without a layout, if it can work with
        // one, this method must return true.
        return true;
    }

    /**
     * Set the first flume hop hostname.
     */
    public void setHostname(String hostname) {
        this.hostname = hostname;
    }

    /**
     * Set the port on the hostname to connect to.
     */
    public void setPort(int port) {
        this.port = port;
    }

    public void setUnsafeMode(boolean unsafeMode) {
        this.unsafeMode = unsafeMode;
    }

    public boolean getUnsafeMode() {
        return unsafeMode;
    }

    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }

    public long getTimeout() {
        return this.timeout;
    }

    public void setAvroReflectionEnabled(boolean avroReflectionEnabled) {
        this.avroReflectionEnabled = avroReflectionEnabled;
    }

    public void setAvroSchemaUrl(String avroSchemaUrl) {
        this.avroSchemaUrl = avroSchemaUrl;
    }

    /**
     * Activate the options set using <tt>setPort()</tt> and
     * <tt>setHostname()</tt>.
     *
     * @throws FlumeException
     *             if the <tt>hostname</tt> and <tt>port</tt> combination is
     *             invalid.
     */
    @Override
    public void activateOptions() throws FlumeException {
        Properties props = new Properties();
        props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "h1");
        props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX
                + "h1", hostname + ":" + port);
        props.setProperty(
                RpcClientConfigurationConstants.CONFIG_CONNECT_TIMEOUT,
                String.valueOf(timeout));
        props.setProperty(
                RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT,
                String.valueOf(timeout));
        try {
            rpcClient = RpcClientFactory.getInstance(props);
            if (layout != null) {
                layout.activateOptions();
            }
        } catch (FlumeException e) {
            String errorMsg = "RPC client creation failed! " + e.getMessage();
            LogLog.error(errorMsg);
            if (unsafeMode) {
                return;
            }
            throw e;
        }
    }

    /**
     * Make it easy to reconnect on failure.
     */
    private void reconnect() throws FlumeException {
        close();
        activateOptions();
    }
}
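A note on why a plain getter/setter pair is all the extension needs: log4j instantiates the appender and then uses JavaBeans-style reflection to map each log4j.appender.<name>.<Property> line in log4j.properties onto the matching set<Property>() call, so a line such as the following invokes setSource("app1") on our class at startup:

log4j.appender.flume.Source = app1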
I then packaged this class into a jar, log4jextappender.jar, and dropped it into the lib directory of both flumedemo and flumedemo2.
At this point flumedemo's log4j.properties looks like this:
log4j.rootLogger=INFO
log4j.category.com.besttone=INFO,flume,console,logfile

#log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume = com.besttone.flume.Log4jExtAppender
log4j.appender.flume.Hostname = localhost
log4j.appender.flume.Port = 44444
log4j.appender.flume.UnsafeMode = false
log4j.appender.flume.Source = app1

log4j.appender.console= org.apache.log4j.ConsoleAppender
log4j.appender.console.Target= System.out
log4j.appender.console.layout= org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern= %d{yyyy-MM-dd HH:mm:ss} %5p %c{1}: %l - %m%n

log4j.appender.logfile= org.apache.log4j.DailyRollingFileAppender
log4j.appender.logfile.File= logs/app.log
log4j.appender.logfile.MaxFileSize=10KB
log4j.appender.logfile.Append= true
log4j.appender.logfile.Threshold= DEBUG
log4j.appender.logfile.layout= org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern= %-d{yyyy-MM-dd HH:mm:ss} [%t:%r] - [%5p] %m%n
flumedemo2's log4j.properties is identical except for:
log4j.appender.flume.Source = app2
That is, the original log4j.appender.flume was changed from org.apache.flume.clients.log4jappender.Log4jAppender to my re-implemented com.besttone.flume.Log4jExtAppender with the added Source parameter; flumedemo then sets log4j.appender.flume.Source = app1, and flumedemo2 sets log4j.appender.flume.Source = app2.
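If you prefer wiring the appender up programmatically instead of through log4j.properties, a minimal sketch looks like this (the class name ProgrammaticSetup is hypothetical; the hostname, port, and source are just the values used in this post):

import org.apache.log4j.Logger;

import com.besttone.flume.Log4jExtAppender;

public class ProgrammaticSetup {
    public static void main(String[] args) {
        // Build the appender with the extended source parameter.
        Log4jExtAppender appender = new Log4jExtAppender("localhost", 44444, "app1");
        // activateOptions() must run before the first append(),
        // otherwise the RPC client is never created.
        appender.activateOptions();
        Logger.getRootLogger().addAppender(appender);
        Logger.getRootLogger().info("hello flume");
    }
}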
Run flumedemo's WriteLog class and flumedemo2's WriteLog2 class, then check the contents on HDFS and in the agent's log file: HDFS holds only app1's logs, and the log file holds only app2's logs. The feature works as required.
The complete flume.conf is as follows:
tier1.sources=source1
tier1.channels=channel1 channel2
tier1.sinks=sink1 sink2

tier1.sources.source1.type=avro
tier1.sources.source1.bind=0.0.0.0
tier1.sources.source1.port=44444
tier1.sources.source1.channels=channel1 channel2
tier1.sources.source1.selector.type=multiplexing
tier1.sources.source1.selector.header=flume.client.log4j.logger.source
tier1.sources.source1.selector.mapping.app1=channel1
tier1.sources.source1.selector.mapping.app2=channel2
tier1.sources.source1.interceptors=i1 i2
tier1.sources.source1.interceptors.i1.type=regex_filter
tier1.sources.source1.interceptors.i1.regex=\\{.*\\}
tier1.sources.source1.interceptors.i2.type=timestamp

tier1.channels.channel1.type=memory
tier1.channels.channel1.capacity=10000
tier1.channels.channel1.transactionCapacity=1000
tier1.channels.channel1.keep-alive=30

tier1.channels.channel2.type=memory
tier1.channels.channel2.capacity=10000
tier1.channels.channel2.transactionCapacity=1000
tier1.channels.channel2.keep-alive=30

tier1.sinks.sink1.type=hdfs
tier1.sinks.sink1.channel=channel1
tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events/%Y-%m-%d
tier1.sinks.sink1.hdfs.round=true
tier1.sinks.sink1.hdfs.roundValue=10
tier1.sinks.sink1.hdfs.roundUnit=minute
tier1.sinks.sink1.hdfs.fileType=DataStream
tier1.sinks.sink1.hdfs.writeFormat=Text
tier1.sinks.sink1.hdfs.rollInterval=0
tier1.sinks.sink1.hdfs.rollSize=10240
tier1.sinks.sink1.hdfs.rollCount=0
tier1.sinks.sink1.hdfs.idleTimeout=60

tier1.sinks.sink2.type=logger
tier1.sinks.sink2.channel=channel2