背景介绍
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。Flume有各种自带的拦截器,比如:TimestampInterceptor、HostInterceptor、RegexExtractorInterceptor等,通过使用不同的拦截器,实现不同的功能。但是以上的这些拦截器,不能改变原有日志数据的内容或者对日志信息添加一定的处理逻辑,当一条日志信息有几十个甚至上百个字段的时候,在传统的Flume处理下,收集到的日志还是会有对应这么多的字段,也不能对你想要的字段进行对应的处理。
自定义拦截器
根据实际业务的需求,为了更好的满足数据在应用层的处理,通过自定义Flume拦截器,过滤掉不需要的字段,并对指定字段加密处理,将源数据进行预处理。减少了数据的传输量,降低了存储的开销。
实现
本技术方案核心包括二部分:
① 编写java代码,自定义拦截器;
内容包括:
1.定义一个类CustomParameterInterceptor实现Interceptor接口。
2. 在CustomParameterInterceptor类中定义变量,这些变量是需要到 Flume的配置文件中进行配置使用的。每一行字段间的分隔符(fields_separator)、通过分隔符分隔后,所需要列字段的下标(indexs)、多个下标使用的分隔符(indexs_separator)、多个下标使用的分隔符(indexs_separator)。
3.添加CustomParameterInterceptor的有参构造方法。并对相应的变量进行处理。将配置文件中传过来的unicode编码进行转换为字符串。
4.写具体的要处理的逻辑intercept()方法,一个是单个处理的,一个是批量处理。
5.接口中定义了一个内部接口Builder,在configure方法中,进行一些参数配置。并给出,在flume的conf中没配置一些参数时,给出其默认值。通过其builder方法,返回一个CustomParameterInterceptor对象。
6.定义一个静态类,类中封装MD5加密方法
7. 通过以上步骤,自定义拦截器的代码开发已完成,然后打包成jar, 放到Flume的根目录下的lib中
package cn.itcast.interceptor;
import static cn.itcast.interceptor.CustomParameterInterceptor.Constants.FIELD_SEPARATOR;
import static cn.itcast.interceptor.CustomParameterInterceptor.Constants.DEFAULT_FIELD_SEPARATOR;
import static cn.itcast.interceptor.CustomParameterInterceptor.Constants.INDEXS;
import static cn.itcast.interceptor.CustomParameterInterceptor.Constants.DEFAULT_INDEXS;
import static cn.itcast.interceptor.CustomParameterInterceptor.Constants.INDEXS_SEPARATOR;
import static cn.itcast.interceptor.CustomParameterInterceptor.Constants.DEFAULT_INDEXS_SEPARATOR;
import static cn.itcast.interceptor.CustomParameterInterceptor.Constants.ENCRYPTED_FIELD_INDEX;
import static cn.itcast.interceptor.CustomParameterInterceptor.Constants.DEFAULT_ENCRYPTED_FIELD_INDEX;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import com.google.common.base.Charsets;
/**
* @author lishas
*
*/
public class CustomParameterInterceptor implements Interceptor{
/** The field_separator.指明每一行字段的分隔符 */
private final String fields_separator;
/** The indexs.通过分隔符分割后,指明需要那列的字段 下标*/
private final String indexs;
/** The indexs_separator. 多个下标的分隔符*/
private final String indexs_separator;
/** The encrypted_field_index. 需要加密的字段下标*/
private final String encrypted_field_index;
/**
*
* @param regex
* @param field_separator
* @param indexs
* @param indexs_separator
*/
public CustomParameterInterceptor( String fields_separator,
String indexs, String indexs_separator,String encrypted_field_index) {
String f = fields_separator.trim();
String i = indexs_separator.trim();
this.indexs = indexs;
this.encrypted_field_index=encrypted_field_index.trim();
if (!f.equals("")) {
f = UnicodeToString(f);
}
this.fields_separator =f;
if (!i.equals("")) {
i = UnicodeToString(i);
}
this.indexs_separator = i;
}
/*
*
* \t 制表符 ('\u0009') \n 新行(换行)符 (' ') \r 回车符 (' ') \f 换页符 ('\u000C') \a 报警
* (bell) 符 ('\u0007') \e 转义符 ('\u001B') \cx 空格(\u0020)对应于 x 的控制符
*
* @param str
* @return
* @data:2015-6-30
*/
public static String UnicodeToString(String str) {
Pattern pattern = Pattern.compile("(\\\\u(\\p{XDigit}{4}))");
Matcher matcher = pattern.matcher(str);
char ch;
while (matcher.find()) {
ch = (char) Integer.parseInt(matcher.group(), );
str = str.replace(matcher.group(), ch + "");
}
return str;
}
/*
* @see org.apache.flume.interceptor.Interceptor#intercept(org.apache.flume.Event)
*/
public Event intercept(Event event) {
if (event == null) {
return null;
}
try {
String line = new String(event.getBody(), Charsets.UTF_8);
String[] fields_spilts = line.split(fields_separator);
String[] indexs_split = indexs.split(indexs_separator);
String newLine="";
for (int i = ; i < indexs_split.length; i++) {
int parseInt = Integer.parseInt(indexs_split[i]);
//对加密字段进行加密
if(!"".equals(encrypted_field_index)&&encrypted_field_index.equals(indexs_split[i])){
newLine+=StringUtils.GetMD5Code(fields_spilts[parseInt]);
}else{
newLine+=fields_spilts[parseInt];
}
if(i!=indexs_split.length-){
newLine+=fields_separator;
}
}
event.setBody(newLine.getBytes(Charsets.UTF_8));
return event;
} catch (Exception e) {
return event;
}
}
/*
* @see org.apache.flume.interceptor.Interceptor#intercept(java.util.List)
*/
public List<Event> intercept(List<Event> events) {
List<Event> out = new ArrayList<Event>();
for (Event event : events) {
Event outEvent = intercept(event);
if (outEvent != null) {
out.add(outEvent);
}
}
return out;
}
/*
* @see org.apache.flume.interceptor.Interceptor#initialize()
*/
public void initialize() {
// TODO Auto-generated method stub
}
/*
* @see org.apache.flume.interceptor.Interceptor#close()
*/
public void close() {
// TODO Auto-generated method stub
}
public static class Builder implements Interceptor.Builder {
/** The fields_separator.指明每一行字段的分隔符 */
private String fields_separator;
/** The indexs.通过分隔符分割后,指明需要那列的字段 下标*/
private String indexs;
/** The indexs_separator. 多个下标下标的分隔符*/
private String indexs_separator;
/** The encrypted_field. 需要加密的字段下标*/
private String encrypted_field_index;
/*
* @see org.apache.flume.conf.Configurable#configure(org.apache.flume.Context)
*/
public void configure(Context context) {
fields_separator = context.getString(FIELD_SEPARATOR, DEFAULT_FIELD_SEPARATOR);
indexs = context.getString(INDEXS, DEFAULT_INDEXS);
indexs_separator = context.getString(INDEXS_SEPARATOR, DEFAULT_INDEXS_SEPARATOR);
encrypted_field_index= context.getString(ENCRYPTED_FIELD_INDEX, DEFAULT_ENCRYPTED_FIELD_INDEX);
}
/*
* @see org.apache.flume.interceptor.Interceptor.Builder#build()
*/
public Interceptor build() {
return new CustomParameterInterceptor(fields_separator, indexs, indexs_separator,encrypted_field_index);
}
}
/**
* The Class Constants.
*
* @author lishas
*/
public static class Constants {
/** The Constant FIELD_SEPARATOR. */
public static final String FIELD_SEPARATOR = "fields_separator";
/** The Constant DEFAULT_FIELD_SEPARATOR. */
public static final String DEFAULT_FIELD_SEPARATOR =" ";
/** The Constant INDEXS. */
public static final String INDEXS = "indexs";
/** The Constant DEFAULT_INDEXS. */
public static final String DEFAULT_INDEXS = "0";
/** The Constant INDEXS_SEPARATOR. */
public static final String INDEXS_SEPARATOR = "indexs_separator";
/** The Constant DEFAULT_INDEXS_SEPARATOR. */
public static final String DEFAULT_INDEXS_SEPARATOR = ",";
/** The Constant ENCRYPTED_FIELD_INDEX. */
public static final String ENCRYPTED_FIELD_INDEX = "encrypted_field_index";
/** The Constant DEFAUL_TENCRYPTED_FIELD_INDEX. */
public static final String DEFAULT_ENCRYPTED_FIELD_INDEX = "";
/** The Constant PROCESSTIME. */
public static final String PROCESSTIME = "processTime";
/** The Constant PROCESSTIME. */
public static final String DEFAULT_PROCESSTIME = "a";
}
/**
* 字符串md5加密
*/
public static class StringUtils {
// 全局数组
private final static String[] strDigits = { "0", "1", "2", "3", "4", "5",
"6", "7", "8", "9", "a", "b", "c", "d", "e", "f" };
// 返回形式为数字跟字符串
private static String byteToArrayString(byte bByte) {
int iRet = bByte;
// System.out.println("iRet="+iRet);
if (iRet < ) {
iRet += ;
}
int iD1 = iRet / ;
int iD2 = iRet % ;
return strDigits[iD1] + strDigits[iD2];
}
// 返回形式只为数字
private static String byteToNum(byte bByte) {
int iRet = bByte;
System.out.println("iRet1=" + iRet);
if (iRet < ) {
iRet += ;
}
return String.valueOf(iRet);
}
// 转换字节数组为16进制字串
private static String byteToString(byte[] bByte) {
StringBuffer sBuffer = new StringBuffer();
for (int i = ; i < bByte.length; i++) {
sBuffer.append(byteToArrayString(bByte[i]));
}
return sBuffer.toString();
}
public static String GetMD5Code(String strObj) {
String resultString = null;
try {
resultString = new String(strObj);
MessageDigest md = MessageDigest.getInstance("MD5");
// md.digest() 该函数返回值为存放哈希值结果的byte数组
resultString = byteToString(md.digest(strObj.getBytes()));
} catch (NoSuchAlgorithmException ex) {
ex.printStackTrace();
}
return resultString;
}
}
}
② 修改Flume的配置信息
新增配置文件spool-interceptor-hdfs.conf,内容为:
a1.channels = c1
a1.sources = r1
a1.sinks = s1
channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=50000
source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data/
a1.sources.r1.batchSize= 50
a1.sources.r1.inputCharset = UTF-8
a1.sources.r1.interceptors =i1 i2
a1.sources.r1.interceptors.i1.type =cn.itcast.interceptor.CustomParameterInterceptor$Builder
a1.sources.r1.interceptors.i1.fields_separator=\u0009
a1.sources.r1.interceptors.i1.indexs =0,1,3,5,6
a1.sources.r1.interceptors.i1.indexs_separator =\u002c
a1.sources.r1.interceptors.i1.encrypted_field_index =0
a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
sink
a1.sinks.s1.channel = c1
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.path =hdfs://192.168.200.101:9000/flume/%Y%m%d
a1.sinks.s1.hdfs.filePrefix = event
a1.sinks.s1.hdfs.fileSuffix = .log
a1.sinks.s1.hdfs.rollSize = 10485760
a1.sinks.s1.hdfs.rollInterval =20
a1.sinks.s1.hdfs.rollCount = 0
a1.sinks.s1.hdfs.batchSize = 1500
a1.sinks.s1.hdfs.round = true
a1.sinks.s1.hdfs.roundUnit = minute
a1.sinks.s1.hdfs.threadsPoolSize = 25
a1.sinks.s1.hdfs.useLocalTimeStamp = true
a1.sinks.s1.hdfs.minBlockReplicas = 1
a1.sinks.s1.hdfs.fileType =DataStream
a1.sinks.s1.hdfs.writeFormat = Text
a1.sinks.s1.hdfs.callTimeout = 60000
a1.sinks.s1.hdfs.idleTimeout =60
启动:
bin/flume-ng agent -c conf -f conf/spool-interceptor-hdfs.conf -name a1 -Dflume.root.logger=DEBUG,console