文章目錄
代碼已上傳至github
https://github.com/2NaCl/sparkstreaming_kafka-flume-demo/
我們首先來看一下架構的圖,友善我們來了解并且複習一下之前所提到的知識。
由外部的軟體實時産生一些資料,然後用flume實時對這些資料進行采集,利用KafkaSink将資料遞接到kafka,做到一個緩存的作用,然後這些消息隊列再作為SparkStreaming的資料源,完成業務運算,最後入庫或者可視化。
然後就來用代碼來表示一下
- 首先我們用代碼來模拟AppServer實時來産生資料。
-
然後我們對這個日志的輸入進行一定的配置
輸出級别為INFO,使用SYSTEM.out的方式在控制台輸出,格式為Pattern所示
輸出如下:
此時,日志的産生就完成了。
- flume的日志采集
streaming.conf
# Name the components on this agent
agent1.sources = avro-source
agent1.channels = logger-channel
agent1.sinks = log-sink
# Describe/configure the source
agent1.sources.avro-source.type = avro
agent1.sources.avro-source.bind = 0.0.0.0
agent1.sources.avro-source.port = 41414
# Describe the channel
agent1.channels.logger-channel.type = memory
agent1.channels.logger-channel.capacity = 1000
agent1.channels.logger-channel.transactionCapacity = 100
# Describe the sink
agent1.sinks.log-sink.type = logger
# Bind the source and sink to the channel
agent1.sources.avro-source.channels = logger-channel
agent1.sinks.log-sink.channel = logger-channel
這時,我們完成了flume的配置
- 然後我們要做到讓産生的日志資訊和flume對接上,先來看看flume官方的Log4j.Appender是如何定義的
然後根據官方進行一下我們的log4j.properties的相關配置
log4j.rootLogger=INFO,stdout,flume
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern= %d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
#...
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 192.168.243.20
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true
然後添加一個相關的jar包
<dependency>
<groupId>org.apache.flume.flume-ng-clients</groupId>
<artifactId>flume-ng-log4jappender</artifactId>
<version>1.7.0</version>
</dependency>
重新啟動flume
[[email protected] conf]$ flume-ng agent \
> --name agent \ 上面配置的agent名字就是agent
> --conf $FLUME_HOME/conf \ 系統配置的目錄
> --conf-file $FLUME_HOME/conf/streaming.conf \ 系統配置的檔案
> -Dflume.root.logger=INFO,console 将日志列印到控制台
這時,我們已經能讓産生的日志對接到flume上了,緊接着要做的是将flume采集到資料對接到kafka
- flume對接kafka
首先啟動背景的kafka程序
kafka-server-start.sh
-daemon /home/centos01/modules/kafka_2.11-0.11.0.2/config/server.properties
先來看看我們已經建立過的topic
現在我們在這個基礎上再建立一個topic,幫助我們測試這個案例
[[email protected] kafka_2.11-0.11.0.2]$ kafka-topics.sh
--create
--zookeeper linux01:2181
--replication-factor 1
--partitions 1
--topic streamingtopic
建立一個新的flume conf,幫助我們能将采集到的資料對接到kafka,也是來看官方文檔
其實kafka那篇講過配置,就不多說了,下面直接貼上flume conf
# Name the components on this agent
agent1.sources = avro-source
agent1.channels = logger-channel
agent1.sinks = kafka-sink
# Describe/configure the source
agent1.sources.avro-source.type = avro
agent1.sources.avro-source.bind = 0.0.0.0
agent1.sources.avro-source.port = 41414
# Describe the channel
agent1.channels.logger-channel.type = memory
agent1.channels.logger-channel.capacity = 1000
agent1.channels.logger-channel.transactionCapacity = 100
# Describe the sink
agent1.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.kafka.topic = streamingtopic
agent1.sinks.kafka-sink.kafka.bootstrap.servers = 192.168.243.20:9092
agent1.sinks.kafka-sink.kafka.flumeBatchSize = 20
agent1.sinks.kafka-sink.kafka.producer.acks = 1
agent1.sinks.kafka-sink.kafka.producer.linger.ms = 1
# Bind the source and sink to the channel
agent1.sources.avro-source.channels = logger-channel
agent1.sinks.kafka-sink.channel = logger-channel
啟動flume,指令就不說了。
然後啟動kafka消費者
[[email protected] kafka_2.11-0.11.0.2]$ kafka-console-consumer.sh
--bootstrap-server linux01:9092
--topic streamingtopic
然後運作程式
成功。
- SparkStreaming處理kafka内的資料,進行業務計算。
object KafkaStreamingApp {
def main(args: Array[String]): Unit = {
var Array(zkQuorm,group,topics,numThreads) = args
var sc = new SparkConf().setAppName("kafka").setMaster("local[*]")
var ssc = new StreamingContext(sc,Seconds(5))
val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
val messages = KafkaUtils.createStream(ssc,zkQuorm,group,topicMap)
messages.map(_._2).count().print()
ssc.start()
ssc.awaitTermination()
}
}
然後進行系統參數的配置
運作程式之前同時啟動之前的操作: