Optimizing Flume:
Flume tails Tomcat's log file catalina.out and forwards the log events to a Kafka topic. The problem is that Flume dies frequently; the temporary workaround was a script that restarts it automatically.
The main Flume process rarely dies. What dies is the child process, i.e. the process running the command that reads the Tomcat log file (the tail spawned by the exec source).
The Flume configuration file and the restart script are shown below.
Flume configuration file:
# Name the components on this agent
a1.sources = x1
a1.channels = c
a1.sinks = r
# Describe/configure the source
a1.sources.x1.type = exec
a1.sources.x1.command = tail -F /app/tomcat9991/logs/catalina.out
a1.sources.x1.channels = c
# Describe the sink
a1.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.r.kafka.topic = xcxInfoLog
a1.sinks.r.kafka.bootstrap.servers = 10.251.27.123:9092,10.251.27.124:9092,10.251.27.125:9092
a1.sinks.r.channel = c
# Use a channel which buffers events in memory
a1.channels.c.type = memory
a1.channels.c.capacity = 5000
a1.channels.c.transactionCapacity = 5000
a1.sources.x1.interceptors = i1
a1.sources.x1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.x1.interceptors.i1.headerName = key
a1.sources.x1.interceptors.i1.preserveExisting = false
Script that restarts the Flume process:
# Watch the tail command on the Source side of the Flume agent for the mini-program Tomcat9991 log, and restart the agent if it is gone
source /home/logv/.bash_profile
num=1
result01=''
result02=`ps -ef|awk '{print $8 $9 $10}'|grep tail-F/app/tomcat9991/logs/catalina.out|grep -v grep|wc -l`
if [ $result02 -lt $num ]; then
result01=`ps -ef|grep xcxInfoLogTomcat9991.properties|grep -v grep|awk '{print $2}'`
# Kill any leftover agent process before restarting (skip the kill if none is running)
[ -n "$result01" ] && kill -9 $result01
nohup /home/logv/bigdata/flume1.8/bin/flume-ng agent -c /home/logv/bigdata/flume1.8/conf -f /home/logv/flumeProperties/xcxLog/xcxInfoLogTomcat9991.properties -n a1 -Dflume.root.logger=ERROR,console > /home/logv/flumeProperties/logs/xcxInfoLogTomcat9991.log 2>&1 &
fi
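For reference, the process check used by the watchdog can be isolated into a small function and exercised offline; this is a sketch that reads `ps -ef`-style lines on stdin, with the grep pattern taken verbatim from the script:

```shell
# Count running tail processes by concatenating command fields $8 $9 $10,
# exactly as the watchdog script does. Reading stdin instead of calling
# `ps -ef` directly lets the logic be tested without a live process table.
count_tail_procs() {
  awk '{print $8 $9 $10}' | grep -v grep | grep -c 'tail-F/app/tomcat9991/logs/catalina.out'
}
```

If the count drops below 1, the script kills any leftover agent and restarts it; scheduling the check every minute (for example from cron) keeps the gap in log collection short.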
Solution:
Increase the memory available to the Flume channel: edit flume-env.sh and raise the agent's default JVM heap, which is only 20 MB by default, to 2 GB:
export JAVA_OPTS="-Xms1024m -Xmx2048m -Xss256k -Xmn1024m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit -Xloggc:/work/app/flume/logs/server-gc.log.$(date +%F) -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=1 -XX:GCLogFileSize=200M"
The remaining changes are small adjustments in the configuration file. The optimized configuration:
# Name the components on this agent
a2.sources = s1
a2.channels = c
a2.sinks = s2
# Describe/configure the source
a2.sources.s1.type = exec
a2.sources.s1.command = tail -F /app/tomcat9991/logs/catalina.out
# Added: the doPut batch size -- how many events the source reads before handing the batch to the channel
a2.sources.s1.batchSize = 1000
# Added: how long (in milliseconds) the source waits before flushing a partial batch to the channel
a2.sources.s1.batchTimeout = 3000
a2.sources.s1.channels = c
# Describe the sink
a2.sinks.s2.type = org.apache.flume.sink.kafka.KafkaSink
a2.sinks.s2.kafka.topic = test1
a2.sinks.s2.kafka.bootstrap.servers = 10.251.27.123:9092,10.251.27.124:9092,10.251.27.125:9092,10.251.27.102:9092,10.251.27.121:9092
# Changed the producer acknowledgment setting. The default for this item is -1. Setting it to 1 is a compromise:
# the leader must successfully write the record to its local log, but whether all the followers have replicated it
# is not confirmed, so if a follower has not yet copied the data when the leader dies, the message is lost.
# This trades a little safety for performance.
a2.sinks.s2.kafka.producer.acks = 1
# Added: a larger sink batch size to speed up processing
a2.sinks.s2.flumeBatchSize = 2000
a2.sinks.s2.kafka.producer.max.request.size = 20971520
a2.sinks.s2.channel = c
a2.channels.c.type = memory
# Increased: the channel's capacity (the maximum number of events it can hold)
a2.channels.c.capacity = 1000000
# Increased: the size of the putList and takeList; this value must be smaller than capacity
a2.channels.c.transactionCapacity = 100000
# Added: the share of the channel's byte capacity reserved for event headers; the default is 20%, lowered here to 10%
a2.channels.c.byteCapacityBufferPercentage = 10
a2.sources.s1.interceptors = i1
a2.sources.s1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a2.sources.s1.interceptors.i1.headerName = key
a2.sources.s1.interceptors.i1.preserveExisting = false
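As a sanity check on the channel sizing, the memory footprint of a full channel can be estimated. The 1 KiB average event size below is an assumption for illustration -- the real average depends on the log content, and the events carrying images are far larger:

```shell
# Rough memory footprint of a full memory channel,
# under an assumed (hypothetical) 1 KiB average event size.
events=1000000      # a2.channels.c.capacity
avg_bytes=1024      # assumed average event size
echo "$(( events * avg_bytes / 1024 / 1024 )) MiB"
```

Under that assumption a full channel holds close to 1 GiB of events, which is why the heap had to grow from the 20 MB default to the 2 GB set in flume-env.sh.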
Startup command:
nohup /home/logv/bigData/flume1.8/bin/flume-ng agent -n a2 -c /home/logv/bigData/flume1.8/conf -f /home/logv/flumeProperties/xcxLog/testKafka.properties &
After starting this way, the Flume child process no longer dies and part of the data is written to Kafka normally, but the following error is reported:
2018-12-26 23:41:12,598 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:264)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
The error says the request contained a message larger than the maximum message size the server will accept. This is not a Flume error but a Kafka one: Kafka limits the size of a single message, 1 MB by default. Because the logs here contain image data far larger than 1 MB, Kafka's per-message limit has to be raised. There are two ways to do that: alter a single topic, or change the Kafka configuration file.
Altering a single topic (raising its limit to 200 MB):
/app/bigData/kafka/bin/kafka-topics.sh --zookeeper 10.251.27.123:2181 --alter --topic test1 --config max.message.bytes=209715200
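The value passed to --config is simply 200 MB expressed in bytes:

```shell
# 200 MB in bytes, matching the max.message.bytes value above
echo $(( 200 * 1024 * 1024 ))
```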
Changing the configuration file: add two settings to Kafka's server.properties:
# Maximum size, in bytes, of a single message the broker will accept
message.max.bytes=209715200
# Maximum number of bytes of messages a replica fetch request can pull. This must be no smaller than
# message.max.bytes, because it sizes the buffer a follower broker uses when fetching messages from a
# partition leader; if it were smaller than message.max.bytes, a single message might not fit in the buffer.
replica.fetch.max.bytes=209715200
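The three size limits involved should be checked against each other: the producer's max.request.size (set in the sink configuration above) must not exceed the broker's message.max.bytes, and replica.fetch.max.bytes must be at least message.max.bytes. With the values used above:

```shell
# Size limits from the configurations above, in bytes
max_request_size=20971520     # producer: kafka.producer.max.request.size (20 MiB)
message_max_bytes=209715200   # broker: message.max.bytes (200 MiB)
replica_fetch_max=209715200   # broker: replica.fetch.max.bytes (200 MiB)

[ "$max_request_size" -le "$message_max_bytes" ] && echo "producer limit OK"
[ "$replica_fetch_max" -ge "$message_max_bytes" ] && echo "replica fetch OK"
```

Note that with max.request.size at 20 MiB, the producer itself will still reject any single event larger than 20 MiB even though the broker would now accept up to 200 MiB.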