天天看點

Flume将Tomcat日志收集到Kafka裡面 *

前提

編寫Flume配置檔案

f1.conf

#a1是agent的名稱,a1中定義了一個叫r1的source,如果有多個,使用空格間隔
a1.sources = r1
a1.channels = c1 c2

#組名名.屬性名=屬性值
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1
# 一批寫多少個
a1.sources.r1.batchSize=1000
#讀取/tmp/logs/app-yyyy-mm-dd.log ^代表以xxx開頭$代表以什麼結尾 .代表比對任意字元
#+代表比對任意位置
a1.sources.r1.filegroups.f1=/tmp/logs/^app.+.log$

#JSON檔案的儲存位置
a1.sources.r1.positionFile=/root/soft/apache-flume-1.7.0/custdata/log_position.json

#定義攔截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.dw.flume.MyInterceptor$Builder

#定義ChannelSelector
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
# 要和你自己寫的攔截器一樣才行.
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2


#定義chanel為 KafkaChannel
a1.channels.c1.type=org.apache.flume.channel.kafka.KafkaChannel
# Kafka位址
a1.channels.c1.kafka.bootstrap.servers=zjj101:9092,zjj102:9092,zjj103:9092
#定義Kafka主題位址.如果不自己建立的話會自動建立,建議自己建立,自己建立可以指定分區和副本.
a1.channels.c1.kafka.topic=topic_start
# 不希望存header資訊
a1.channels.c1.parseAsFlumeEvent=false

a1.channels.c2.type=org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers=zjj101:9092,zjj102:9092,zjj103:9092
a1.channels.c2.kafka.topic=topic_event
a1.channels.c2.parseAsFlumeEvent=false

#連接配接元件 同一個source可以對接多個channel,一個sink隻能從一個channel拿資料!
a1.sources.r1.channels=c1 c2      

建立兩個topic

建立 topic_start 和 topic_event ,都是三個分區兩個副本數

Flume将Tomcat日志收集到Kafka裡面 *

啟動Flume收集日志

[root@zjj101 conf]# flume-ng agent -c conf/ -n a1 -f /root/soft/apache-flume-1.7.0/conf/f1.conf -Dflume.root.logger=DEBUG,console      

檢視Flume記錄日志的json檔案

[root@zjj101 custdata]# cat log_position.json 
[{"inode":1870589,"pos":696353,"file":"/tmp/logs/app-2020-10-15.log"}]
[root@zjj101 custdata]#      

用kafka 工具檢視

繼續閱讀