- 運用場景:我們機器上每天或者定期都要跑很多任務,很多時候任務出現錯誤不能及時發現,導緻發現的時候任務已經挂了很久了。
- 解決方法:基于 Flume+Kafka+Spark Streaming 的架構對這些任務的輸出日志進行實時監控,當檢測到日志出現Error的資訊就發送郵件給項目的負責人。
- 目的:通過這個小項目熟悉基于 Flume+Kafka+Spark Streaming 架構實時分析處理日志,能用到真實項目就更好了。
一、Flume
Flume是用來收集、彙聚并且傳輸日志資料Kafka去。可以設定多個sources對應多個任務的日志,到一個kafka sinks。配置檔案如下:
#define agent
agent_log.sources = s1 s2
agent_log.channels = c1
agent_log.sinks = k1
#define sources.s1
agent_log.sources.s1.type=exec
agent_log.sources.s1.command=tail -F /data/log1.log
#define sources.s2
agent_log.sources.s2.type=exec
agent_log.sources.s2.command=tail -F /data/log2.log
#定義攔截器
agent_log.sources.s1.interceptors = i1
agent_log.sources.s1.interceptors.i1.type = static
agent_log.sources.s1.interceptors.i1.preserveExisting = false
agent_log.sources.s1.interceptors.i1.key = projectName
agent_log.sources.s1.interceptors.i1.value= project1
agent_log.sources.s2.interceptors = i2
agent_log.sources.s2.interceptors.i2.type = static
agent_log.sources.s2.interceptors.i2.preserveExisting = false
agent_log.sources.s2.interceptors.i2.key = projectName
agent_log.sources.s2.interceptors.i2.value= project2
#define channels
agent_log.channels.c1.type = memory
agent_log.channels.c1.capacity = 1000
agent_log.channels.c1.transactionCapacity = 1000
#define sinks
#設定Kafka接收器
agent_log.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
#設定Kafka的broker位址和端口号
agent_log.sinks.k1.brokerList=cdh1:9092,cdh2:9092,cdh3:9092
#設定Kafka的Topic
agent_log.sinks.k1.topic=result_log
#包含header
agent_log.sinks.k1.useFlumeEventFormat = true
#設定序列化方式
agent_log.sinks.k1.serializer.class=kafka.serializer.StringEncoder
agent_log.sinks.k1.partitioner.class=org.apache.flume.plugins.SinglePartition
agent_log.sinks.k1.partition.key=1
agent_log.sinks.k1.request.required.acks=0
agent_log.sinks.k1.max.message.size=1000000
agent_log.sinks.k1.agent_log.type=sync
agent_log.sinks.k1.custom.encoding=UTF-8
# bind the sources and sinks to the channels
agent_log.sources.s1.channels=c1
agent_log.sources.s2.channels=c1
agent_log.sinks.k1.channel=c1
執行flume-ng指令啟動flume:
flume-ng agent -c /etc/flume-ng/conf -f result_log.conf -n agent_log
二、Kafka
Kafka是一個消息系統,可以緩沖消息。Flume收集的日志傳送到Kafka消息隊列中(Flume作為生産者),然後就可以被Spark Streaming消費了,而且可以保證不丢失資料。kafka的具體知識可以閱讀:https://www.cnblogs.com/likehua/p/3999538.html
#建立result_log主題
kafka-topics --zookeeper cdh1:2181,cdh1:2181,cdh3:2181 --create --topic result_log --partitions 3 --replication-factor 1
#測試-檢視kafka主題清單,觀察result_log是否建立成功
kafka-topics --list --zookeeper cdh1:2181,cdh1:2181,cdh3:2181
#測試-啟動一個消費者測試flume傳輸日志到kafka這一環節是否正常運作
kafka-console-consumer --bootstrap-server cdh1:9092,cdh1:9092,cdh3:9092 --topic result_log
三、Spark Streaming (程式設計語言:scala,開發工具:Idea)
建立一個maven項目,配置pom.xml添加依賴。//具體見項目代碼
我們用Zookeeper來管理spark streaming 消費者的offset。調用
KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams, newOffset))
與kafka建立連接配接,傳回InputDStream,擷取資料流,
stream.foreachRDD(rdd => {//處理程式}) //處理資料流。
val ssc = new StreamingContext(sc, Durations.seconds(60))
ssc.start() //啟動ssc
發送郵件的功能配置org.apache.commons.mail這個包的 HtmlEmail 這個類,調用 HtmlEmail.send 發送郵件。
編寫一個start.sh腳本啟動 Spark Streaming 程式,最後 sh start.sh 啟動腳本。
#!/bin/bash
export HADOOP_USER_NAME=hdfs
spark2-submit \
--master yarn \
--deploy-mode client \
--executor-cores 3 \
--num-executors 10 \
--driver-memory 2g \
--executor-memory 1G \
--conf spark.default.parallelism=30 \
--conf spark.storage.memoryFraction=0.5 \
--conf spark.shuffle.memoryFraction=0.3 \
--conf spark.reducer.maxSizeInFlight=128m \
--driver-class-path mysql-connector-java-5.1.38.jar \
--jars mysql-connector-java-5.1.38.jar,qqwry-java-0.7.0.jar,fastjson-1.2.47.jar,spark-streaming-kafka-10_2.11-2.2.0.jar,hive-hbase-handler-1.1.0-cdh5.13.0.jar,commons-email-1.5.jar,commons-email-1.5-sources.jar,mail-1.4.7.jar \
--class com.lin.monitorlog.mianer.Handler \
monitorLog.jar
#[END]
spark streaming 程式代碼連結: https://download.csdn.net/download/linge1995/10576773