
Flume Custom Source, Sink, and Interceptor

1. Custom Source

The developer guide is at:

http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.15.1/FlumeDeveloperGuide.html#source


A simple custom source:

package com.wxx.bigdata.hadoop.mapreduce.flume;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

/**
 * A custom Flume source that generates numbered events in batches.
 */
public class CustomerSource extends AbstractSource implements Configurable, PollableSource {
    private String prefix;
    private String suffix;

    /**
     * Build a batch of events and hand them to the channel processor.
     * @return READY if the batch was delivered, BACKOFF on failure
     * @throws EventDeliveryException
     */
    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        try {
            for (int i = 1; i <= 100; i++) {
                SimpleEvent simpleEvent = new SimpleEvent();
                simpleEvent.setBody((prefix + i + suffix).getBytes());
                // No need to manage the transaction here: inside
                // processEvent(), the channel takes care of it.
                getChannelProcessor().processEvent(simpleEvent);
            }
            status = Status.READY;
        } catch (Exception e) {
            e.printStackTrace();
            status = Status.BACKOFF;
        }
        // Pause between batches.
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // Report the outcome back to the framework.
        return status;
    }

    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }

    /**
     * Read the properties passed in from the agent configuration.
     * @param context
     */
    @Override
    public void configure(Context context) {
        this.prefix = context.getString("prefix", "ruozedata");
        this.suffix = context.getString("suffix");
    }
}

Build the class into a jar and upload it to /home/hadoop/app/apache-flume-1.6.0-cdh5.15.1-bin/lib.
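The startup command below references customer_source.conf, which is not shown in the post. A minimal sketch of what it might contain (the memory channel and logger sink are assumptions; the agent name a1 matches the --name flag below, and the prefix/suffix properties map to the configure() method above):

# Define the agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Define the source: the type is the fully qualified custom class name
a1.sources.r1.type = com.wxx.bigdata.hadoop.mapreduce.flume.CustomerSource
a1.sources.r1.prefix = ruozedata
a1.sources.r1.suffix = 2019

# Define the channel
a1.channels.c1.type = memory

# Define the sink (assumed logger sink, so results show on the console)
a1.sinks.k1.type = logger

# Wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1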

Startup script:

flume-ng agent \
--name a1 \
--conf-file /home/hadoop/script/flume/customer/customer_source.conf \
--conf  $FLUME_HOME/conf \
-Dflume.root.logger=INFO,console
The result: the console output shows the generated events, each of the form prefix + counter + suffix (screenshot omitted).

For a MySQL custom source, see: https://github.com/keedio/flume-ng-sql-source

2. Custom Sink

The developer guide is at:

http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.15.1/FlumeDeveloperGuide.html#sink


The code is as follows:

package com.wxx.bigdata.hadoop.mapreduce.flume;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A custom sink that logs each event body wrapped with a configurable
 * prefix and suffix.
 */
public class CustomerSink extends AbstractSink implements Configurable {
    private static final Logger logger = LoggerFactory.getLogger(CustomerSink.class);
    private String prefix;
    private String suffix;

    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        try {
            // take() returns null when the channel is empty; back off
            // instead of busy-waiting inside the open transaction.
            Event event = channel.take();
            if (event != null) {
                String body = new String(event.getBody());
                logger.info(prefix + body + suffix);
                status = Status.READY;
            } else {
                status = Status.BACKOFF;
            }
            transaction.commit();
        } catch (Exception e) {
            logger.error("Failed to process event", e);
            transaction.rollback();
            status = Status.BACKOFF;
        } finally {
            transaction.close();
        }
        return status;
    }

    /**
     * Read the prefix and suffix from the agent configuration.
     */
    @Override
    public void configure(Context context) {
        this.prefix = context.getString("prefix", "customer");
        this.suffix = context.getString("suffix");
    }
}

Package it into a jar and upload it to $FLUME_HOME/lib.

The Flume configuration file is as follows:

# Define the agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Define the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop000
a1.sources.r1.port = 44444

# Define the channel
a1.channels.c1.type = memory

# Define the sink
a1.sinks.k1.type = com.wxx.bigdata.hadoop.mapreduce.flume.CustomerSink
a1.sinks.k1.prefix = TEST
a1.sinks.k1.suffix = 2019

# Wire the components together
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

Startup script:

flume-ng agent \
--name a1 \
--conf-file /home/hadoop/script/flume/customer/customer_sink.conf \
--conf  $FLUME_HOME/conf \
-Dflume.root.logger=INFO,console
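Once the agent is running, a quick way to exercise the sink is to send a few lines to the netcat source (the sample text is arbitrary):

telnet hadoop000 44444
hello flume

Each line then shows up in the agent console wrapped with the configured prefix and suffix, e.g. TESThello flume2019.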

3. Custom Interceptor

After a1's events pass through the interceptor, those whose body contains "PEK" are routed to a2; all others go to a3.

The code is as follows:

package com.wxx.bigdata.hadoop.mapreduce.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * An interceptor that tags each event with a "type" header so a
 * multiplexing channel selector can route it.
 */
public class CustomerInterceptor implements Interceptor {

    private List<Event> events;

    @Override
    public void initialize() {
        events = new ArrayList<>();
    }

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        String body = new String(event.getBody());
        // Route on the body content: PEK events go one way, the rest another.
        if (body.contains("PEK")) {
            headers.put("type", "PEK");
        } else {
            headers.put("type", "OTHER");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        events.clear();
        for (Event event : list) {
            events.add(intercept(event));
        }
        return events;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new CustomerInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}
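A quick way to sanity-check the header tagging outside an agent (a minimal sketch; InterceptorDemo is a hypothetical class, and EventBuilder comes from flume-ng-core):

package com.wxx.bigdata.hadoop.mapreduce.flume;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

public class InterceptorDemo {
    public static void main(String[] args) {
        CustomerInterceptor interceptor = new CustomerInterceptor();
        interceptor.initialize();

        // A body containing "PEK" should be tagged type=PEK
        Event event = EventBuilder.withBody("flight to PEK".getBytes());
        interceptor.intercept(event);
        System.out.println(event.getHeaders().get("type")); // PEK

        // Any other body should be tagged type=OTHER
        Event other = EventBuilder.withBody("hello flume".getBytes());
        interceptor.intercept(other);
        System.out.println(other.getHeaders().get("type")); // OTHER

        interceptor.close();
    }
}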

The configuration files for the three agents are as follows.

customer_interceptor01.conf (a1):

# Define the agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Define the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop000
a1.sources.r1.port = 44444

# Interceptors
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.wxx.bigdata.hadoop.mapreduce.flume.CustomerInterceptor$Builder

# Define the selector
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.PEK = c1
a1.sources.r1.selector.mapping.OTHER = c2

# Define the channels
a1.channels.c1.type = memory
a1.channels.c2.type = memory

# Define the sinks
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop000
a1.sinks.k1.port = 44445

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop000
a1.sinks.k2.port = 44446

# Wire the components together
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
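The multiplexing selector also supports a default channel for events whose header value matches no mapping. It is not strictly needed here, since the interceptor always sets type to either PEK or OTHER, but it can be added as a safety net:

a1.sources.r1.selector.default = c2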



customer_interceptor02.conf (a2):

# Define the agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Define the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop000
a2.sources.r1.port = 44445

# Define the channel
a2.channels.c1.type = memory

# Define the sink
a2.sinks.k1.type = logger

# Wire the components together
a2.sinks.k1.channel = c1
a2.sources.r1.channels = c1


customer_interceptor03.conf (a3):

# Define the agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Define the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop000
a3.sources.r1.port = 44446

# Define the channel
a3.channels.c1.type = memory

# Define the sink
a3.sinks.k1.type = logger

# Wire the components together
a3.sinks.k1.channel = c1
a3.sources.r1.channels = c1

The startup scripts are as follows (a3 and a2 are started first, so their avro sources are listening before a1's avro sinks connect):

flume-ng agent \
--name a3 \
--conf-file /home/hadoop/script/flume/customer/customer_interceptor03.conf \
--conf  $FLUME_HOME/conf \
-Dflume.root.logger=INFO,console


flume-ng agent \
--name a2 \
--conf-file /home/hadoop/script/flume/customer/customer_interceptor02.conf \
--conf  $FLUME_HOME/conf \
-Dflume.root.logger=INFO,console

flume-ng agent \
--name a1 \
--conf-file /home/hadoop/script/flume/customer/customer_interceptor01.conf \
--conf  $FLUME_HOME/conf \
-Dflume.root.logger=INFO,console

In one terminal, send test input to a1's netcat source.
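For example (the sample strings are arbitrary; any line containing "PEK" should be routed to a2, everything else to a3):

telnet hadoop000 44444
PEK123
hello flume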


On a2, the logger sink prints the events containing "PEK" (screenshot omitted).

On a3, it prints all other events (screenshot omitted).