
Spark Streaming + Kafka integration: code tests
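All four examples below target the old spark-streaming-kafka-0-8 integration (the receiver-based createStream and the StringDecoder-typed createDirectStream do not exist in the 0-10 integration). A minimal sbt dependency sketch, assuming Spark 2.x on Scala 2.11; the version numbers are placeholders to be matched to your own cluster:

// build.sbt (sketch) — versions are assumptions, adjust to your environment
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"                 % "2.2.0", // SparkSession used in the direct examples
  "org.apache.spark" %% "spark-streaming"           % "2.2.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0"  // old 0.8 connector (deprecated in newer Spark)
)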

Code test for the receiver-based approach

package spark.SparkStreaming.test

import kafka.serializer.StringDecoder
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import spark.SparkStreaming.kafkaWordCount.updateFunc

/*
Connect to Kafka using the receiver-based approach; offsets are maintained in ZooKeeper (which may introduce some lag).
 */
object receive2Kafka {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("kafkawc").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("C:\\Users\\luoyunfan\\Desktop\\aaa1")

    val kafkaParams = Map[String, String](
      "zookeeper.connect"->"mini1:2181,mini2:2181,mini3:2181",
      "key.deserializer" -> classOf[StringDeserializer].getName,
      "value.deserializer" -> classOf[StringDeserializer].getName,
      "group.id" -> "g1",
      //"auto.offset.reset" -> "latest",
      "enable.auto.commit" -> "true"
    )
    val topics = Map[String,Int]("spark"->3) // topic name -> number of threads used to consume it
    val zk = "mini1:2181,mini2:2181,mini3:2181"
    val group = "g1"


    val data = KafkaUtils.createStream(ssc,zk,group,topics)
    //Alternative API for connecting to Kafka: pass the kafkaParams map and a storage level explicitly
//    val data1 = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](
//                ssc,
//                kafkaParams,
//                topics,
//                StorageLevel.MEMORY_ONLY
//        )


    //Maintain a running word count across batches
    val words = data.map(_._2).flatMap(_.split(" "))
    val wordCounts = words.map((_, 1)).updateStateByKey(updateFunc,
      new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
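The updateFunc imported at the top of this file comes from spark.SparkStreaming.kafkaWordCount, which the post does not show. A minimal sketch of what it plausibly looks like, matching the iterator-based overload of updateStateByKey used above (only the import path is taken from the code; the body is an assumption):

package spark.SparkStreaming

object kafkaWordCount {
  // (word, counts that arrived in this batch, previous running total) -> (word, new running total)
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    iter.map { case (word, counts, state) =>
      (word, counts.sum + state.getOrElse(0))
    }
  }
}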


directNoOffset (direct approach 1: does not resume from previously saved offsets)

package spark.SparkStreaming.test

import kafka.serializer.StringDecoder
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
 
object directNoOffset {
  def main(args: Array[String]): Unit = {
    //SparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName(directNoOffset.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()

    val sc: SparkContext = spark.sparkContext

    val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))

    ssc.checkpoint("C:\\Users\\luoyunfan\\Desktop\\aaa")

    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "mini1:9092,mini2:9092,mini3:9092",
      //      "key.deserializer" -> classOf[StringDeserializer].getName,
      //      "value.deserializer" -> classOf[StringDeserializer].getName,
      "group.id" -> "g1",
      "auto.offset.reset" -> "largest" // "largest" = start from the newest offsets; "smallest" = start from the beginning
      //      "enable.auto.commit" -> "true"
    )

    val ds: DStream[(String, String)] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc,
      kafkaParams,
      Set("spark")
    )

    ds.map(_._2)
      .flatMap(_.split("\\s+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .updateStateByKey((nowBatch: Seq[Int], historyResult: Option[Int]) => Some(nowBatch.sum + historyResult.getOrElse(0)))
      .print(100)

    //Start the Spark Streaming application
    ssc.start()

    //Wait for termination (required; without it the driver would exit immediately)
    ssc.awaitTermination()
  }
}
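Because this variant never persists offsets, a restart simply resumes wherever auto.offset.reset points (here "largest", i.e. only new messages). To see which offsets each batch actually covers, the RDDs produced by the direct stream implement HasOffsetRanges; below is a small sketch (the helper name OffsetLogger is made up) that could be called on ds right after createDirectStream, before any transformation:

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

object OffsetLogger {
  // Must be applied to the Kafka input stream itself (before map/flatMap),
  // because only the RDDs created by the direct stream carry offset ranges.
  def logOffsets(stream: DStream[(String, String)]): Unit =
    stream.foreachRDD { rdd =>
      val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ranges.foreach { r =>
        println(s"topic=${r.topic} partition=${r.partition} from=${r.fromOffset} until=${r.untilOffset}")
      }
    }
}

Calling OffsetLogger.logOffsets(ds) in main prints one line per partition per batch; nothing is committed anywhere.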


directZkOffset (ZooKeeper stores the last consumed offsets)

With this approach the offsets consumed by the streaming job are written back to the corresponding topic node in ZooKeeper. Note that KafkaManager is not a Spark class but a custom helper; a sketch of it follows the driver code below.

package spark.SparkStreaming.test

import kafka.serializer.StringDecoder
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.{KafkaManager, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}



object directZkOffset {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafkawc").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    ssc.checkpoint("C:\\Users\\luoyunfan\\Desktop\\aaa")


    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "mini1:9092,mini2:9092,mini3:9092",

//      "key.deserializer" -> classOf[StringDeserializer].getName,
//      "value.deserializer" -> classOf[StringDeserializer].getName,
      "group.id" -> "g1"
//      "auto.offset.reset" -> "smallest"
//      "enable.auto.commit" -> "true"
    )
    val topics = Set[String]("spark")
    val zk = "mini1:2181,mini2:2181,mini3:2181"
    val group = "g1"

    val km = new KafkaManager(kafkaParams)


    val data = km.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)


    data.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // Process the messages first
        processRdd(rdd)
        // Only then commit the offsets to ZooKeeper
        km.updateZKOffsets(rdd)
      }
    })


    ssc.start()
    ssc.awaitTermination()
  }
  def processRdd(rdd: RDD[(String, String)]): Unit = {
    rdd.foreach(println)
//    wordCounts.foreach(println)
  }
}
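KafkaManager is imported from org.apache.spark.streaming.kafka because placing the helper inside that package lets it use the connector-internal KafkaCluster API. The actual class used by the post is not shown; the following is a much-simplified sketch of how such a helper can be written against the 0-8 connector (no handling of expired or out-of-range offsets, single error branch):

package org.apache.spark.streaming.kafka

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream

import scala.reflect.ClassTag

class KafkaManager(val kafkaParams: Map[String, String]) extends Serializable {

  private val kc = new KafkaCluster(kafkaParams)

  /** Create a direct stream that resumes from the offsets stored in ZooKeeper,
    * or from the earliest available offsets if the group has none yet. */
  def createDirectStream[K: ClassTag, V: ClassTag,
                         KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Set[String]): InputDStream[(K, V)] = {
    val groupId = kafkaParams("group.id")
    val partitions = kc.getPartitions(topics).right.get // fail fast if topic metadata is unavailable
    val fromOffsets: Map[TopicAndPartition, Long] = kc.getConsumerOffsets(groupId, partitions) match {
      case Right(offsets) => offsets // resume from what was committed last time
      case Left(_) => // nothing committed yet: start from the earliest offsets
        kc.getEarliestLeaderOffsets(partitions).right.get.map { case (tp, lo) => tp -> lo.offset }
    }
    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
    KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](ssc, kafkaParams, fromOffsets, messageHandler)
  }

  /** Commit the offset ranges of a processed RDD back to ZooKeeper. */
  def updateZKOffsets(rdd: RDD[(String, String)]): Unit = {
    val groupId = kafkaParams("group.id")
    for (range <- rdd.asInstanceOf[HasOffsetRanges].offsetRanges) {
      val tp = TopicAndPartition(range.topic, range.partition)
      kc.setConsumerOffsets(groupId, Map(tp -> range.untilOffset)) match {
        case Left(err) => println(s"Failed to commit offsets for $tp: $err")
        case Right(_)  => // committed successfully
      }
    }
  }
}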

directCheckpointOffset (checkpoint-based)

package com.ruozedata.bigdata.streaming05

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}

object directCheckpointOffset{

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("CheckpointOffsetApp")

    //Created automatically if it does not exist
    val checkpointPath = "C:\\Users\\luoyunfan\\Desktop\\aaa"
    val topic="spark"
    val interval =10

    val kafkaParams = Map[String, String](
      "metadata.broker.list"->"mini1:9092,mini2:9092,mini3:9092",
      "auto.offset.reset"->"largest")

    val topics = topic.split(",").toSet


    def function2CreateStreamingContext(): StreamingContext = {
      val ssc = new StreamingContext(conf, Seconds(5))
      //Type parameters: [key class], [value class], [key decoder class], [value decoder class]
      //Arguments: (streamingContext, [map of Kafka parameters], [set of topics to consume])
      val messages = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams, topics)
      ssc.checkpoint(checkpointPath)
      messages.checkpoint(Duration(8 * interval * 1000)) // checkpoint the Kafka DStream every 8 * interval seconds

      messages.map(_._2)
        .flatMap(_.split("\\s+"))
        .filter(_.nonEmpty)
        .map((_, 1))
        .updateStateByKey((nowBatch: Seq[Int], historyResult: Option[Int]) => Some(nowBatch.sum + historyResult.getOrElse(0)))
        .print()
      ssc
    }

    //If checkpoint data exists, the context is rebuilt from it; otherwise a new one is created with the factory function passed as the second argument
    val ssc =StreamingContext.getOrCreate(checkpointPath,function2CreateStreamingContext)
    ssc.start()
    ssc.awaitTermination()


  }

}


