
Flume custom interceptor (Interceptor): concatenating header and body information

1. Background

A recent project had this requirement: collect logs from different applications on different machines, clean them, and store them in a DB. The database table needs columns recording where each log line came from -- which project, and which machine in that project. Since we use Flume for log collection, I went through the Flume documentation and found that interceptors cover this need: the host interceptor, configured after the source, puts the machine's IP into the event header, and the static interceptor adds an arbitrary key/value pair, which lets us tag which project a log line belongs to.

Testing showed that with both interceptors configured the event headers did contain the IP and project information, but the messages arriving in Kafka carried only the event body -- the headers were gone. After some digging, I decided to write a custom Flume interceptor that pulls the header fields out of each event and concatenates them into the body. Testing confirmed this solved the problem completely.

2. Implementation

So how do you write a custom Flume interceptor? Create a Maven project, add the flume-ng-core dependency, and implement the Interceptor interface. The code first:
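For reference, the only compile-time dependency needed is flume-ng-core (Guava's Charsets and the slf4j API used below come in transitively). A minimal POM fragment -- the version here is an assumption, match it to your Flume installation:

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <!-- assumption: pick the version matching your Flume installation -->
    <version>1.8.0</version>
    <scope>provided</scope>
</dependency>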

package com.flume.schedule;

import java.util.List;
import java.util.Map;

import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Charsets;

public class FlumeInterceptor implements Interceptor {

    private final Logger logger = LoggerFactory.getLogger(FlumeInterceptor.class);

    @Override
    public void initialize() {
        // nothing to set up
    }

    @Override
    public void close() {
        // nothing to clean up
    }

    @Override
    public Event intercept(Event event) {
        StringBuilder builder = new StringBuilder();
        // "host" is set by the host interceptor, "projectname" by the static interceptor
        Map<String, String> headerMap = event.getHeaders();
        if (null != headerMap && !headerMap.isEmpty()) {
            String ip = headerMap.get("host");
            String projectName = headerMap.get("projectname");
            builder.append("ip:").append(ip).append(";projectname:").append(projectName);
        }
        // append the original body after the header fields
        String body = new String(event.getBody(), Charsets.UTF_8);
        builder.append(";body:").append(body);
        // write the concatenated string back as the new body, explicitly UTF-8
        event.setBody(builder.toString().trim().getBytes(Charsets.UTF_8));
        logger.info("Concatenated body: " + builder.toString().trim());
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

}
           
package com.flume.schedule;

import org.apache.flume.Context;
import org.apache.flume.interceptor.Interceptor;

/**
 * Builder that Flume uses to instantiate the interceptor; its
 * fully-qualified class name goes into the agent configuration.
 */
public class FlumeBuilder implements Interceptor.Builder {

    @Override
    public void configure(Context context) {
        // no configurable properties
    }

    @Override
    public Interceptor build() {
        return new FlumeInterceptor();
    }

}
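Before deploying, the interceptor can be sanity-checked locally. The test class below is not part of the original post -- it just simulates the headers the host and static interceptors would set, using Flume's own EventBuilder helper:

package com.flume.schedule;

import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

import com.google.common.base.Charsets;

public class FlumeInterceptorTest {

    public static void main(String[] args) {
        // simulate what the host and static interceptors would have set
        Map<String, String> headers = new HashMap<String, String>();
        headers.put("host", "192.168.80.132");
        headers.put("projectname", "数据采集项目");

        Event event = EventBuilder.withBody("hello flume".getBytes(Charsets.UTF_8), headers);
        Event result = new FlumeBuilder().build().intercept(event);

        // prints: ip:192.168.80.132;projectname:数据采集项目;body:hello flume
        System.out.println(new String(result.getBody(), Charsets.UTF_8));
    }
}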
           

Only two classes are needed: FlumeInterceptor, which does the header/body concatenation, and FlumeBuilder, which instantiates the interceptor. Package them as a jar and drop it into the plugin.d directory of the Flume installation (create it if it doesn't exist), as shown below.

[Screenshot: the custom interceptor jar placed under Flume's plugin.d directory]
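Flume's plugin.d convention gives each plugin its own subdirectory containing a lib folder for the plugin's jar (the directory and jar names below are just examples):

plugin.d/
└── flume-interceptor/
    └── lib/
        └── flume-interceptor-1.0.jar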

With the jar in the plugin's lib directory, edit the Flume conf file and reference our custom interceptor in the interceptor section:

a1.sources = s1
a1.sinks = k1 k2
a1.channels = c1

# Describe/configure the source

a1.sources.s1.type = exec
a1.sources.s1.channels = c1
a1.sources.s1.command = tail -F  /home/esuser/flume/logs/flume.log

# Configure interceptors
a1.sources.s1.interceptors=i1 i2 i3

# host interceptor: puts the agent's IP into the "host" header
a1.sources.s1.interceptors.i1.type=host
a1.sources.s1.interceptors.i1.useIP=true
a1.sources.s1.interceptors.i1.preserveExisting=false

# static interceptor: tags every event with the project name
# (the value "数据采集项目" means "data collection project")
a1.sources.s1.interceptors.i2.type=static
a1.sources.s1.interceptors.i2.key=projectname
a1.sources.s1.interceptors.i2.value=数据采集项目
a1.sources.s1.interceptors.i2.preserveExisting=false

# custom interceptor: concatenates the headers set above into the body;
# Flume runs the three interceptors in the order they are listed
a1.sources.s1.interceptors.i3.type=com.flume.schedule.FlumeBuilder

# Describe the sink

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.80.132
a1.sinks.k1.port = 3333

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.80.132
a1.sinks.k2.port = 4444

# Use a channel that buffers events in memory

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1



a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
a1.sinkgroups.g1.processor.maxpenalty = 1000
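With the configuration saved (the file name below is an example), the agent is started in the usual way:

bin/flume-ng agent --conf conf --conf-file conf/myagent.conf --name a1 -Dflume.root.logger=INFO,console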
           

Then start Flume, Kafka, and Storm, and set up a scheduled task that writes one log line every 5 seconds.
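The post doesn't show the scheduled task itself. The logs below suggest a Spring Boot app with a class named com.flume.schedule.FlumeSchedule, so a minimal sketch could look like this -- an assumption, not the author's actual code (it also assumes @EnableScheduling on the application class):

package com.flume.schedule;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class FlumeSchedule {

    private static final Logger logger = LoggerFactory.getLogger(FlumeSchedule.class);

    // emit one log line every 5 seconds; tail -F on the log file feeds the exec source
    @Scheduled(fixedRate = 5000)
    public void produce() {
        String now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        logger.info("当前时间:" + now + ";生产数据");
    }
}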

Take a look at the Kafka consumer output.
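(For reference, this is just the stock console consumer; the broker address and the topic name "test" are taken from the Storm logs further below.)

bin/kafka-console-consumer.sh --bootstrap-server 192.168.80.132:9092 --topic test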

ip:192.168.80.132;projectname:数据采集项目;body:2018-02-12 14:05:05 [INFO] [org.springframework.jmx.export.MBeanExporter:449] - Unregistering JMX-exposed beans on shutdown
ip:192.168.80.132;projectname:数据采集项目;body:2018-02-12 14:53:20 [INFO] [org.springframework.boot.StartupInfoLogger:48] - Starting FlumeApplication v0.0.1-SNAPSHOT on slave with PID 4859 (/home/esuser/flume/myconf/flume-0.0.1-SNAPSHOT.jar started by root in /home/esuser/flume/myconf)
ip:192.168.80.132;projectname:数据采集项目;body:2018-02-12 14:53:20 [INFO] [org.springframework.boot.SpringApplication:661] - No active profile set, falling back to default profiles: default
ip:192.168.80.132;projectname:数据采集项目;body:2018-02-12 14:53:21 [INFO] [org.springframework.context.support.AbstractApplicationContext:582] - Refreshing org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@2d38eb89: startup date [Mon Feb 12 14:53:21 PST 2018]; root of context hierarchy
ip:192.168.80.132;projectname:数据采集项目;body:2018-02-12 14:53:24 [INFO] [org.hibernate.validator.internal.util.Version:30] - HV000001: Hibernate Validator 5.2.4.Final
ip:192.168.80.132;projectname:数据采集项目;body:2018-02-12 14:53:48 [INFO] [org.springframework.boot.context.embedded.tomcat.TomcatEmbeddedServletContainer:87] - Tomcat initialized with port(s): 5678 (http)
ip:192.168.80.132;projectname:数据采集项目;body:2018-02-12 14:53:49 [INFO] [org.apache.juli.logging.DirectJDKLog:179] - Starting service Tomcat
ip:192.168.80.132;projectname:数据采集项目;body:2018-02-12 14:53:49 [INFO] [org.apache.juli.logging.DirectJDKLog:179] - Starting Servlet Engine: Apache Tomcat/8.5.6
ip:192.168.80.132;projectname:数据采集项目;body:2018-02-12 14:53:51 [INFO] [org.apache.juli.logging.DirectJDKLog:179] - Initializing Spring embedded WebApplicationContext
ip:192.168.80.132;projectname:数据采集项目;body:2018-02-12 14:53:51 [INFO] [org.springframework.boot.context.embedded.EmbeddedWebApplicationContext:276] - Root WebApplicationContext: initialization completed in 29741 ms
           

Now the Storm topology log:

2018-02-12 15:01:20.122 o.a.s.k.ZkCoordinator [INFO] Task [1/1] Deleted partition managers: []
2018-02-12 15:01:20.131 o.a.s.k.ZkCoordinator [INFO] Task [1/1] New partition managers: [Partition{host=192.168.80.132:9092, topic=test, partition=0}]
2018-02-12 15:01:20.941 c.l.s.ParseBolt [INFO] 当前线程:Thread-16-parseBolt01-executor[4 4]数据库信息:身份证
2018-02-12 15:01:20.942 c.l.s.ParseBolt [INFO] 当前线程:Thread-16-parseBolt01-executor[4 4]数据库信息:户口本
2018-02-12 15:01:20.942 o.a.s.d.executor [INFO] Prepared bolt parseBolt01:(4)
2018-02-12 15:01:20.943 c.l.s.ParseBolt [INFO] 当前线程:Thread-6-parseBolt01-executor[3 3]数据库信息:身份证
2018-02-12 15:01:20.944 c.l.s.ParseBolt [INFO] 当前线程:Thread-6-parseBolt01-executor[3 3]数据库信息:户口本
2018-02-12 15:01:20.944 o.a.s.d.executor [INFO] Prepared bolt parseBolt01:(3)
2018-02-12 15:01:20.954 c.l.s.ParseBolt [INFO] 当前线程:Thread-14-parseBolt01-executor[5 5]数据库信息:身份证
2018-02-12 15:01:20.954 c.l.s.ParseBolt [INFO] 当前线程:Thread-14-parseBolt01-executor[5 5]数据库信息:户口本
2018-02-12 15:01:20.963 o.a.s.d.executor [INFO] Prepared bolt parseBolt01:(5)
2018-02-12 15:01:21.134 o.a.s.d.executor [INFO] Prepared bolt insertbolt01:(2)
2018-02-12 15:01:22.224 o.a.s.k.PartitionManager [INFO] Read partition information from: /kafka2storm/id/partition_0  --> {"partition":0,"offset":7325,"topology":{"name":"flumestorm2mysql","id":"flumestorm2mysql-1-1518475953"},"topic":"test","broker":{"port":9092,"host":"192.168.80.132"}}
2018-02-12 15:01:22.666 o.a.s.k.PartitionManager [INFO] Read last commit offset from zookeeper: 7325; old topology_id: flumestorm2mysql-1-1518475953 - new topology_id: hello2-2-1518476417
2018-02-12 15:01:22.673 o.a.s.k.PartitionManager [INFO] Starting Kafka 192.168.80.132:0 from offset 7325
2018-02-12 15:01:22.691 o.a.s.k.ZkCoordinator [INFO] Task [1/1] Finished refreshing
2018-02-12 15:01:23.239 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:数据采集项目;body:2018-02-12 15:00:00 [INFO] [com.flume.schedule.FlumeSchedule:22] - 当前时间:2018-02-12 15:00:00;生产数据
2018-02-12 15:01:23.275 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:数据采集项目;body:2018-02-12 15:00:10 [INFO] [com.flume.schedule.FlumeSchedule:22] - 当前时间:2018-02-12 15:00:10;生产数据
2018-02-12 15:01:23.279 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:数据采集项目;body:2018-02-12 15:00:05 [INFO] [com.flume.schedule.FlumeSchedule:22] - 当前时间:2018-02-12 15:00:05;生产数据
2018-02-12 15:01:23.280 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:数据采集项目;body:2018-02-12 15:00:15 [INFO] [com.flume.schedule.FlumeSchedule:22] - 当前时间:2018-02-12 15:00:15;生产数据
2018-02-12 15:01:23.296 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:数据采集项目;body:2018-02-12 15:00:20 [INFO] [com.flume.schedule.FlumeSchedule:22] - 当前时间:2018-02-12 15:00:20;生产数据
2018-02-12 15:01:23.297 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:数据采集项目;body:2018-02-12 15:00:25 [INFO] [com.flume.schedule.FlumeSchedule:22] - 当前时间:2018-02-12 15:00:25;生产数据
2018-02-12 15:01:23.304 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:数据采集项目;body:2018-02-12 15:00:30 [INFO] [com.flume.schedule.FlumeSchedule:22] - 当前时间:2018-02-12 15:00:30;生产数据
2018-02-12 15:01:23.316 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:数据采集项目;body:2018-02-12 15:00:35 [INFO] [com.flume.schedule.FlumeSchedule:22] - 当前时间:2018-02-12 15:00:35;生产数据
2018-02-12 15:01:23.317 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:数据采集项目;body:2018-02-12 15:00:40 [INFO] [com.flume.schedule.FlumeSchedule:22] - 当前时间:2018-02-12 15:00:40;生产数据
2018-02-12 15:01:23.318 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:数据采集项目;body:2018-02-12 15:00:45 [INFO] [com.flume.schedule.FlumeSchedule:22] - 当前时间:2018-02-12 15:00:45;生产数据
2018-02-12 15:01:23.327 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:数据采集项目;body:2018-02-12 15:01:15 [INFO] [com.flume.schedule.FlumeSchedule:22] - 当前时间:2018-02-12 15:01:15;生产数据
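The post doesn't include the ParseBolt source, but splitting the concatenated message back into its fields is straightforward. Note that the body itself can contain ';' and ':', so searching for the fixed markers is safer than a blind split. A hypothetical sketch, not the author's actual bolt code:

public class ParseSketch {

    public static void main(String[] args) {
        // message format produced by the interceptor:
        // ip:<ip>;projectname:<name>;body:<original log line>
        String message = "ip:192.168.80.132;projectname:数据采集项目;body:2018-02-12 15:00:00 [INFO] ...";

        int projectMarker = message.indexOf(";projectname:");
        int bodyMarker = message.indexOf(";body:");

        String ip = message.substring("ip:".length(), projectMarker);
        String projectName = message.substring(projectMarker + ";projectname:".length(), bodyMarker);
        String body = message.substring(bodyMarker + ";body:".length());

        System.out.println(ip + " | " + projectName + " | " + body);
    }
}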
           

Finally, the DB:

[Screenshot: database rows with the ip and projectname source columns populated]

As you can see, both the IP and the project each log line came from were captured all the way through.