
Two Ways to Integrate Spark Streaming with Kafka

Spark Streaming integrates with Kafka in two ways: the Receiver-based approach and the Direct approach.

I: Kafka Preparation

1. Start ZooKeeper on each node:

./zkServer.sh start
           

2. Start Kafka on each node.
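
The original does not show the start command; a typical invocation (the -daemon flag and the config path are assumptions about your installation layout) is:

./kafka-server-start.sh -daemon ../config/server.properties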

3. Create a topic:

./kafka-topics.sh --create --zookeeper hadoop:2181 --replication-factor 1 --partitions 1 --topic kafka-streaming_topic
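
To confirm the topic was created, it can be described:

./kafka-topics.sh --describe --zookeeper hadoop:2181 --topic kafka-streaming_topic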
           

4. Use the console scripts to verify that the topic can produce and consume messages normally.

Start the producer script:

./kafka-console-producer.sh --broker-list hadoop:9092 --topic kafka-streaming_topic
           

Start the consumer script:

./kafka-console-consumer.sh --zookeeper hadoop:2181 --topic kafka-streaming_topic --from-beginning
           

With that, the preparation work is complete.

II: Receiver-based Integration

1. Add the Kafka dependency

<!-- Kafka dependency -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
           

2. Write the code locally

package com.kinglone.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaReceiverWordCount {
  def main(args: Array[String]): Unit = {

    if (args.length != 4) {
      System.err.println("Usage: KafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    val Array(zkQuorum, group, topics, numThreads) = args

    val sparkConf = new SparkConf()//.setAppName("KafkaReceiverWordCount").setMaster("local[2]")

    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

    /**
     * @param ssc          StreamingContext object
     * @param zkQuorum     Zookeeper quorum (hostname:port,hostname:port,..)
     * @param groupId      The group id for this consumer; the group name can be whatever you like
     * @param topics       Map of (topic_name to numPartitions) to consume. Each partition is consumed
     *                     in its own thread
     * @param storageLevel Storage level to use for storing the received objects
     *                     (default: StorageLevel.MEMORY_AND_DISK_SER_2)
     */
    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

    messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
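
The storageLevel parameter in the scaladoc above is worth a note: to keep received data from being lost on a failure, the receiver write-ahead log is normally enabled. A minimal sketch that would slot into KafkaReceiverWordCount above; the checkpoint path is an assumption:

import org.apache.spark.storage.StorageLevel

// Enable the receiver write-ahead log (requires a checkpoint directory)
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
ssc.checkpoint("hdfs://hadoop01:8020/spark/checkpoint/kafka-receiver")

// With the WAL on, replicated in-memory storage is redundant,
// so pass a non-replicated storage level explicitly
val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap,
  StorageLevel.MEMORY_AND_DISK_SER)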
           

3. Submit to the server and run

If the production environment has no internet access, use --jars to pass in the Kafka jars instead of --packages (see the sketch after the submit script below).

Package the project into a jar:

mvn clean package -DskipTests
           

Submit in local mode with the following script:

./spark-submit --class com.kinglone.streaming.KafkaReceiverWordCount \
--master local[2] --name KafkaReceiverWordCount \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
/opt/script/kafkaReceiverWordCount.jar \
hadoop01:2181 test kafka-streaming_topic 1
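
For the offline case mentioned above, the same submission sketched with --jars; the local jar path is an assumption, and the transitive Kafka jars (kafka_2.11, metrics-core, zkclient, ...) would need to be added to the comma-separated list as well:

./spark-submit --class com.kinglone.streaming.KafkaReceiverWordCount \
--master local[2] --name KafkaReceiverWordCount \
--jars /opt/lib/spark-streaming-kafka-0-8_2.11-2.2.0.jar \
/opt/script/kafkaReceiverWordCount.jar \
hadoop01:2181 test kafka-streaming_topic 1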
           

Running results

First start the Kafka producer in the console and type in some words, then start the Spark Streaming application.


III: Direct Integration (Recommended)

This approach uses Kafka's Simple Consumer API and manages offsets itself, treating Kafka as a plain data store and reading from it by offset. ZooKeeper is no longer used to track the consumer offsets; Spark manages them on its own. By default the offsets live only in memory, but if checkpointing is enabled a copy is kept in the checkpoint as well, so checkpointing should generally be turned on (a sketch follows). The parallelism of the RDDs in the DStream produced in Direct mode equals the number of partitions in the topic being read, so parallelism can be raised by adding partitions to the topic.
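
Since the checkpoint is what lets the stored offsets survive a driver restart, here is a minimal sketch of a recoverable context; the checkpoint path is an assumption, and the stream construction is elided (it is the code shown below):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs://hadoop01:8020/spark/checkpoint/kafka-direct" // assumed path

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(new SparkConf(), Seconds(5))
  ssc.checkpoint(checkpointDir) // offsets are saved as part of the checkpoint
  // ... build the direct stream and the word-count logic here ...
  ssc
}

// On restart, the context (including Kafka offsets) is rebuilt from the checkpoint
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()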

Note:

No receiver is used; the Kafka offsets are queried directly.

1. Add the Kafka dependency (same as above)

<!-- Kafka dependency -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
           

2. Write the code

package com.kinglone.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import _root_.kafka.serializer.StringDecoder

object KafkaDirectWordCount {

  def main(args: Array[String]): Unit = {

    if (args.length != 2) {
      System.err.println("Usage: KafkaDirectWordCount <brokers> <topics>")
      System.exit(1)
    }

    val Array(brokers, topics) = args

    val sparkConf = new SparkConf() //.setAppName("KafkaDirectWordCount").setMaster("local[2]")

    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet
    )

    messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

           

3. Submit to the server

./spark-submit --class com.kinglone.streaming.KafkaDirectWordCount \
--master local[2] --name KafkaDirectWordCount \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
/opt/script/kafkaDirectWordCount.jar hadoop01:9092 kafka-streaming_topic
           

4. Summary

Note the differences between the two modes; the receiver mode is all but obsolete. Two directions in which to extend this: 1) make the program highly available, so that after a crash it can recover from its previous state (see the checkpoint sketch above); 2) manage the offsets manually, so that the job can resume from its previous position even after the business logic changes (a sketch follows this paragraph).
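
A sketch of manual offset management with the Direct stream: the offset ranges are read from each batch's Kafka RDD and can then be written to an external store once the batch succeeds. It builds on the messages DStream from KafkaDirectWordCount above; saveOffsets is a hypothetical helper for whatever store (ZooKeeper, MySQL, ...) you pick:

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// Capture the offset ranges on the driver, directly on the Kafka stream,
// before any shuffle breaks the RDD-partition-to-Kafka-partition mapping
var offsetRanges = Array.empty[OffsetRange]

messages.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).foreachRDD { rdd =>
  rdd.take(10).foreach(println)  // stand-in for the real output action
  // saveOffsets(offsetRanges)   // hypothetical: persist offsets only after the batch succeeds
}

On a restart, the saved offsets can be passed back in through the createDirectStream overload that takes a fromOffsets map, so processing resumes exactly where it left off.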
