天天看點

flume架構圖以及模闆

flume架構圖以及模闆

模闆:

================主要方式===================

a1.sources = r1  

a1.channels = c1 c2

a1.sinks = s1 s2

a1.sources.r1.type=spooldir

a1.sources.r1.spoolDir =/root/my

# Multiplexing channel selector: routes each event to a channel according to
# the value of its "state" header (USER -> c1, ORDER -> c2, anything else -> c1).
# When selector.type is omitted, Flume falls back to its default selector.
# NOTE: keep comments on their own "#" lines. In a properties file, text that
# follows the value on the same line (e.g. "// ...") becomes part of the value.
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.USER = c1
a1.sources.r1.selector.mapping.ORDER = c2
a1.sources.r1.selector.default = c1

a1.sources.r1.interceptors=i1 i2 i3

a1.sources.r1.interceptors.i1.type=timestamp

a1.sources.r1.interceptors.i1.preserveExisting=false

a1.sources.r1.interceptors.i2.type=host

a1.sources.r1.interceptors.i2.preserveExisting=false

a1.sources.r1.interceptors.i2.useIP=true

a1.sources.r1.interceptors.i2.hostHeader=hostname

a1.sources.r1.interceptors.i3.type=static

a1.sources.r1.interceptors.i3.preserveExisting=false

a1.sources.r1.interceptors.i3.key=hn

a1.sources.r1.interceptors.i3.value=hdp

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000                    

a1.channels.c1.transactionCapacity=100           

a1.channels.c1.keep-alive=3

a1.channels.c1.byteCapacityBufferPercentage=20    

a1.channels.c1.byteCapacity=800000               

a1.channels.c2.type=file

a1.channels.c2.capacity=1000

a1.channels.c2.transactionCapacity=100

a1.sinks.s1.type=hdfs

a1.sinks.s1.hdfs.path=/flume/%Y/%m/%d/mulc1

a1.sinks.s1.hdfs.filePrefix=s1sink

a1.sinks.s1.hdfs.fileSuffix=.log

a1.sinks.s1.hdfs.inUseSuffix=.tmp

a1.sinks.s1.hdfs.rollInterval=60

a1.sinks.s1.hdfs.rollSize=1024

a1.sinks.s1.hdfs.rollCount=10

a1.sinks.s1.hdfs.idleTimeout=0

a1.sinks.s1.hdfs.batchSize=100

a1.sinks.s1.hdfs.fileType=DataStream

a1.sinks.s1.hdfs.writeFormat=Text

a1.sinks.s1.hdfs.round=true

a1.sinks.s1.hdfs.roundValue=1

a1.sinks.s1.hdfs.roundUnit=second

a1.sinks.s1.hdfs.useLocalTimeStamp=true

a1.sinks.s2.type=hdfs

a1.sinks.s2.hdfs.path=/flume/%Y/%m/%d/mulc2

a1.sinks.s2.hdfs.filePrefix=s2sink

a1.sinks.s2.hdfs.fileSuffix=.log

a1.sinks.s2.hdfs.inUseSuffix=.tmp

a1.sinks.s2.hdfs.rollInterval=60

a1.sinks.s2.hdfs.rollSize=1024

a1.sinks.s2.hdfs.rollCount=10

a1.sinks.s2.hdfs.idleTimeout=0

a1.sinks.s2.hdfs.batchSize=100

a1.sinks.s2.hdfs.fileType=DataStream

a1.sinks.s2.hdfs.writeFormat=Text

a1.sinks.s2.hdfs.round=true

a1.sinks.s2.hdfs.roundValue=1

a1.sinks.s2.hdfs.roundUnit=second

a1.sinks.s2.hdfs.useLocalTimeStamp=true

a1.sources.r1.channels=c1 c2

a1.sinks.s1.channel=c1

a1.sinks.s2.channel=c2

flume-ng agent -c $FLUME_HOME/conf -f /opt/apache-flume-1.6.0-bin/conf/m2.conf -n a1 -Dflume.root.logger=INFO,console

============項目====================

a1.sources = r1

a1.channels = c1

a1.sinks = s1

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /usr/local/nginx/logs/access.log

a1.sources.r1.interceptors=i1 i2 i3

a1.sources.r1.interceptors.i1.type=timestamp

a1.sources.r1.interceptors.i1.preserveExisting=false

a1.sources.r1.interceptors.i2.type=host

a1.sources.r1.interceptors.i2.preserveExisting=false

a1.sources.r1.interceptors.i2.useIP=true

a1.sources.r1.interceptors.i2.hostHeader=hostname

a1.sources.r1.interceptors.i3.type=static

a1.sources.r1.interceptors.i3.preserveExisting=false

a1.sources.r1.interceptors.i3.key=hn

a1.sources.r1.interceptors.i3.value=hdp

# Channel definition: a file-backed channel.
a1.channels.c1.type=file
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.channels.c1.keep-alive=3
# The two settings below are memory-channel options; they are not among the
# documented file-channel properties, so they are disabled here to avoid
# suggesting that this channel is bounded by byte size.
#a1.channels.c1.byteCapacityBufferPercentage=20
#a1.channels.c1.byteCapacity=800000

# Sink definition: write events to HDFS as plain text.
# NOTE: comments must sit on their own "#" lines. A trailing "// ..." after a
# value is read as part of the value and breaks numeric/boolean settings.
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=/flumedemo/%Y/%m/%d/test
a1.sinks.s1.hdfs.filePrefix=s1sink
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.inUseSuffix=.tmp
# Roll to a new file every 60 seconds ...
a1.sinks.s1.hdfs.rollInterval=60
# ... or once the file reaches 1024 bytes ...
a1.sinks.s1.hdfs.rollSize=1024
# ... or after 10 events, whichever comes first.
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.idleTimeout=0
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
# Round the timestamp down (roundValue x roundUnit) before it is substituted
# into the time escapes of hdfs.path, i.e. this controls when a new directory
# is started.
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=1
a1.sinks.s1.hdfs.roundUnit=second
# Resolve the path escapes from the agent's local clock.
a1.sinks.s1.hdfs.useLocalTimeStamp=true

#将source和sink綁定到channel

a1.sources.r1.channels = c1

a1.sinks.s1.channel = c1

flume-ng agent -c /opt/apache-flume-1.6.0-bin/conf -f /opt/apache-flume-1.6.0-bin/conf/test.conf -n a1 -Dflume.root.logger=INFO,console

-----kafka示例--------------------------------

a1.sources = r1

a1.sinks = k1

a1.channels = c1

a1.sources.r1.type=spooldir

a1.sources.r1.spoolDir =/root/my

# 用于描述channel,在記憶體中做資料的臨時的存儲

a1.channels.c1.type = memory

a1.channels.c1.capacity=1000                    

a1.channels.c1.transactionCapacity=100           

a1.channels.c1.keep-alive=3

a1.channels.c1.byteCapacityBufferPercentage=20    

a1.channels.c1.byteCapacity=800000

# 用于描述sink,類型是KafkaSink,将事件釋出到Kafka的指定topic,以下為消息釋出方的參數

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.topic = recharge

a1.sinks.k1.brokerList = NODE01:9092,NODE02:9092,NODE03:9092

# 将a1中的各個元件建立關聯關系,将source和sink都指向了同一個channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

flume-ng agent -c /opt/apache-flume-1.6.0-bin/conf -f /opt/flume/conf/flume-kafka.conf -n a1 -Dflume.root.logger=INFO,console

------------------------------------------

flume_kafka.sh       :  

flume-kafka: 采用Kafka Channel,省去了Sink,提高了效率

  flume配置檔案: vim file-flume-kafka.conf        

                    a1.sources=r1

                    a1.channels=c1 c2

                    # configure source

                    a1.sources.r1.type = TAILDIR

                    a1.sources.r1.positionFile = /opt/flume/test/log_position.json

                    a1.sources.r1.filegroups = f1

                    a1.sources.r1.filegroups.f1 = /tmp/logs/app.+

                    a1.sources.r1.fileHeader = true

                    a1.sources.r1.channels = c1 c2

                    #interceptor

                    a1.sources.r1.interceptors =  i1 i2

                    a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.LogETLInterceptor$Builder

                    a1.sources.r1.interceptors.i2.type = com.atguigu.flume.interceptor.LogTypeInterceptor$Builder

                    a1.sources.r1.selector.type = multiplexing

                    a1.sources.r1.selector.header = topic

                    a1.sources.r1.selector.mapping.topic_start = c1

                    a1.sources.r1.selector.mapping.topic_event = c2

                    # configure channel

                    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel

                    a1.channels.c1.kafka.bootstrap.servers = NODE01:9092,NODE02:9092,NODE03:9092

                    a1.channels.c1.kafka.topic = topic_start

                    a1.channels.c1.parseAsFlumeEvent = false

                    a1.channels.c1.kafka.consumer.group.id = flume-consumer

                    a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel

                    a1.channels.c2.kafka.bootstrap.servers = NODE01:9092,NODE02:9092,NODE03:9092

                    a1.channels.c2.kafka.topic = topic_event

                    a1.channels.c2.parseAsFlumeEvent = false

                    a1.channels.c2.kafka.consumer.group.id = flume-consumer   

------------------------------------------------------------------

flume把kafka中的資料拉取到hdfs

vim kafka-flume-hdfs.conf

## 元件

a1.sources=r1 r2

a1.channels=c1 c2

a1.sinks=k1 k2

## source1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource

a1.sources.r1.batchSize = 5000

a1.sources.r1.batchDurationMillis = 2000

a1.sources.r1.kafka.bootstrap.servers = NODE01:9092,NODE02:9092,NODE03:9092

a1.sources.r1.kafka.topics=topic_start

## source2

a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource

a1.sources.r2.batchSize = 5000

a1.sources.r2.batchDurationMillis = 2000

a1.sources.r2.kafka.bootstrap.servers = NODE01:9092,NODE02:9092,NODE03:9092

a1.sources.r2.kafka.topics=topic_event

## channel1

a1.channels.c1.type = file

a1.channels.c1.checkpointDir = /opt/flume/checkpoint/behavior1

a1.channels.c1.dataDirs = /opt/flume/data/behavior1/

a1.channels.c1.maxFileSize = 2146435071

a1.channels.c1.capacity = 1000000

a1.channels.c1.keep-alive = 6

## channel2

a1.channels.c2.type = file

a1.channels.c2.checkpointDir = /opt/flume/checkpoint/behavior2

a1.channels.c2.dataDirs = /opt/flume/data/behavior2/

a1.channels.c2.maxFileSize = 2146435071

a1.channels.c2.capacity = 1000000

a1.channels.c2.keep-alive = 6

## sink1

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d            

a1.sinks.k1.hdfs.filePrefix = logstart-

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue = 10

a1.sinks.k1.hdfs.roundUnit = second

##sink2

a1.sinks.k2.type = hdfs

a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d

a1.sinks.k2.hdfs.filePrefix = logevent-

a1.sinks.k2.hdfs.round = true

a1.sinks.k2.hdfs.roundValue = 10

a1.sinks.k2.hdfs.roundUnit = second

## 不要産生大量小檔案

a1.sinks.k1.hdfs.rollInterval = 10

a1.sinks.k1.hdfs.rollSize = 134217728

a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k2.hdfs.rollInterval = 10

a1.sinks.k2.hdfs.rollSize = 134217728

a1.sinks.k2.hdfs.rollCount = 0

## 控制輸出檔案是原生檔案。

a1.sinks.k1.hdfs.fileType = DataStream 

a1.sinks.k2.hdfs.fileType = DataStream 

#a1.sinks.k1.hdfs.codeC = lzop

#a1.sinks.k2.hdfs.codeC = lzop

## 拼裝

a1.sources.r1.channels = c1

a1.sinks.k1.channel= c1

a1.sources.r2.channels = c2

a1.sinks.k2.channel= c2

--------------------------------------------------------------------------

ETL攔截器主要用于過濾時間戳不合法和Json資料不完整的日志。

Flume ETL攔截器LogETLInterceptor

package com.atguigu.flume.interceptor;

import org.apache.flume.Context;

import org.apache.flume.Event;

import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;

import java.util.ArrayList;

import java.util.List;

public class LogETLInterceptor implements Interceptor {

    @Override

    public void initialize() {

    }

    @Override

    public Event intercept(Event event) {

        // 1 擷取資料

        byte[] body = event.getBody();

        String log = new String(body, Charset.forName("UTF-8"));

        // 2 判斷資料類型并向Header中指派

        if (log.contains("start")) {

            if (LogUtils.validateStart(log)){

                return event;

            }

        }else {

            if (LogUtils.validateEvent(log)){

                return event;

            }

        }

        // 3 傳回校驗結果

        return null;

    }

    @Override

    public List<Event> intercept(List<Event> events) {

        ArrayList<Event> interceptors = new ArrayList<>();

        for (Event event : events) {

            Event intercept1 = intercept(event);

            if (intercept1 != null){

                interceptors.add(intercept1);

            }

        }

        return interceptors;

    }

    @Override

    public void close() {

    }

    public static class Builder implements Interceptor.Builder{

        @Override

        public Interceptor build() {

            return new LogETLInterceptor();

        }

        @Override

        public void configure(Context context) {

        }

    }

}

Flume日志過濾工具類LogUtils

package com.atguigu.flume.interceptor;

import org.apache.commons.lang.math.NumberUtils;

/**
 * Lightweight format checks used to filter log lines before they are forwarded.
 *
 * <p>These checks are structural only: they do not parse the JSON payload.
 */
public class LogUtils {

    /** Number of characters required in the server-time field. */
    private static final int SERVER_TIME_LENGTH = 13;

    /**
     * Checks an event log line of the form {@code <serverTime>|<json>}, e.g.
     * {@code 1549696569054|{"cm":{...},"ap":"weather","et":[]}}.
     *
     * <p>The line is valid when splitting on {@code '|'} yields exactly two parts,
     * the first part is exactly 13 digits, and the second part, after trimming,
     * starts with <code>{</code> and ends with <code>}</code>. The server-time
     * part is not trimmed, so whitespace around it makes the line invalid. A
     * payload that itself contains {@code '|'} yields more than two parts and is
     * rejected.
     *
     * @param log the raw log line; may be {@code null}
     * @return {@code true} when the line passes all checks, {@code false} otherwise
     *         (including for {@code null}, matching {@link #validateStart(String)})
     */
    public static boolean validateEvent(String log) {
        // Guard against null so a bad event is filtered instead of throwing.
        if (log == null) {
            return false;
        }

        // 1. Split into server time and payload.
        String[] logContents = log.split("\\|");

        // 2. Exactly two parts are expected.
        if (logContents.length != 2) {
            return false;
        }

        // 3. Server time must be exactly 13 digits.
        String serverTime = logContents[0];
        if (serverTime.length() != SERVER_TIME_LENGTH || !isAllDigits(serverTime)) {
            return false;
        }

        // 4. Payload must look like a JSON object.
        return looksLikeJsonObject(logContents[1]);
    }

    /**
     * Checks a start log line, which consists of a JSON object only, e.g.
     * {@code {"action":"1","en":"start","t":"1559551922019",...}}.
     *
     * @param log the raw log line; may be {@code null}
     * @return {@code true} when the trimmed line starts with <code>{</code> and
     *         ends with <code>}</code>; {@code false} otherwise, including for
     *         {@code null}
     */
    public static boolean validateStart(String log) {
        if (log == null) {
            return false;
        }
        return looksLikeJsonObject(log);
    }

    /** Returns true when the trimmed text is delimited by an opening and a closing brace. */
    private static boolean looksLikeJsonObject(String text) {
        String trimmed = text.trim();
        return trimmed.startsWith("{") && trimmed.endsWith("}");
    }

    /** Returns true when the text is non-empty and every character is a digit. */
    private static boolean isAllDigits(String text) {
        if (text.isEmpty()) {
            return false;
        }
        for (int i = 0; i < text.length(); i++) {
            if (!Character.isDigit(text.charAt(i))) {
                return false;
            }
        }
        return true;
    }
}

-----------------------

日志類型區分攔截器主要用于将啟動日志和事件日志區分開來,友善發往Kafka的不同Topic。

Flume日志類型區分攔截器LogTypeInterceptor

package com.atguigu.flume.interceptor;

import org.apache.flume.Context;

import org.apache.flume.Event;

import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;

import java.util.ArrayList;

import java.util.List;

import java.util.Map;

public class LogTypeInterceptor implements Interceptor {

    @Override

    public void initialize() {

    }

    @Override

    public Event intercept(Event event) {

        // 區分日志類型:   body  header

        // 1 擷取body資料

        byte[] body = event.getBody();

        String log = new String(body, Charset.forName("UTF-8"));

        // 2 擷取header

        Map<String, String> headers = event.getHeaders();

        // 3 判斷資料類型并向Header中指派

        if (log.contains("start")) {

            headers.put("topic","topic_start");

        }else {

            headers.put("topic","topic_event");

        }

        return event;

    }

    @Override

    public List<Event> intercept(List<Event> events) {

        ArrayList<Event> interceptors = new ArrayList<>();

        for (Event event : events) {

            Event intercept1 = intercept(event);

            interceptors.add(intercept1);

        }

        return interceptors;

    }

    @Override

    public void close() {

    }

    public static class Builder implements  Interceptor.Builder{

        @Override

        public Interceptor build() {

            return new LogTypeInterceptor();

        }

        @Override

        public void configure(Context context) {

        }

    }

}