
Fixing the problem of the Flume child process dying frequently, plus Kafka's limit on single-message size

Optimizing Flume:

We use Flume to collect Tomcat's log file catalina.out and forward it to a Kafka topic. The problem is that Flume dies frequently; the temporary workaround was a script that automatically restarts it.

The main Flume process rarely dies. What dies is the child process, i.e. the process running the command that reads the Tomcat log file.
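Because the exec source simply spawns the tail command as a child process, a quick ps check like the one below (using the path from the configuration that follows) shows whether that child is still alive:

# Check whether the exec source's child process (the tail command) is still running.
ps -ef | grep 'tail -F /app/tomcat9991/logs/catalina.out' | grep -v grep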

The Flume configuration file and the restart script are as follows:

Flume configuration file:

# Name the components on this agent
a1.sources = x1
a1.channels = c
a1.sinks = r

# Describe/configure the source
a1.sources.x1.type = exec
a1.sources.x1.command = tail -F /app/tomcat9991/logs/catalina.out
a1.sources.x1.channels = c

# Describe the sink
a1.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.r.kafka.topic = xcxInfoLog
a1.sinks.r.kafka.bootstrap.servers = 10.251.27.123:9092,10.251.27.124:9092,10.251.27.125:9092
a1.sinks.r.channel = c

# Use a channel which buffers events in memory
a1.channels.c.type = memory
a1.channels.c.capacity = 5000
a1.channels.c.transactionCapacity = 5000

a1.sources.x1.interceptors = i1
a1.sources.x1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.x1.interceptors.i1.headerName = key
a1.sources.x1.interceptors.i1.preserveExisting = false

Script that restarts the Flume process:

# Mini-program Tomcat9991 log: watch the Flume exec source and restart the agent if it has died
source /home/logv/.bash_profile
num=1
result01=''
# awk concatenates $8 $9 $10 without separators, so the grep pattern has no spaces
result02=`ps -ef|awk '{print $8 $9 $10}'|grep tail-F/app/tomcat9991/logs/catalina.out|grep -v grep|wc -l`
if [ $result02 -lt $num ]; then
    result01=`ps -ef|grep xcxInfoLogTomcat9991.properties|grep -v grep|awk '{print $2}'`
    kill -9 $result01
    nohup /home/logv/bigdata/flume1.8/bin/flume-ng agent -c /home/logv/bigdata/flume1.8/conf -f /home/logv/flumeProperties/xcxLog/xcxInfoLogTomcat9991.properties -n a1 -Dflume.root.logger=ERROR,console > /home/logv/flumeProperties/logs/xcxInfoLogTomcat9991.log 2>&1 &
fi
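For reference, one way to keep this watchdog running is a cron entry; the script path below is a hypothetical name for wherever the script above is saved:

# Hypothetical crontab entry (edit with crontab -e): run the watchdog every minute
# and append its output to a log file. Adjust the script path to your deployment.
* * * * * /bin/bash /home/logv/flume_watchdog.sh >> /home/logv/flumeProperties/logs/flume_watchdog.log 2>&1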
           

Solution:

Increase the JVM memory available to the Flume agent (and therefore to its memory channel): edit flume-env.sh and raise the heap the agent is initialized with. The default is 20 MB; it was raised to 2 GB:

export JAVA_OPTS="-Xms1024m -Xmx2048m -Xss256k -Xmn1024m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit -Xloggc:/work/app/flume/logs/server-gc.log.$(date +%F) -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=1 -XX:GCLogFileSize=200M"
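After restarting the agent, the new heap settings can be confirmed with the JDK's jcmd tool; this is just a quick check, assuming jcmd is on the PATH and is run as the user that owns the agent process:

# Find the agent's PID via its main class and print the effective max heap flag.
pid=$(ps -ef | grep 'org.apache.flume.node.Application' | grep -v grep | awk '{print $2}')
jcmd "$pid" VM.flags | tr ' ' '\n' | grep -i MaxHeapSize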

A few other settings in the configuration file were adjusted as well. The optimized configuration file:

# Name the components on this agent
a2.sources = s1 
a2.channels = c
a2.sinks = s2

# Describe/configure the source
a2.sources.s1.type = exec
a2.sources.s1.command = tail -F /app/tomcat9991/logs/catalina.out
# Added: the doPut batch size, i.e. how many events the source reads before handing them to the channel
a2.sources.s1.batchSize=1000
# Added: how long (in milliseconds) the source waits before flushing buffered events to the channel
a2.sources.s1.batchTimeout = 3000
a2.sources.s1.channels = c

# Describe the sink
a2.sinks.s2.type = org.apache.flume.sink.kafka.KafkaSink
a2.sinks.s2.kafka.topic = test1
a2.sinks.s2.kafka.bootstrap.servers = 10.251.27.123:9092,10.251.27.124:9092,10.251.27.125:9092,10.251.27.102:9092,10.251.27.121:9092
# Changed the producer acknowledgement setting; the default here is -1 (all). Setting it to 1 is a compromise:
# the leader must write the data to its local log, but the followers are not required to confirm the write,
# so if a follower has not replicated the data yet and the leader then crashes, the message is lost.
# This trades a little durability for performance.
a2.sinks.s2.kafka.producer.acks= 1
# Added: larger sink batch size to speed up processing
a2.sinks.s2.flumeBatchSize = 2000
a2.sinks.s2.kafka.producer.max.request.size=20971520
a2.sinks.s2.channel = c

a2.channels.c.type = memory
# Increased: the channel's capacity (maximum number of events it can hold)
a2.channels.c.capacity = 1000000
# Increased: the size of the putList and takeList; this value must be smaller than capacity
a2.channels.c.transactionCapacity = 100000
# Added: percentage of the channel's byteCapacity reserved for event headers; the default is 20%, lowered here to 10%
a2.channels.c.byteCapacityBufferPercentage=10
a2.sources.s1.interceptors = i1
a2.sources.s1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a2.sources.s1.interceptors.i1.headerName = key
a2.sources.s1.interceptors.i1.preserveExisting = false
           

Startup command:

nohup /home/logv/bigData/flume1.8/bin/flume-ng agent -n a2 -c /home/logv/bigData/flume1.8/conf -f /home/logv/flumeProperties/xcxLog/testKafka.properties &
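To see whether the larger channel actually absorbs traffic bursts, the agent can also be started with Flume's built-in HTTP JSON metrics reporting; the port below is an arbitrary free port, not part of the original setup:

# Start the agent with HTTP metrics reporting enabled, then inspect metrics such as ChannelFillPercentage.
nohup /home/logv/bigData/flume1.8/bin/flume-ng agent -n a2 -c /home/logv/bigData/flume1.8/conf -f /home/logv/flumeProperties/xcxLog/testKafka.properties -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 &
curl http://localhost:34545/metrics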

After this change the Flume child process no longer died and part of the data was written to Kafka normally, but the following error was reported:

2018-12-26 23:41:12,598 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
    at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:264)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

The error says the request contained a message larger than the maximum message size the server will accept. This is not a Flume problem but a Kafka one: Kafka limits the size of a single message it will accept, 1 MB by default. Because these logs contain image data far larger than 1 MB, the maximum single-message size Kafka accepts has to be raised. There are two ways to do this: change the setting for a single topic, or change the Kafka broker configuration file.

Modify a single topic (raises the limit to 200 MB):

/app/bigData/kafka/bin/kafka-topics.sh --zookeeper 10.251.27.123:2181 --alter --topic test1 --config max.message.bytes=209715200
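The topic-level override can be double-checked with the describe command (same ZooKeeper address as above); the Configs column should then show max.message.bytes=209715200:

# Verify that the topic-level override was applied.
/app/bigData/kafka/bin/kafka-topics.sh --zookeeper 10.251.27.123:2181 --describe --topic test1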

Modify the configuration file: add two settings to Kafka's server.properties:

# Maximum message size, in bytes, that the broker will accept
message.max.bytes=209715200

# Maximum number of bytes fetched per partition when replicating messages between brokers; it must be no smaller
# than message.max.bytes, because it is the size of the buffer used to pull messages from a partition into memory;
# if it were smaller than message.max.bytes, that buffer might not be able to hold a single message
replica.fetch.max.bytes=209715200
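After restarting the brokers, a rough way to confirm the new limits is to push a single message larger than the old 1 MB default through the console producer; the ~2 MB random payload and the topic name below are just test values, and the producer-side max.request.size has to be raised as well, mirroring the kafka.producer.max.request.size already set on the Flume sink:

# Send one large message (2 MB of random bytes, base64-encoded onto a single line).
head -c 2097152 /dev/urandom | base64 | tr -d '\n' | \
  /app/bigData/kafka/bin/kafka-console-producer.sh \
    --broker-list 10.251.27.123:9092 \
    --topic test1 \
    --producer-property max.request.size=209715200

If the limits had not been raised, this test would fail with the same RecordTooLargeException shown above.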
