Integrating Spark Streaming with Kafka: two integration approaches, Receiver-based and Direct
Part 1: Kafka Preparation
1. Start ZooKeeper on each node:
./zkServer.sh start
2. Start Kafka on each node.
3. Create the topic:
./kafka-topics.sh --create --zookeeper hadoop:2181 --replication-factor 1 --partitions 1 --topic kafka-streaming_topic
4. Use the console scripts to verify that the topic can produce and consume messages normally.
Start the producer script:
./kafka-console-producer.sh --broker-list hadoop:9092 --topic kafka-streaming_topic
Start the consumer script:
./kafka-console-consumer.sh --zookeeper hadoop:2181 --topic kafka-streaming_topic --from-beginning
The preparation work is done.
Part 2: Receiver-based Integration
1. Add the Kafka dependency
<!-- Kafka dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.2.0</version>
</dependency>
2. Write the code locally
package com.kinglone.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaReceiverWordCount {

  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.err.println("Usage: KafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    val Array(zkQuorum, group, topics, numThreads) = args

    val sparkConf = new SparkConf() //.setAppName("KafkaReceiverWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // One consumer thread per topic, as passed in on the command line
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

    /**
     * @param ssc          StreamingContext object
     * @param zkQuorum     Zookeeper quorum (hostname:port,hostname:port,..)
     * @param groupId      The group id for this consumer; set it to whatever name you like
     * @param topics       Map of (topic_name to numPartitions) to consume. Each partition is consumed
     *                     in its own thread
     * @param storageLevel Storage level to use for storing the received objects
     *                     (default: StorageLevel.MEMORY_AND_DISK_SER_2)
     */
    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

    // The value of each Kafka (key, value) pair is the text line; run a word count over it
    messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
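As the comment above notes, createStream also takes a storageLevel argument, which defaults to StorageLevel.MEMORY_AND_DISK_SER_2 (serialized blocks, replicated to a second executor). A minimal sketch of passing it explicitly; only these lines change, and StorageLevel comes from org.apache.spark.storage:

import org.apache.spark.storage.StorageLevel

// Same call as above, but with the storage level for received blocks spelled out
val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap,
  StorageLevel.MEMORY_AND_DISK_SER_2)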
3. Submit and run on the server
If the production environment has no internet access, use --jars to pass in the Kafka jar instead (--packages downloads the dependency from a repository).
Package the project into a jar:
mvn clean package -DskipTests
Submit in local mode; the submit script:
./spark-submit --class com.kinglone.streaming.KafkaReceiverWordCount \
--master local[2] --name KafkaReceiverWordCount \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
/opt/script/kafkaReceiverWordCount.jar \
hadoop01:2181 test kafka-streaming_topic 1
Run results
First start the Kafka console producer and type in some words, then start the Spark Streaming program.

Part 3: Direct Integration (recommended)
This approach uses Kafka's Simple Consumer API and manages the offsets itself: Kafka is treated as the place where the data is stored, and reads are done by offset. ZooKeeper is no longer used to track the consumer offsets; Spark manages them on its own. By default the offsets live only in memory; if checkpointing is enabled a copy is also kept in the checkpoint, so checkpointing should normally be set up. The parallelism of the RDDs in the DStream generated in Direct mode matches the number of partitions of the topic being read (to increase parallelism, increase the number of topic partitions).
Note:
No receiver is used; the Kafka offsets are queried directly.
1. Add the Kafka dependency (same as above)
<!-- Kafka dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.2.0</version>
</dependency>
2. Write the code
package com.kinglone.streaming

import _root_.kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaDirectWordCount {

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: KafkaDirectWordCount <brokers> <topics>")
      System.exit(1)
    }

    val Array(brokers, topics) = args

    val sparkConf = new SparkConf() //.setAppName("KafkaDirectWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val topicsSet = topics.split(",").toSet
    // Only the broker list is required; offsets are handled by Spark itself
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
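To make "Spark manages the offsets itself" concrete: every RDD produced by createDirectStream carries the Kafka offset ranges it consumed, and you can read them off the RDD (and persist them wherever you like). A minimal sketch that could be added to the main method above, right after messages is created; it only prints the ranges, and where to store them is up to you:

import org.apache.spark.streaming.kafka.HasOffsetRanges

// Must be called on the RDD straight out of createDirectStream,
// before any transformation that loses the Kafka RDD type
messages.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"topic=${o.topic} partition=${o.partition} from=${o.fromOffset} until=${o.untilOffset}")
  }
}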
3. Submit to the server
./spark-submit --class com.kinglone.streaming.KafkaDirectWordCount \
--master local[2] --name KafkaDirectWordCount \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
/opt/script/kafkaDirectWordCount.jar hadoop01:9092 kafka-streaming_topic
4. Summary
Pay attention to the differences between the two modes; the receiver mode is practically obsolete. Things worth extending: 1) make the program highly available, so that after a crash it can recover from its last state; 2) manage the offsets manually, so that even after the business logic changes the job can still resume from where it left off.
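For point 1), the usual way to get that with this API is checkpoint-based recovery: build the StreamingContext inside a factory function and let StreamingContext.getOrCreate rebuild it from the checkpoint directory after a crash. A minimal sketch, assuming /opt/checkpoint/kafka-direct is just an illustrative path the job can write to (it would normally be an HDFS path in production):

import _root_.kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaDirectWithCheckpoint {

  // Assumption: any reliable directory works; use an HDFS path in a real cluster
  val checkpointDir = "/opt/checkpoint/kafka-direct"

  def createContext(brokers: String, topics: String): StreamingContext = {
    val sparkConf = new SparkConf().setAppName("KafkaDirectWithCheckpoint")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint(checkpointDir) // offsets and DStream lineage get saved here

    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics.split(",").toSet)

    messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    val Array(brokers, topics) = args
    // On a clean start this calls createContext; after a crash it rebuilds the context,
    // including the last consumed offsets, from the checkpoint data
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext(brokers, topics))
    ssc.start()
    ssc.awaitTermination()
  }
}

For point 2), the offset ranges shown in the sketch after the Direct code can be written to ZooKeeper, a database, or any other external store, and passed back in on restart via the createDirectStream overload that takes explicit fromOffsets.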