
Flume + Kafka + Spark Streaming: Building a Real-Time Log Collection System

Integrating Flume and Kafka to Build a Real-Time Log Collection System

Flume collects the logs from a directory and writes them to Kafka through a Kafka sink; consumers then pull the data from Kafka for processing.

Environment

Hostname: s201

ZooKeeper 3.4.12: s201:2181

Kafka 0.9.0.1: s201:9092

Flume 1.7.0

Spark 2.2.3

The Flume configuration files are as follows:

# Tail the log under flume_msg and forward the data to avroSink
exec-memory-avro.sources=execSrc
exec-memory-avro.channels=memoryChannel
exec-memory-avro.sinks=avroSink

exec-memory-avro.sources.execSrc.type=exec
exec-memory-avro.sources.execSrc.command=tail -F /home/hadoop/data/flume/source/flume_msg/data.log
exec-memory-avro.sources.execSrc.shell=/bin/sh -c

exec-memory-avro.sinks.avroSink.type=avro
exec-memory-avro.sinks.avroSink.hostname=s201
exec-memory-avro.sinks.avroSink.port=33333

exec-memory-avro.sources.execSrc.channels=memoryChannel
exec-memory-avro.sinks.avroSink.channel=memoryChannel

exec-memory-avro.channels.memoryChannel.type=memory
exec-memory-avro.channels.memoryChannel.capacity=100

#-----------------------------------------------------------
#-----------------------------------------------------------

# The Avro source receives the data and the Kafka sink writes it to the hello topic
avro-memory-kafka.sources=avroSource
avro-memory-kafka.sinks=kafkaSink
avro-memory-kafka.channels=memoryChannel

avro-memory-kafka.sources.avroSource.type=avro
avro-memory-kafka.sources.avroSource.bind=s201
avro-memory-kafka.sources.avroSource.port=33333

avro-memory-kafka.sinks.kafkaSink.type=org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafkaSink.kafka.bootstrap.servers=s201:9092
avro-memory-kafka.sinks.kafkaSink.kafka.topic=hello
avro-memory-kafka.sinks.kafkaSink.batchSize=5
avro-memory-kafka.sinks.kafkaSink.requiredAcks=1

avro-memory-kafka.channels.memoryChannel.type=memory
avro-memory-kafka.channels.memoryChannel.capacity=100

avro-memory-kafka.sources.avroSource.channels=memoryChannel
avro-memory-kafka.sinks.kafkaSink.channel=memoryChannel
           

Create the hello topic in Kafka to receive the messages from the producer.

kafka-topics.sh --create --zookeeper s201:2181/mykafka --replication-factor 1 --partitions 1 --topic hello
           

Testing procedure

  1. Start the avro-memory-kafka Flume agent to receive the logs.
  2. Start the exec-memory-avro Flume agent to send the logs from the source.
  3. Start the Kafka console consumer to consume the hello topic.
# First start the Avro source agent to listen for messages sent from the other server
bin/flume-ng agent \
--name avro-memory-kafka \
--conf conf \
--conf-file conf/avro-memory-kafka.properties \
-Dflume.root.logger=INFO,console

# Tail data.log and forward any new messages as they arrive
bin/flume-ng agent \
--name exec-memory-avro \
--conf conf \
--conf-file conf/exec-memory-avro.properties \
-Dflume.root.logger=INFO,console

# Start the Kafka console consumer to consume the hello topic
kafka-console-consumer.sh --zookeeper s201:2181/mykafka --topic hello
           

Once everything above is running, write data to data.log: echo "hello1" >> data.log  ...

Newer versions of Flume support the TAILDIR source, so the exec-memory-avro agent could be modified to use it; a sketch follows.
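
A minimal sketch of that change (assuming Flume 1.7+; the positionFile path is hypothetical):

# Replace the exec source definition with a TAILDIR source
exec-memory-avro.sources.execSrc.type=TAILDIR
exec-memory-avro.sources.execSrc.positionFile=/home/hadoop/data/flume/taildir_position.json
exec-memory-avro.sources.execSrc.filegroups=f1
exec-memory-avro.sources.execSrc.filegroups.f1=/home/hadoop/data/flume/source/flume_msg/data.log

Unlike exec with tail -F, TAILDIR records the read position in positionFile, so tailing resumes from where it left off after an agent restart.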

Integrating Flume and Kafka with Spark Streaming

Integrating Flume with Spark Streaming

Push approach

Install the netcat tool on Windows 10 and add it to the PATH; it can then be run directly as nc.exe hostname port.

The Flume configuration file is as follows:

# flume_push_streaming.properties
# netcat-memory-avro
flume_push_streaming.sources=netcatSrc
flume_push_streaming.channels=memoryChannel
flume_push_streaming.sinks=avroSink

flume_push_streaming.sources.netcatSrc.type=netcat
flume_push_streaming.sources.netcatSrc.bind=s201
flume_push_streaming.sources.netcatSrc.port=22222

flume_push_streaming.sinks.avroSink.type=avro
# IP address of the machine running the local IDE
flume_push_streaming.sinks.avroSink.hostname=192.168.204.1
flume_push_streaming.sinks.avroSink.port=33333

flume_push_streaming.sources.netcatSrc.channels=memoryChannel
flume_push_streaming.sinks.avroSink.channel=memoryChannel

flume_push_streaming.channels.memoryChannel.type=memory
flume_push_streaming.channels.memoryChannel.capacity=100
           

The Scala code is as follows:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumePushWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePush")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    // FlumeUtils converts the Flume event stream into a DStream for further processing.
    // Binding to 0.0.0.0 listens on all network interfaces.
    val flumeStream = FlumeUtils.createStream(ssc, "0.0.0.0", 33333)
    flumeStream.map(x => new String(x.event.getBody().array()).trim)
        .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
           

Note that FlumeUtils.createStream(ssc, hostname, port) returns a ReceiverInputDStream[SparkFlumeEvent].

Testing

# 1. First start the Spark Streaming program written in the local IDEA.
# 2. Start Flume.
bin/flume-ng agent \
--name flume_push_streaming \
--conf conf \
--conf-file conf/flume_push_streaming.properties \
-Dflume.root.logger=INFO,console
# 3. Use netcat to send messages to the port the Flume source listens on
# (nc is run here from the Windows environment)
nc.exe s201 22222
           

Pull approach

Flume pushes the data into the sink, where it is buffered; Spark Streaming then uses a reliable Flume receiver to pull the data from the sink.

In the Flume configuration file, the sink type is changed to org.apache.spark.streaming.flume.sink.SparkSink.

In the Scala code, the stream is created with:

FlumeUtils.createPollingStream(ssc, "<sink machine hostname>", <sink port>)
           
# flume_pull_streaming.properties
flume_push_streaming.sources=netcatSrc
flume_push_streaming.channels=memoryChannel
# the sink differs from the push configuration from here on
flume_push_streaming.sinks=sparkSink

flume_push_streaming.sources.netcatSrc.type=netcat
flume_push_streaming.sources.netcatSrc.bind=s201
flume_push_streaming.sources.netcatSrc.port=22222
# the following sink section differs from the push configuration
flume_push_streaming.sinks.sparkSink.type=org.apache.spark.streaming.flume.sink.SparkSink
flume_push_streaming.sinks.sparkSink.hostname=s201
flume_push_streaming.sinks.sparkSink.port=33333

flume_push_streaming.sources.netcatSrc.channels=memoryChannel
flume_push_streaming.sinks.sparkSink.channel=memoryChannel

flume_push_streaming.channels.memoryChannel.type=memory
flume_push_streaming.channels.memoryChannel.capacity=100
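
For reference, a minimal Scala sketch of the pull-based word count (the hostname and port must match the SparkSink configuration above; the pull approach also requires the spark-streaming-flume-sink jar and its dependencies on the Flume agent's classpath):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumePullWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePull")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    // createPollingStream pulls the events buffered by the SparkSink on the Flume side
    val flumeStream = FlumeUtils.createPollingStream(ssc, "s201", 33333)
    flumeStream.map(x => new String(x.event.getBody().array()).trim)
        .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}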
           

Tip: if dependencies are needed on the server, they can be pulled in via spark-submit's --packages option (Maven coordinates); locally available jar files can be added with --jars.

For example: spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.3 ...

Integrating Kafka with Spark Streaming

Official docs: http://spark.apache.org/docs/2.2.3/streaming-kafka-0-8-integration.html

Receiver-based

Uses Kafka's high-level consumer API, which stores the offsets of consumed messages in ZooKeeper. Data is received through a receiver and written to a Write Ahead Log, which adds the overhead of copying the data again, so it is less efficient than the direct stream. It only guarantees at-least-once semantics: records may be processed more than once, and exactly-once cannot be achieved.
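
A minimal Scala sketch of this receiver-based approach (the consumer group name "spark_group" is hypothetical; the ZooKeeper address and the hello topic are the ones used earlier in this article):

import org.apache.spark.streaming.kafka.KafkaUtils

// Receiver-based: the high-level consumer tracks offsets in ZooKeeper.
// The map value is topic -> number of receiver threads.
val kafkaStream = KafkaUtils.createStream(ssc, "s201:2181/mykafka", "spark_group", Map("hello" -> 1))
kafkaStream.map(_._2)  // each record is a (key, value) tuple; the message body is the value
  .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()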

Direct Approach (more commonly used), introduced in Spark 1.3

Instead of receiving data through a receiver, it periodically queries Kafka for the latest offsets of each topic+partition and reads the defined offset ranges from Kafka using the simple consumer API.

With this approach, Spark Streaming creates exactly as many RDD partitions as there are Kafka partitions to consume, with a one-to-one mapping between them, which simplifies tuning the parallelism.

* Using the low-level Kafka API, offsets are recorded in Spark Streaming's checkpoints rather than in ZooKeeper, which makes exactly-once semantics achievable.

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// brokers (e.g. "s201:9092") and topicsSet (e.g. Set("hello")) are defined elsewhere
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc,
  Map[String, String]("metadata.broker.list" -> brokers),  // kafkaParams
  topicsSet                                                // set of topics to consume
)
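
Each record from the direct stream is a (key, value) tuple, so the word count again works on the value, for example:

directStream.map(_._2)
  .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()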
           

Integrating Flume + Kafka + Spark Streaming
