天天看點

SparkStreaming整合Kafka&Flume

文章目錄

代碼已上傳至github

https://github.com/2NaCl/sparkstreaming_kafka-flume-demo/

我們首先來看一下架構的圖,友善我們來了解并且複習一下之前所提到的知識。

SparkStreaming整合Kafka&Flume
由外部的軟體實時産生一些資料,然後用flume實時對這些資料進行采集,利用KafkaSink将資料遞接到kafka,做到一個緩存的作用,然後這些消息隊列再作為SparkStreaming的資料源,完成業務運算,最後入庫或者可視化。

然後就來用代碼來表示一下

  1. 首先我們用代碼來模拟AppServer實時來産生資料。
SparkStreaming整合Kafka&Flume
  1. 然後我們對這個日志的輸入進行一定的配置

    輸出級别為INFO,使用SYSTEM.out的方式在控制台輸出,格式為Pattern所示

SparkStreaming整合Kafka&Flume

輸出如下:

SparkStreaming整合Kafka&Flume

此時,日志的産生就完成了。

  1. flume的日志采集

streaming.conf

# Name the components on this agent
agent1.sources = avro-source
agent1.channels = logger-channel
agent1.sinks = log-sink

# Describe/configure the source
agent1.sources.avro-source.type = avro
agent1.sources.avro-source.bind = 0.0.0.0
agent1.sources.avro-source.port = 41414

# Describe the channel
agent1.channels.logger-channel.type = memory
agent1.channels.logger-channel.capacity = 1000
agent1.channels.logger-channel.transactionCapacity = 100

# Describe the sink
agent1.sinks.log-sink.type = logger

# Bind the source and sink to the channel
agent1.sources.avro-source.channels = logger-channel
agent1.sinks.log-sink.channel = logger-channel
           

這時,我們完成了flume的配置

  1. 然後我們要做到讓産生的日志資訊和flume對接上,先來看看flume官方的Log4j.Appender是如何定義的
SparkStreaming整合Kafka&Flume

然後根據官方進行一下我們的log4j.properties的相關配置

log4j.rootLogger=INFO,stdout,flume

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern= %d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

#...
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 192.168.243.20
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true
           

然後添加一個相關的jar包

<dependency>
            <groupId>org.apache.flume.flume-ng-clients</groupId>
            <artifactId>flume-ng-log4jappender</artifactId>
            <version>1.7.0</version>
        </dependency>
           

重新啟動flume

[[email protected] conf]$ flume-ng agent \
> --name agent \  上面配置的agent名字就是agent
> --conf $FLUME_HOME/conf \  系統配置的目錄
> --conf-file $FLUME_HOME/conf/streaming.conf  \   系統配置的檔案
> -Dflume.root.logger=INFO,console   将日志列印到控制台
           

這時,我們已經能讓産生的日志對接到flume上了,緊接着要做的是将flume采集到資料對接到kafka

  1. flume對接kafka

首先啟動背景的kafka程序

kafka-server-start.sh 
-daemon /home/centos01/modules/kafka_2.11-0.11.0.2/config/server.properties
           

先來看看我們已經建立過的topic

SparkStreaming整合Kafka&amp;Flume

現在我們在這個基礎上再建立一個topic,幫助我們測試這個案例

[[email protected] kafka_2.11-0.11.0.2]$ kafka-topics.sh 
--create 
--zookeeper linux01:2181 
--replication-factor 1 
--partitions 1 
--topic streamingtopic
           

建立一個新的flume conf,幫助我們能将采集到的資料對接到kafka,也是來看官方文檔

SparkStreaming整合Kafka&amp;Flume

其實kafka那篇講過配置,就不多說了,下面直接貼上flume conf

# Name the components on this agent
agent1.sources = avro-source
agent1.channels = logger-channel
agent1.sinks = kafka-sink

# Describe/configure the source
agent1.sources.avro-source.type = avro
agent1.sources.avro-source.bind = 0.0.0.0
agent1.sources.avro-source.port = 41414

# Describe the channel
agent1.channels.logger-channel.type = memory
agent1.channels.logger-channel.capacity = 1000
agent1.channels.logger-channel.transactionCapacity = 100

# Describe the sink
agent1.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.kafka.topic = streamingtopic
agent1.sinks.kafka-sink.kafka.bootstrap.servers = 192.168.243.20:9092
agent1.sinks.kafka-sink.kafka.flumeBatchSize = 20
agent1.sinks.kafka-sink.kafka.producer.acks = 1
agent1.sinks.kafka-sink.kafka.producer.linger.ms = 1

# Bind the source and sink to the channel
agent1.sources.avro-source.channels = logger-channel
agent1.sinks.kafka-sink.channel = logger-channel
           

啟動flume,指令就不說了。

然後啟動kafka消費者

[[email protected] kafka_2.11-0.11.0.2]$ kafka-console-consumer.sh 
--bootstrap-server linux01:9092
--topic streamingtopic
           

然後運作程式

SparkStreaming整合Kafka&amp;Flume
SparkStreaming整合Kafka&amp;Flume

成功。

  1. SparkStreaming處理kafka内的資料,進行業務計算。
object KafkaStreamingApp {

  def main(args: Array[String]): Unit = {
    var Array(zkQuorm,group,topics,numThreads) = args

    var sc = new SparkConf().setAppName("kafka").setMaster("local[*]")
    var ssc = new StreamingContext(sc,Seconds(5))

    val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap

    val messages = KafkaUtils.createStream(ssc,zkQuorm,group,topicMap)

    messages.map(_._2).count().print()
    ssc.start()
    ssc.awaitTermination()


  }
}
           

然後進行系統參數的配置

SparkStreaming整合Kafka&amp;Flume

運作程式之前同時啟動之前的操作:

SparkStreaming整合Kafka&amp;Flume