Building a Real-Time Log Collection System with Flume and Kafka
Flume tails the logs under a directory and forwards them, via an Avro hop, to a Kafka sink; consumers then pull the data from Kafka.
Environment
Hostname: s201
ZooKeeper 3.4.12: s201:2181
Kafka 0.9.0.1: s201:9092
Flume 1.7.0
Spark 2.2.3
The Flume configuration files are as follows:
# exec-memory-avro.properties: tail the log under flume_msg and forward the data to avroSink
exec-memory-avro.sources=execSrc
exec-memory-avro.channels=memoryChannel
exec-memory-avro.sinks=avroSink
exec-memory-avro.sources.execSrc.type=exec
exec-memory-avro.sources.execSrc.command=tail -F /home/hadoop/data/flume/source/flume_msg/data.log
exec-memory-avro.sources.execSrc.shell=/bin/sh -c
exec-memory-avro.sinks.avroSink.type=avro
exec-memory-avro.sinks.avroSink.hostname=s201
exec-memory-avro.sinks.avroSink.port=33333
exec-memory-avro.sources.execSrc.channels=memoryChannel
exec-memory-avro.sinks.avroSink.channel=memoryChannel
exec-memory-avro.channels.memoryChannel.type=memory
exec-memory-avro.channels.memoryChannel.capacity=100
#-----------------------------------------------------------
#-----------------------------------------------------------
# avro将資料傳到kafka的hello_topic中
avro-memory-kafka.sources=avroSource
avro-memory-kafka.sinks=kafkaSink
avro-memory-kafka.channels=memoryChannel
avro-memory-kafka.sources.avroSource.type=avro
avro-memory-kafka.sources.avroSource.bind=s201
avro-memory-kafka.sources.avroSource.port=33333
avro-memory-kafka.sinks.kafkaSink.type=org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafkaSink.kafka.bootstrap.servers=s201:9092
avro-memory-kafka.sinks.kafkaSink.kafka.topic=hello
avro-memory-kafka.sinks.kafkaSink.flumeBatchSize=5
avro-memory-kafka.sinks.kafkaSink.kafka.producer.acks=1
avro-memory-kafka.channels.memoryChannel.type=memory
avro-memory-kafka.channels.memoryChannel.capacity=100
avro-memory-kafka.sources.avroSource.channels=memoryChannel
avro-memory-kafka.sinks.kafkaSink.channel=memoryChannel
Create the topic hello in Kafka; it will receive the messages published by the producer (the Flume Kafka sink).
kafka-topics.sh --create --zookeeper s201:2181/mykafka --replication-factor 1 --partitions 1 --topic hello
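To confirm the topic exists, it can be described through the same ZooKeeper chroot (an optional check, not required for the pipeline):
kafka-topics.sh --describe --zookeeper s201:2181/mykafka --topic hello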
Testing
- Start the avro-memory-kafka agent, which receives the logs.
- Start the exec-memory-avro agent, which ships the logs from the source.
- Start the console consumer that ships with Kafka and consume the hello topic.
# First start the agent with the Avro source, listening for messages sent from the other agent
bin/flume-ng agent \
--name avro-memory-kafka \
--conf conf \
--conf-file conf/avro-memory-kafka.properties \
-Dflume.root.logger=INFO,console
# Tail data.log and forward every new line as it appears
bin/flume-ng agent \
--name exec-memory-avro \
--conf conf \
--conf-file conf/exec-memory-avro.properties \
-Dflume.root.logger=INFO,console
# Start the Kafka console consumer on the hello topic
kafka-console-consumer.sh --zookeeper s201:2181/mykafka --topic hello
Once everything is running, append data to data.log, e.g. echo "hello1" >> data.log ..., and the lines should show up in the console consumer.
Newer Flume versions support a TAILDIR source, so the exec-memory-avro agent can be modified accordingly; see the sketch below.
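A minimal sketch of that change, keeping the source name execSrc; the filegroup name f1 and the position-file path are illustrative assumptions (the command and shell properties of the exec source are no longer needed):
exec-memory-avro.sources.execSrc.type=TAILDIR
exec-memory-avro.sources.execSrc.filegroups=f1
exec-memory-avro.sources.execSrc.filegroups.f1=/home/hadoop/data/flume/source/flume_msg/data.log
exec-memory-avro.sources.execSrc.positionFile=/home/hadoop/data/flume/taildir_position.json
Unlike exec with tail -F, TAILDIR records its read position in the position file, so data is not lost or re-read when the agent restarts.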
Flume、Kafka整合Sparkstreaming
Flume整合Sparkstreaming
Push方式
Install the netcat tool on Windows 10 and add it to the PATH; it can then be invoked directly as nc.exe hostname port.
The Flume configuration file is as follows:
# flume_push_streaming.properties
# netcat-memory-avro
flume_push_streaming.sources=netcatSrc
flume_push_streaming.channels=memoryChannel
flume_push_streaming.sinks=avroSink
flume_push_streaming.sources.netcatSrc.type=netcat
flume_push_streaming.sources.netcatSrc.bind=s201
flume_push_streaming.sources.netcatSrc.port=22222
flume_push_streaming.sinks.avroSink.type=avro
# IP address of the machine running the local IDE
flume_push_streaming.sinks.avroSink.hostname=192.168.204.1
flume_push_streaming.sinks.avroSink.port=33333
flume_push_streaming.sources.netcatSrc.channels=memoryChannel
flume_push_streaming.sinks.avroSink.channel=memoryChannel
flume_push_streaming.channels.memoryChannel.type=memory
flume_push_streaming.channels.memoryChannel.capacity=100
The Scala code is as follows:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumePushWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePush")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    // FlumeUtils turns the stream of Flume events into a DStream that can be processed
    // 0.0.0.0 means listen on all local interfaces
    val flumeStream = FlumeUtils.createStream(ssc, "0.0.0.0", 33333)
    flumeStream.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
Note on the code above: FlumeUtils.createStream(ssc, hostname, port) returns a ReceiverInputDStream[SparkFlumeEvent].
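Running this locally also needs the spark-streaming-flume artifact on the classpath; a sketch of the sbt dependency, assuming the version is kept in line with the Spark 2.2.3 used here:
libraryDependencies += "org.apache.spark" %% "spark-streaming-flume" % "2.2.3"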
Testing
# 1. First start the Spark Streaming program written in the local IDEA.
# 2. Start Flume
bin/flume-ng agent \
--name flume_push_streaming \
--conf conf \
--conf-file conf/flume_push_streaming.properties \
-Dflume.root.logger=INFO,console
# 3. Use netcat to send messages to the port the Flume source listens on
# nc is run on the Windows machine here
nc.exe s201 22222
Pull-based approach
Flume pushes the data into a sink where it is buffered; Spark Streaming then uses a reliable Flume receiver to pull the data from that sink.
In the Flume configuration, change the sink type to org.apache.spark.streaming.flume.sink.SparkSink (the spark-streaming-flume-sink jar and its Scala dependencies also have to be placed on Flume's classpath),
and in the Scala code switch to:
FlumeUtils.createPollingStream(ssc, [sink machine hostname], [sink port])
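A minimal sketch of the pull-side driver under the configuration shown next (SparkSink bound to s201:33333); the object name FlumePullWordCount is illustrative:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumePullWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePull")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    // Pull events from the SparkSink instead of having Flume push them to this process
    val flumeStream = FlumeUtils.createPollingStream(ssc, "s201", 33333)
    flumeStream.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()
  }
}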
# flume_pull_streaming.properties
flume_pull_streaming.sources=netcatSrc
flume_pull_streaming.channels=memoryChannel
# the sink is where this differs from the push setup
flume_pull_streaming.sinks=sparkSink
flume_pull_streaming.sources.netcatSrc.type=netcat
flume_pull_streaming.sources.netcatSrc.bind=s201
flume_pull_streaming.sources.netcatSrc.port=22222
# the SparkSink buffers events until Spark Streaming pulls them
flume_pull_streaming.sinks.sparkSink.type=org.apache.spark.streaming.flume.sink.SparkSink
flume_pull_streaming.sinks.sparkSink.hostname=s201
flume_pull_streaming.sinks.sparkSink.port=33333
flume_pull_streaming.sources.netcatSrc.channels=memoryChannel
flume_pull_streaming.sinks.sparkSink.channel=memoryChannel
flume_pull_streaming.channels.memoryChannel.type=memory
flume_pull_streaming.channels.memoryChannel.capacity=100
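A sketch of starting this agent (agent and file names follow the config above); unlike the push case, the agent can be started before the Spark Streaming job, since the SparkSink simply buffers events until the receiver polls it:
bin/flume-ng agent \
--name flume_pull_streaming \
--conf conf \
--conf-file conf/flume_pull_streaming.properties \
-Dflume.root.logger=INFO,console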
Tip: if extra dependencies are needed when submitting on the server, they can be passed to spark-submit; --packages takes Maven coordinates (resolved at submit time), while --jars takes paths to jar files.
e.g. spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.3 ...
Kafka + Spark Streaming
Official docs: http://spark.apache.org/docs/2.2.3/streaming-kafka-0-8-integration.html
Receiver-based
This approach uses Kafka's high-level consumer API and stores the offsets of consumed messages in ZooKeeper. A receiver has to persist the data in a Write Ahead Log, which adds the overhead of copying the data again, so it is less efficient than the direct stream. It only guarantees at-least-once semantics, so records may be processed more than once; exactly-once cannot be achieved.
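A minimal sketch of the receiver-based API, assuming an existing StreamingContext ssc plus the ZooKeeper chroot and topic used earlier; the group id spark_group and the single receiver thread are illustrative:
import org.apache.spark.streaming.kafka.KafkaUtils

// createStream(ssc, zkQuorum, groupId, Map(topic -> numThreads)) returns a DStream of (key, value) pairs
val kafkaStream = KafkaUtils.createStream(ssc, "s201:2181/mykafka", "spark_group", Map("hello" -> 1))
kafkaStream.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()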
Direct Approach (the more commonly used one), introduced in Spark 1.3
Instead of receiving data through a receiver, it periodically queries Kafka for the latest offsets of every topic+partition and reads the resulting offset ranges with Kafka's simple consumer API.
The RDD partitions Spark Streaming creates match the Kafka partitions being consumed one-to-one, which simplifies reasoning about parallelism.
* It uses the low-level Kafka API and records offsets in the Spark Streaming checkpoint rather than in ZooKeeper, which makes exactly-once semantics achievable.
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// kafkaParams: the broker list; topicsSet: the set of topics to consume
val kafkaParams = Map[String, String]("metadata.broker.list" -> "s201:9092")
val topicsSet = Set("hello")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)
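messages is a DStream of (key, value) pairs; under the same assumptions, a word count over the values looks like:
messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()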