flume可进行大量日志数据采集、聚合和并转移到存储中,并提供数据在流转中的事务机制;
可适用场景:日志--->flume--->实时计算(如mq+storm) 、日志--->flume--->离线计算(如odps、hdfs、hbase)、日志--->flume--->elasticsearch等。
对照这个图,作一些说明;
event 一条日志数据在flume中对应一个event对象。不过给他添加了header属性,就是一个map,放 一些额外信息,可以针对每条event做特殊处理,比如channel的选择。这些额外的键值对可以在event从source到channel之间的interceptor(拦截器)中set。
source 负责日志流入,比如从文件、网络、mq等数据源流入数据。
channel 负责数据聚合/暂存,以供sink消费掉,事务机制主要在这里实现
sink 负责数据转移到存储,比如从channel拿到日志后直接存储到odps、elasticsearch等。
拦截器 如果配置了拦截器,则event从source 进入channel前,经过拦截器链做过滤或其他处理;如识别不需要的数据等
选择器 flume默认实现有replicatingchannelselector(复制,event可同时发往多个channel)和multiplexingchannelselector(复用,可根据header中某个字段值,发往不同的channel)
下图是个简单的多channel、sink情况;flume还包含一些其他的高级的特性和使用方法,有时间可以继续研究。
遇到的问题:
日志数据中包含空行等不正确格式的记录,导致从channel中take日志记录后保存到odps失败;失败的操作被事务回滚,结果是数据流传在这个地方错误循环下去。
日志数据按实际生成的日期为分区保存在opds表的分区中,导入日志数据的日期为实际日期的后一天。
在尝试了几种方法后,最后选择自定义了一个拦截器实现(ualogfilteringinterceptor),能很好的达到目前的需求,他主要做如下两件事:
过滤掉不需要、不规范的数据,并且把过滤掉的这些数据存储到指定的文件里,每天一个文件(如果有异常记录)。
在每条event的header中加入qt值(qt值为前一天的日期,格式为yyyymmdd),每条event根据该值保持到odps的对应表分区中。
event从source put到channel和从channel take到sink后落地,这两个步骤都包裹在事务中;我这里说下memorytransaction大致实现。
memorytransaction 主要用到了两个双向阻塞队列(linkedblockingdeque)putlist和takelist作为缓冲区,同时配合使用memorychannel中的linkedblockingdeque queue;队列的大小通过flume的配置初始化好;
put事务
批量数据循环put到putlist中
commit,把putlist队列中数据offer到queue队列中,然后释放信号量,清空(clear)putlist队列
rollback,清空(clear)putlist队列
这里其实没有做太多事。
take事务
检查takelist队列大小是否够用,从queue队列中poll event到takelist队列中
commit,表明被sink正确消费掉,清空(clear)takelist队列
rollback,异常出现,则把takelist队列中的event返还到queue队列顶部