天天看點

采集日志Flume的叢集搭建與詳細配置

目錄

    • 1.概述
    • 2.運作機制
    • 3.Flum采集系統建構圖
      • 3.1簡單結構
      • 3.2複雜結構
    • 4.采集日志Flume快速入門
      • 4.1安裝部署
      • 4.2Flume具體配置
      • 4.3Flume的ETL和分類型攔截器
        • 4.3.1配置pom.xml檔案
        • 4.3.2ETL攔截器
        • 4.3.3日志過濾工具類
        • 4.3.4日志分類攔截器
        • 4.3.5打包上傳并啟動
    • 5.采集日志FLume啟動腳本
    • 6.腳本補充
      • 6.1Hadoop啟動腳本
      • 6.2生成日志腳本
      • 6.3zookeeper啟動腳本
      • 6.3叢集指令腳本
      • 6.4統一叢集時間腳本

1.概述

Flume是Cloudera提供的一個高可用的,高可靠的,分布式的海量日志采集、聚合和傳輸的軟體。

Flume的核心是把資料從資料源(source)收集過來,再将收集到的資料送到指定的目的地(sink)。為了保證輸送的過程一定成功,在送到目的地(sink)之前,會先緩存資料(channel),待資料真正到達目的地(sink)後,flume在删除自己緩存的資料。

Flume支援定制各類資料發送方,用于收集各類型資料;同時,Flume支援定制各種資料接受方,用于最終存儲資料。一般的采集需求,通過對flume的簡單配置即可實作。針對特殊場景也具備良好的自定義擴充能力。是以,flume可以适用于大部分的日常資料采集場景。

目前Flume有兩個版本。Flume 0.9X版本的統稱Flume OG(original generation),Flume1.X版本的統稱Flume NG(next generation)。由于Flume NG經過核心元件、核心配置以及代碼架構重構,與Flume OG有很大不同,使用時請注意區分。改動的另一原因是将Flume納入 apache 旗下,Cloudera Flume 改名為 Apache Flume。

2.運作機制

Flume系統中核心的角色是agent,agent本身是一個Java程序,一般運作在日志收集節點。

采集日志Flume的叢集搭建與詳細配置

每一個agent相當于一個資料傳遞員,内部有三個元件:

Source:采集源,用于跟資料源對接,以擷取資料;

Sink:下沉地,采集資料的傳送目的,用于往下一級agent傳遞資料或者往最終存儲系統傳遞資料;

Channel:agent内部的資料傳輸通道,用于從source将資料傳遞到sink;

在整個資料的傳輸的過程中,流動的是event,它是Flume内部資料傳輸的最基本單元。event将傳輸的資料進行封裝。如果是文本檔案,通常是一行記錄,event也是事務的基本機關。event從source,流向channel,再到sink,本身為一個位元組數組,并可攜帶headers(頭資訊)資訊。event代表着一個資料的最小完整單元,從外部資料源來,向外部的目的地去。

一個完整的event包括:event headers、event body、event資訊,其中event資訊就是flume收集到的日記記錄。

3.Flum采集系統建構圖

3.1簡單結構

單個 agent 采集資料

采集日志Flume的叢集搭建與詳細配置

3.2複雜結構

多個 agent 之間串聯

采集日志Flume的叢集搭建與詳細配置

4.采集日志Flume快速入門

4.1安裝部署

  1. 将apache-flume-1.7.0-bin.tar.gz上傳到linux的/opt/software目錄下
  2. 解壓apache-flume-1.7.0-bin.tar.gz到/opt/module/目錄
    [[email protected] software]$ tar -zxf apache-flume-1.7.0-bin.tar.gz -C /opt/module/
               
  3. 修改apache-flume-1.7.0-bin的名稱為flume
    [[email protected] module]$ mv apache-flume-1.7.0-bin flume
               
  4. 将flume/conf下的flume-env.sh.template檔案修改為flume-env.sh,并配置flume-env.sh檔案
    [[email protected] conf]$ mv flume-env.sh.template flume-env.sh
    [[email protected] conf]$ vim flume-env.sh
    export JAVA_HOME=/opt/module/jdk1.8.0_144  
               

4.2Flume具體配置

  1. 在/opt/module/flume/conf目錄下建立file-flume-kafka.conf檔案
    [[email protected] conf]$ vim file-flume-kafka.conf
               
  2. 配置該檔案

    File—>Flume—>Kafka

    Flume官網:官網入口

    #1.定義元件
    # a1即為Agent1
    a1.sources=r1	#定義一個資料源為r1
    a1.channels=c1 c2	#定義兩個傳輸通道為c1和c2 
    
    #2.配置source--configure source
    #定義讀取資料源r1的讀取方式為talidir
    a1.sources.r1.type = TAILDIR
    #設定JSON格式的檔案的位置。該檔案中記錄tialing最後位置,實作斷點續傳
    a1.sources.r1.positionFile = /opt/module/flume/test/log_position.json
    #定義檔案組,如果有多個檔案組以空格的形式隔開
    a1.sources.r1.filegroups = f1
    #定義要讀取的日志資料位置,即每個檔案組的絕對路徑
    a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
    #是否在event的header中添加檔案的完整路徑資訊
    a1.sources.r1.fileHeader = true
    #指定source(資料源為r1)使用的channel(傳輸通道)為c1 c2
    a1.sources.r1.channels = c1 c2
    
    #配置攔截器--interceptor
    #定義source(資料源為r1)的攔截器有i1 i2
    a1.sources.r1.interceptors =  i1 i2
    #定義攔截器i1為ETL攔截器,攔截不規範的資料
    a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.LogETLInterceptor$Builder
    #定義攔截器i2為日志類型攔截器,将啟動日志與事件日志分開
    a1.sources.r1.interceptors.i2.type = com.atguigu.flume.interceptor.LogTypeInterceptor$Builder
    
    #定義source(資料源為r1)的選擇器類型為multiplexing(多路
    #Multiplexing 可以選擇該發往哪些channel
    #Replicating 會将source過來的events發往所有channel
    a1.sources.r1.selector.type = multiplexing
    #設定選擇器的鍵值對header的key為topic
    a1.sources.r1.selector.header = topic
    #判斷選擇器的鍵值對header的topic對應的value,如果為topic_start則資料通過c1傳輸,反之用c2
    a1.sources.r1.selector.mapping.topic_start = c1
    a1.sources.r1.selector.mapping.topic_event = c2
    #通過這種方法成功将啟動日志與事件日志分離
    
    #配置傳輸通道--configure channel
    #配置傳輸通道的類型。
    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    #kafka叢集主機清單,中間用逗号隔開
    a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
    #日志類型為啟動日志,資料發往哪一個Kafka中去
    a1.channels.c1.kafka.topic = topic_start
    #如果為true,資料前會有FlumeEvent字首。設定為false則沒有字首
    a1.channels.c1.parseAsFlumeEvent = false
    #設定消費者組,Kafka 僅僅使用 Consumer Group 這一種機制,卻同時實作了傳統消息引擎系統的兩大模型:
    #如果所有執行個體都屬于同一個 Group,那麼它實作的就是消息隊列模型;
    #如果所有執行個體分别屬于不同的 Group,那麼它實作的就是釋出 / 訂閱模型。
    a1.channels.c1.kafka.consumer.group.id = flume-consumer
    
    a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
    a1.channels.c2.kafka.topic = topic_event
    a1.channels.c2.parseAsFlumeEvent = false
    a1.channels.c2.kafka.consumer.group.id = flume-consumer
               

4.3Flume的ETL和分類型攔截器

ETL攔截器:主要用于,過濾時間戳不合法和Json資料不完整的日志

日志類型區分攔截器:主要用于,将啟動日志和事件日志區分開來,友善發往Kafka的不同Topic。

前期準備:

  1. 建立Maven工程flume-interceptor。
  2. 建立包名com.atguigu.flume.interceptor

4.3.1配置pom.xml檔案

<dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.7.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
           

4.3.2ETL攔截器

package com.atguigu.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

public class LogETLInterceptor implements Interceptor {
    @Override
    public void initialize() {
        //初始化
    }

    @Override
    //判斷單個event
    public Event intercept(Event event) {
        //擷取資料
        byte[] body = event.getBody();
        String log = new String(body,Charset.forName("UTF-8"));

        //判斷資料類型并向Header中指派
        if (log.contains("start")) {
            if (LogUtils.validateStart(log)) {
                return event;
            }
        } else {
            if(LogUtils.validateEvent(log)) {
                return event;
            }
        }
        return null;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        ArrayList<Event> interceptors = new ArrayList<>();

        for (Event event : list) {
            Event intercept1 = intercept(event);

            if (intercept1 != null) {
                interceptors.add(intercept1);
            }
        }
        return interceptors;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new LogETLInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}
           

4.3.3日志過濾工具類

package com.atguigu.flume.interceptor;

import org.apache.commons.lang.math.NumberUtils;

public class LogUtils {
    public static boolean validateEvent(String log) {
        //切割
        String[] logContents = log.split("\\|");
/*
1615265478494|{"cm":
{"ln":"-87.9","sv":"V2.2.2","os":"8.2.8","g":"[email protected]","mid":"997","nw":"3G"
,"l":"en","vc":"1","hw":"640*1136","ar":"MX","uid":"997","t":"1615174356590","la":"-2.8
","md":"Huawei-12","vn":"1.2.8","ba":"Huawei","sr":"M"},"ap":"app"}
*/
        //校驗
        if (logContents.length != 2) {
            return false;
        }
        //校驗伺服器時間

        if (logContents[0].length() != 13 || !NumberUtils.isDigits(logContents[0])) {
            return false;
        }

        //校驗Json
        //trim()  去掉字元串兩端的空格  不論空格多少
        if (!logContents[1].trim().startsWith("{") || !logContents[1].trim().endsWith("}")) {
            return false;
        }
        return true;
    }
    public static boolean validateStart(String log) {
        if (log == null) return false;
		/*{"action":"1","ar":"MX","ba":"Sumsung","detail":"","en":"start",
		"entry":"3","extend1":"","g":"[email protected]","hw":"640*960"}
		*/
        //校驗json
        if (!log.trim().startsWith("{") || !log.trim().endsWith("}")) return false;

        return true;
    }
}

           

4.3.4日志分類攔截器

package com.atguigu.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class LogTypeInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        //區分日志類型  body header
        //擷取body資料
        byte[] body = event.getBody();
        String log = new String(body, Charset.forName("UTF-8"));

        //擷取header
        Map<String, String> headers = event.getHeaders();

        //判斷判斷資料類型并向header中指派
        if (log.contains("start")) {
            //啟動日志
            headers.put("topic" , "topic_start");
        } else {
            //事件日志
            headers.put("topic" , "topic_event");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        ArrayList<Event> interceptors = new ArrayList<>();

        for (Event event : list) {
            Event intercept1 = intercept(event);
            interceptors.add(intercept1);
        }
        return interceptors;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}
           

4.3.5打包上傳并啟動

  1. 将不帶依賴的包,上傳到hadoop102的Flume的lib檔案夾下。
    [[email protected] lib]$ ls | grep interceptor
    flume-interceptor-1.0-SNAPSHOT.jar
               
  2. 将flume-interceptor-1.0-SNAPSHOT.jar分發到hadoop103,hadoop104

    前期準備:xysnc分發腳本

    #!/bin/bash
    #1 擷取輸入參數個數,如果沒有參數,直接退出
    pcount=$#
    if ((pcount==0)); then
    echo no args;
    exit;
    fi
    
    #2 擷取檔案名稱
    p1=$1
    fname=`basename $p1`
    echo fname=$fname
    
    #3 擷取上級目錄的絕對路徑
    pdir=`cd -P $(dirname $p1); pwd`
    echo pdir=$pdir
    
    #4 擷取目前使用者名稱
    user=`whoami`
    
    #5 循環
    for host in hadoop102 hadoop103 hadoop104
    do
        echo ------------------- $host --------------
        rsync -av $pdir/$fname $user@$host:$pdir
    done
               
    分發flume-interceptor-1.0-SNAPSHOT.jar
    [[email protected] lib]$ xsync flume-interceptor-1.0-SNAPSHOT.jar 
    fname=flume-interceptor-1.0-SNAPSHOT.jar
    pdir=/opt/module/flume/lib
    ------------------- hadoop102 --------------
    sending incremental file list
    
    sent 72 bytes  received 12 bytes  168.00 bytes/sec
    total size is 6667  speedup is 79.37
    ------------------- hadoop103 --------------
    sending incremental file list
    flume-interceptor-1.0-SNAPSHOT.jar
    
    sent 6782 bytes  received 31 bytes  4542.00 bytes/sec
    total size is 6667  speedup is 0.98
    ------------------- hadoop104 --------------
    sending incremental file list
    flume-interceptor-1.0-SNAPSHOT.jar
    
    sent 6782 bytes  received 31 bytes  13626.00 bytes/sec
    total size is 6667  speedup is 0.98
               
  3. 分發file-flume-kafka.conf到hadoop102,hadoop103
    [[email protected] conf]$ pwd
    /opt/module/flume/conf
    [[email protected] conf]$ xsync file-flume-kafka.conf 
    fname=file-flume-kafka.conf
    pdir=/opt/module/flume/conf
    ------------------- hadoop102 --------------
    sending incremental file list
    
    sent 59 bytes  received 12 bytes  47.33 bytes/sec
    total size is 1307  speedup is 18.41
    ------------------- hadoop103 --------------
    sending incremental file list
    file-flume-kafka.conf
    
    sent 1409 bytes  received 31 bytes  2880.00 bytes/sec
    total size is 1307  speedup is 0.91
    ------------------- hadoop104 --------------
    sending incremental file list
    file-flume-kafka.conf
    
    sent 1409 bytes  received 31 bytes  960.00 bytes/sec
    total size is 1307  speedup is 0.91
    
               

5.采集日志FLume啟動腳本

  1. 在/home/lili/bin目錄下建立腳本f1.sh
    [[email protected] bin]$ vim f1.sh
    #!/bin/bash
    #flume啟動腳本
    #腳本啟動的為hadoop1與hadoop2
    #本項目是 flume -- kafka -- flume的形式
    #該腳本為第一個flume,即為采集資料輸送到kafka
    case $1 in
    "start"){
    	for i in hadoop102 hadoop103
    	do
    		 echo " --------啟動 $i 采集flume-------"
    		ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/job/data-warehouse/flume.log 2>&1  &"
    	done
    };;
    "stop"){
    	for i in hadoop102 hadoop103
    	do
    		echo " --------停止 $i 采集flume-------"
                    ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk  '{print \$2}' | xargs kill"	
    	done
    };;
    esac
    
    # nohup,該指令可以在你退出帳戶/關閉終端之後繼續運作相應的程序。nohup就是不挂起的意思,不挂斷地運作指令。
    # awk預設分隔符為空格
    # xargs表示取出前面指令運作的結果,作為後面指令的輸入參數。
    
               
  2. 增加腳本權限
    [[email protected] bin]$ chmod 777 f1.sh
               
  3. Flume叢集啟動
    [[email protected] flume]$ f1.sh start
     --------啟動 hadoop102 采集flume-------
     --------啟動 hadoop103 采集flume-------
    [[email protected] bin]$ jps
    17358 Jps
    17139 Application
               
  4. Flume叢集停止
    [[email protected] flume]$ f1.sh stop
     --------停止 hadoop102 采集flume-------
     --------停止 hadoop103 采集flume-------
    [[email protected] bin]$ jps
    17378 Jps
               

6.腳本補充

6.1Hadoop啟動腳本

[[email protected] bin]$ vim hdp
#!/bin/bash
#1 擷取參數個數,如果沒有參數,直接退出
pcount=$#
if((pcount==0)); then 
echo no args; 
exit; 
fi 

case $1 in
"start" ){

	echo "================    開始啟動叢集    ================"

	# 開啟hadoop、yarn、曆史伺服器
	echo "================    正在啟動HDFS    ================"
	ssh hadoop102 '/opt/module/hadoop-2.7.2/sbin/start-dfs.sh'
	
	echo "================    正在啟動YARN    ================"
	ssh hadoop103 '/opt/module/hadoop-2.7.2/sbin/start-yarn.sh'
	
	echo "================    正在啟動JobHistoryServer    ================"
	ssh hadoop102 '/opt/module/hadoop-2.7.2/sbin/mr-jobhistory-daemon.sh start historyserver'
};;

"stop" ){

	echo "================    開始關閉叢集    ================"

	# 關閉hadoop、yarn、曆史伺服器
	echo "================    正在關閉JobHistoryServer    ================"
	ssh hadoop102 '/opt/module/hadoop-2.7.2/sbin/mr-jobhistory-daemon.sh stop historyserver'
	
	echo "================    正在關閉YARN    ================"
        ssh hadoop103 '/opt/module/hadoop-2.7.2/sbin/stop-yarn.sh'
	
	echo "================    正在關閉HDFS    ================"
        ssh hadoop102 '/opt/module/hadoop-2.7.2/sbin/stop-dfs.sh'
};;
esac
           

6.2生成日志腳本

[[email protected] bin]$ vim lg.sh
#!/bin/bash
for i in hadoop102 hadoop103
do
	echo ----------生成日志----------
	ssh $i "java -jar /opt/module/log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar $1 $2 >/dev/null 2>&1 &"
done
           

6.3zookeeper啟動腳本

[[email protected] bin]$ vim zk.sh
#! /bin/bash
case $1 in
"start"){
	for i in hadoop102 hadoop103 hadoop104
	do
		ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh start"
	done
};;
"stop"){
	for i in hadoop102 hadoop103 hadoop104
	do
		ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh stop"
	done
};;
"status"){
	for i in hadoop102 hadoop103 hadoop104
	do
		ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh status"
	done
};;
esac

           

6.3叢集指令腳本

注:對叢集發出指令并傳回結果,多用于程序的檢視

[[email protected] bin]$ vim xcall.sh
#!/bin/bash
for i in hadoop102 hadoop103 hadoop104
do
	echo ----------$i----------
	ssh  $i "$* "
done
           

例如:

[[email protected] bin]$ xcall.sh jps
----------hadoop102----------
17469 Jps
----------hadoop103----------
10961 Jps
----------hadoop104----------
11583 Jps
           

6.4統一叢集時間腳本

[[email protected] bin]$ vim dt.sh
#!/bin/bash
for i in hadoop102 hadoop103 hadoop104
do
	echo ----------$i 同步時間----------
	#當用到sudo的時候需要加  -t
	ssh -t $i "sudo date -s $1 "
done
           

例如:

[[email protected] bin]$ dt.sh 16:00:00
----------hadoop102 同步時間----------
[sudo] password for lili: 
2021年 07月 30日 星期五 16:00:00 CST
Connection to hadoop102 closed.
----------hadoop103 同步時間----------
[sudo] password for lili: 
2021年 07月 30日 星期五 16:00:00 CST
Connection to hadoop103 closed.
----------hadoop104 同步時間----------
[sudo] password for lili: 
2021年 07月 30日 星期五 16:00:00 CST
Connection to hadoop104 closed.