目錄
-
- 1.概述
- 2.運作機制
- 3.Flum采集系統建構圖
-
- 3.1簡單結構
- 3.2複雜結構
- 4.采集日志Flume快速入門
-
- 4.1安裝部署
- 4.2Flume具體配置
- 4.3Flume的ETL和分類型攔截器
-
- 4.3.1配置pom.xml檔案
- 4.3.2ETL攔截器
- 4.3.3日志過濾工具類
- 4.3.4日志分類攔截器
- 4.3.5打包上傳并啟動
- 5.采集日志FLume啟動腳本
- 6.腳本補充
-
- 6.1Hadoop啟動腳本
- 6.2生成日志腳本
- 6.3zookeeper啟動腳本
- 6.3叢集指令腳本
- 6.4統一叢集時間腳本
1.概述
Flume是Cloudera提供的一個高可用的,高可靠的,分布式的海量日志采集、聚合和傳輸的軟體。
Flume的核心是把資料從資料源(source)收集過來,再将收集到的資料送到指定的目的地(sink)。為了保證輸送的過程一定成功,在送到目的地(sink)之前,會先緩存資料(channel),待資料真正到達目的地(sink)後,flume在删除自己緩存的資料。
Flume支援定制各類資料發送方,用于收集各類型資料;同時,Flume支援定制各種資料接受方,用于最終存儲資料。一般的采集需求,通過對flume的簡單配置即可實作。針對特殊場景也具備良好的自定義擴充能力。是以,flume可以适用于大部分的日常資料采集場景。
目前Flume有兩個版本。Flume 0.9X版本的統稱Flume OG(original generation),Flume1.X版本的統稱Flume NG(next generation)。由于Flume NG經過核心元件、核心配置以及代碼架構重構,與Flume OG有很大不同,使用時請注意區分。改動的另一原因是将Flume納入 apache 旗下,Cloudera Flume 改名為 Apache Flume。
2.運作機制
Flume系統中核心的角色是agent,agent本身是一個Java程序,一般運作在日志收集節點。
每一個agent相當于一個資料傳遞員,内部有三個元件:
Source:采集源,用于跟資料源對接,以擷取資料;
Sink:下沉地,采集資料的傳送目的,用于往下一級agent傳遞資料或者往最終存儲系統傳遞資料;
Channel:agent内部的資料傳輸通道,用于從source将資料傳遞到sink;
在整個資料的傳輸的過程中,流動的是event,它是Flume内部資料傳輸的最基本單元。event将傳輸的資料進行封裝。如果是文本檔案,通常是一行記錄,event也是事務的基本機關。event從source,流向channel,再到sink,本身為一個位元組數組,并可攜帶headers(頭資訊)資訊。event代表着一個資料的最小完整單元,從外部資料源來,向外部的目的地去。
一個完整的event包括:event headers、event body、event資訊,其中event資訊就是flume收集到的日記記錄。
3.Flum采集系統建構圖
3.1簡單結構
單個 agent 采集資料
3.2複雜結構
多個 agent 之間串聯
4.采集日志Flume快速入門
4.1安裝部署
- 将apache-flume-1.7.0-bin.tar.gz上傳到linux的/opt/software目錄下
- 解壓apache-flume-1.7.0-bin.tar.gz到/opt/module/目錄
[[email protected] software]$ tar -zxf apache-flume-1.7.0-bin.tar.gz -C /opt/module/
- 修改apache-flume-1.7.0-bin的名稱為flume
[[email protected] module]$ mv apache-flume-1.7.0-bin flume
- 将flume/conf下的flume-env.sh.template檔案修改為flume-env.sh,并配置flume-env.sh檔案
[[email protected] conf]$ mv flume-env.sh.template flume-env.sh [[email protected] conf]$ vim flume-env.sh export JAVA_HOME=/opt/module/jdk1.8.0_144
4.2Flume具體配置
- 在/opt/module/flume/conf目錄下建立file-flume-kafka.conf檔案
[[email protected] conf]$ vim file-flume-kafka.conf
-
配置該檔案
File—>Flume—>Kafka
Flume官網:官網入口
#1.定義元件 # a1即為Agent1 a1.sources=r1 #定義一個資料源為r1 a1.channels=c1 c2 #定義兩個傳輸通道為c1和c2 #2.配置source--configure source #定義讀取資料源r1的讀取方式為talidir a1.sources.r1.type = TAILDIR #設定JSON格式的檔案的位置。該檔案中記錄tialing最後位置,實作斷點續傳 a1.sources.r1.positionFile = /opt/module/flume/test/log_position.json #定義檔案組,如果有多個檔案組以空格的形式隔開 a1.sources.r1.filegroups = f1 #定義要讀取的日志資料位置,即每個檔案組的絕對路徑 a1.sources.r1.filegroups.f1 = /tmp/logs/app.+ #是否在event的header中添加檔案的完整路徑資訊 a1.sources.r1.fileHeader = true #指定source(資料源為r1)使用的channel(傳輸通道)為c1 c2 a1.sources.r1.channels = c1 c2 #配置攔截器--interceptor #定義source(資料源為r1)的攔截器有i1 i2 a1.sources.r1.interceptors = i1 i2 #定義攔截器i1為ETL攔截器,攔截不規範的資料 a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.LogETLInterceptor$Builder #定義攔截器i2為日志類型攔截器,将啟動日志與事件日志分開 a1.sources.r1.interceptors.i2.type = com.atguigu.flume.interceptor.LogTypeInterceptor$Builder #定義source(資料源為r1)的選擇器類型為multiplexing(多路 #Multiplexing 可以選擇該發往哪些channel #Replicating 會将source過來的events發往所有channel a1.sources.r1.selector.type = multiplexing #設定選擇器的鍵值對header的key為topic a1.sources.r1.selector.header = topic #判斷選擇器的鍵值對header的topic對應的value,如果為topic_start則資料通過c1傳輸,反之用c2 a1.sources.r1.selector.mapping.topic_start = c1 a1.sources.r1.selector.mapping.topic_event = c2 #通過這種方法成功将啟動日志與事件日志分離 #配置傳輸通道--configure channel #配置傳輸通道的類型。 a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel #kafka叢集主機清單,中間用逗号隔開 a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092 #日志類型為啟動日志,資料發往哪一個Kafka中去 a1.channels.c1.kafka.topic = topic_start #如果為true,資料前會有FlumeEvent字首。設定為false則沒有字首 a1.channels.c1.parseAsFlumeEvent = false #設定消費者組,Kafka 僅僅使用 Consumer Group 這一種機制,卻同時實作了傳統消息引擎系統的兩大模型: #如果所有執行個體都屬于同一個 Group,那麼它實作的就是消息隊列模型; #如果所有執行個體分别屬于不同的 Group,那麼它實作的就是釋出 / 訂閱模型。 a1.channels.c1.kafka.consumer.group.id = flume-consumer a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092 a1.channels.c2.kafka.topic = topic_event a1.channels.c2.parseAsFlumeEvent = false a1.channels.c2.kafka.consumer.group.id = flume-consumer
4.3Flume的ETL和分類型攔截器
ETL攔截器:主要用于,過濾時間戳不合法和Json資料不完整的日志
日志類型區分攔截器:主要用于,将啟動日志和事件日志區分開來,友善發往Kafka的不同Topic。
前期準備:
- 建立Maven工程flume-interceptor。
- 建立包名com.atguigu.flume.interceptor
4.3.1配置pom.xml檔案
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
4.3.2ETL攔截器
package com.atguigu.flume.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
public class LogETLInterceptor implements Interceptor {
@Override
public void initialize() {
//初始化
}
@Override
//判斷單個event
public Event intercept(Event event) {
//擷取資料
byte[] body = event.getBody();
String log = new String(body,Charset.forName("UTF-8"));
//判斷資料類型并向Header中指派
if (log.contains("start")) {
if (LogUtils.validateStart(log)) {
return event;
}
} else {
if(LogUtils.validateEvent(log)) {
return event;
}
}
return null;
}
@Override
public List<Event> intercept(List<Event> list) {
ArrayList<Event> interceptors = new ArrayList<>();
for (Event event : list) {
Event intercept1 = intercept(event);
if (intercept1 != null) {
interceptors.add(intercept1);
}
}
return interceptors;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new LogETLInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
4.3.3日志過濾工具類
package com.atguigu.flume.interceptor;
import org.apache.commons.lang.math.NumberUtils;
public class LogUtils {
public static boolean validateEvent(String log) {
//切割
String[] logContents = log.split("\\|");
/*
1615265478494|{"cm":
{"ln":"-87.9","sv":"V2.2.2","os":"8.2.8","g":"[email protected]","mid":"997","nw":"3G"
,"l":"en","vc":"1","hw":"640*1136","ar":"MX","uid":"997","t":"1615174356590","la":"-2.8
","md":"Huawei-12","vn":"1.2.8","ba":"Huawei","sr":"M"},"ap":"app"}
*/
//校驗
if (logContents.length != 2) {
return false;
}
//校驗伺服器時間
if (logContents[0].length() != 13 || !NumberUtils.isDigits(logContents[0])) {
return false;
}
//校驗Json
//trim() 去掉字元串兩端的空格 不論空格多少
if (!logContents[1].trim().startsWith("{") || !logContents[1].trim().endsWith("}")) {
return false;
}
return true;
}
public static boolean validateStart(String log) {
if (log == null) return false;
/*{"action":"1","ar":"MX","ba":"Sumsung","detail":"","en":"start",
"entry":"3","extend1":"","g":"[email protected]","hw":"640*960"}
*/
//校驗json
if (!log.trim().startsWith("{") || !log.trim().endsWith("}")) return false;
return true;
}
}
4.3.4日志分類攔截器
package com.atguigu.flume.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class LogTypeInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
//區分日志類型 body header
//擷取body資料
byte[] body = event.getBody();
String log = new String(body, Charset.forName("UTF-8"));
//擷取header
Map<String, String> headers = event.getHeaders();
//判斷判斷資料類型并向header中指派
if (log.contains("start")) {
//啟動日志
headers.put("topic" , "topic_start");
} else {
//事件日志
headers.put("topic" , "topic_event");
}
return event;
}
@Override
public List<Event> intercept(List<Event> list) {
ArrayList<Event> interceptors = new ArrayList<>();
for (Event event : list) {
Event intercept1 = intercept(event);
interceptors.add(intercept1);
}
return interceptors;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new LogTypeInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
4.3.5打包上傳并啟動
- 将不帶依賴的包,上傳到hadoop102的Flume的lib檔案夾下。
[[email protected] lib]$ ls | grep interceptor flume-interceptor-1.0-SNAPSHOT.jar
-
将flume-interceptor-1.0-SNAPSHOT.jar分發到hadoop103,hadoop104
前期準備:xysnc分發腳本
分發flume-interceptor-1.0-SNAPSHOT.jar#!/bin/bash #1 擷取輸入參數個數,如果沒有參數,直接退出 pcount=$# if ((pcount==0)); then echo no args; exit; fi #2 擷取檔案名稱 p1=$1 fname=`basename $p1` echo fname=$fname #3 擷取上級目錄的絕對路徑 pdir=`cd -P $(dirname $p1); pwd` echo pdir=$pdir #4 擷取目前使用者名稱 user=`whoami` #5 循環 for host in hadoop102 hadoop103 hadoop104 do echo ------------------- $host -------------- rsync -av $pdir/$fname $user@$host:$pdir done
[[email protected] lib]$ xsync flume-interceptor-1.0-SNAPSHOT.jar fname=flume-interceptor-1.0-SNAPSHOT.jar pdir=/opt/module/flume/lib ------------------- hadoop102 -------------- sending incremental file list sent 72 bytes received 12 bytes 168.00 bytes/sec total size is 6667 speedup is 79.37 ------------------- hadoop103 -------------- sending incremental file list flume-interceptor-1.0-SNAPSHOT.jar sent 6782 bytes received 31 bytes 4542.00 bytes/sec total size is 6667 speedup is 0.98 ------------------- hadoop104 -------------- sending incremental file list flume-interceptor-1.0-SNAPSHOT.jar sent 6782 bytes received 31 bytes 13626.00 bytes/sec total size is 6667 speedup is 0.98
- 分發file-flume-kafka.conf到hadoop102,hadoop103
[[email protected] conf]$ pwd /opt/module/flume/conf [[email protected] conf]$ xsync file-flume-kafka.conf fname=file-flume-kafka.conf pdir=/opt/module/flume/conf ------------------- hadoop102 -------------- sending incremental file list sent 59 bytes received 12 bytes 47.33 bytes/sec total size is 1307 speedup is 18.41 ------------------- hadoop103 -------------- sending incremental file list file-flume-kafka.conf sent 1409 bytes received 31 bytes 2880.00 bytes/sec total size is 1307 speedup is 0.91 ------------------- hadoop104 -------------- sending incremental file list file-flume-kafka.conf sent 1409 bytes received 31 bytes 960.00 bytes/sec total size is 1307 speedup is 0.91
5.采集日志FLume啟動腳本
- 在/home/lili/bin目錄下建立腳本f1.sh
[[email protected] bin]$ vim f1.sh #!/bin/bash #flume啟動腳本 #腳本啟動的為hadoop1與hadoop2 #本項目是 flume -- kafka -- flume的形式 #該腳本為第一個flume,即為采集資料輸送到kafka case $1 in "start"){ for i in hadoop102 hadoop103 do echo " --------啟動 $i 采集flume-------" ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/job/data-warehouse/flume.log 2>&1 &" done };; "stop"){ for i in hadoop102 hadoop103 do echo " --------停止 $i 采集flume-------" ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs kill" done };; esac # nohup,該指令可以在你退出帳戶/關閉終端之後繼續運作相應的程序。nohup就是不挂起的意思,不挂斷地運作指令。 # awk預設分隔符為空格 # xargs表示取出前面指令運作的結果,作為後面指令的輸入參數。
- 增加腳本權限
[[email protected] bin]$ chmod 777 f1.sh
- Flume叢集啟動
[[email protected] flume]$ f1.sh start --------啟動 hadoop102 采集flume------- --------啟動 hadoop103 采集flume------- [[email protected] bin]$ jps 17358 Jps 17139 Application
- Flume叢集停止
[[email protected] flume]$ f1.sh stop --------停止 hadoop102 采集flume------- --------停止 hadoop103 采集flume------- [[email protected] bin]$ jps 17378 Jps
6.腳本補充
6.1Hadoop啟動腳本
[[email protected] bin]$ vim hdp
#!/bin/bash
#1 擷取參數個數,如果沒有參數,直接退出
pcount=$#
if((pcount==0)); then
echo no args;
exit;
fi
case $1 in
"start" ){
echo "================ 開始啟動叢集 ================"
# 開啟hadoop、yarn、曆史伺服器
echo "================ 正在啟動HDFS ================"
ssh hadoop102 '/opt/module/hadoop-2.7.2/sbin/start-dfs.sh'
echo "================ 正在啟動YARN ================"
ssh hadoop103 '/opt/module/hadoop-2.7.2/sbin/start-yarn.sh'
echo "================ 正在啟動JobHistoryServer ================"
ssh hadoop102 '/opt/module/hadoop-2.7.2/sbin/mr-jobhistory-daemon.sh start historyserver'
};;
"stop" ){
echo "================ 開始關閉叢集 ================"
# 關閉hadoop、yarn、曆史伺服器
echo "================ 正在關閉JobHistoryServer ================"
ssh hadoop102 '/opt/module/hadoop-2.7.2/sbin/mr-jobhistory-daemon.sh stop historyserver'
echo "================ 正在關閉YARN ================"
ssh hadoop103 '/opt/module/hadoop-2.7.2/sbin/stop-yarn.sh'
echo "================ 正在關閉HDFS ================"
ssh hadoop102 '/opt/module/hadoop-2.7.2/sbin/stop-dfs.sh'
};;
esac
6.2生成日志腳本
[[email protected] bin]$ vim lg.sh
#!/bin/bash
for i in hadoop102 hadoop103
do
echo ----------生成日志----------
ssh $i "java -jar /opt/module/log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar $1 $2 >/dev/null 2>&1 &"
done
6.3zookeeper啟動腳本
[[email protected] bin]$ vim zk.sh
#! /bin/bash
case $1 in
"start"){
for i in hadoop102 hadoop103 hadoop104
do
ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh start"
done
};;
"stop"){
for i in hadoop102 hadoop103 hadoop104
do
ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh stop"
done
};;
"status"){
for i in hadoop102 hadoop103 hadoop104
do
ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh status"
done
};;
esac
6.3叢集指令腳本
注:對叢集發出指令并傳回結果,多用于程序的檢視
[[email protected] bin]$ vim xcall.sh
#!/bin/bash
for i in hadoop102 hadoop103 hadoop104
do
echo ----------$i----------
ssh $i "$* "
done
例如:
[[email protected] bin]$ xcall.sh jps
----------hadoop102----------
17469 Jps
----------hadoop103----------
10961 Jps
----------hadoop104----------
11583 Jps
6.4統一叢集時間腳本
[[email protected] bin]$ vim dt.sh
#!/bin/bash
for i in hadoop102 hadoop103 hadoop104
do
echo ----------$i 同步時間----------
#當用到sudo的時候需要加 -t
ssh -t $i "sudo date -s $1 "
done
例如:
[[email protected] bin]$ dt.sh 16:00:00
----------hadoop102 同步時間----------
[sudo] password for lili:
2021年 07月 30日 星期五 16:00:00 CST
Connection to hadoop102 closed.
----------hadoop103 同步時間----------
[sudo] password for lili:
2021年 07月 30日 星期五 16:00:00 CST
Connection to hadoop103 closed.
----------hadoop104 同步時間----------
[sudo] password for lili:
2021年 07月 30日 星期五 16:00:00 CST
Connection to hadoop104 closed.