receiver方式的代碼測試
package spark.SparkStreaming.test
import kafka.serializer.StringDecoder
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import spark.SparkStreaming.kafkaWordCount.updateFunc
/*
使用receiver的方式連接kafka,使用zookeeper維護偏移量(可能有點延遲)
*/
object receive2Kafka {
  /**
   * Receiver-based Kafka word count. Offsets are tracked in ZooKeeper by the
   * high-level consumer, so a small lag / duplicate delivery is possible.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafkawc").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Checkpointing is required by updateStateByKey to persist running state.
    ssc.checkpoint("C:\\Users\\luoyunfan\\Desktop\\aaa1")
    // ZooKeeper quorum: used for the receiver connection and offset storage.
    val zk = "mini1:2181,mini2:2181,mini3:2181"
    val group = "g1"
    // topic -> number of receiver threads consuming it
    val topics = Map[String, Int]("spark" -> 3)
    val data = KafkaUtils.createStream(ssc, zk, group, topics)
    // Alternative params-map based API, kept for reference (previously a live
    // but unused kafkaParams val — now folded into the commented snippet):
    // val kafkaParams = Map[String, String](
    //   "zookeeper.connect" -> zk,
    //   "key.deserializer" -> classOf[StringDeserializer].getName,
    //   "value.deserializer" -> classOf[StringDeserializer].getName,
    //   "group.id" -> group,
    //   "enable.auto.commit" -> "true"
    // )
    // val data1 = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    //   ssc, kafkaParams, topics, StorageLevel.MEMORY_ONLY)
    // Stream elements are (key, message) pairs; _._2 extracts the payload.
    // Running word count across batches via the shared updateFunc.
    val words = data.map(_._2).flatMap(_.split(" "))
    val wordCounts = words.map((_, 1)).updateStateByKey(updateFunc,
      new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
directNoOffset(直連方式1,沒有基於上次的偏移量)
package spark.SparkStreaming.test
import kafka.serializer.StringDecoder
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
object directNoOffset {
  /**
   * Direct (receiver-less) Kafka word count. No stored offsets are used:
   * each run starts from "largest" (the latest offset), so data produced
   * while the job was down is skipped.
   */
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName(directNoOffset.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
    // Required by updateStateByKey to persist the running counts.
    ssc.checkpoint("C:\\Users\\luoyunfan\\Desktop\\aaa")
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "mini1:9092,mini2:9092,mini3:9092",
      "group.id" -> "g1",
      // "largest" = start from the newest offset; use "smallest" to replay.
      "auto.offset.reset" -> "largest"
    )
    val ds: DStream[(String, String)] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc,
        kafkaParams,
        Set("spark")
      )
    ds.map(_._2) // drop the key, keep the message payload
      .flatMap(_.split("\\s+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      // Fold each batch's counts into the checkpointed running total.
      .updateStateByKey((nowBatch: Seq[Int], historyResult: Option[Int]) =>
        Some(nowBatch.sum + historyResult.getOrElse(0)))
      .print(100)
    // Start the streaming application.
    // Side-effecting 0-arity methods are called with parentheses.
    ssc.start()
    // Block until the application is terminated (mandatory).
    ssc.awaitTermination()
  }
}
directZkOffset(使用zk儲存上次消費的偏移量)
此時在streaming消費的偏移量會更新到zk相應的主題中
package spark.SparkStreaming.test
import kafka.serializer.StringDecoder
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.{KafkaManager, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object directZkOffset {
  /**
   * Direct Kafka stream whose consumed offsets are persisted to ZooKeeper by
   * KafkaManager after each batch completes. Processing first and committing
   * after gives at-least-once semantics: a crash re-processes a batch rather
   * than losing it.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafkawc").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("C:\\Users\\luoyunfan\\Desktop\\aaa")
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "mini1:9092,mini2:9092,mini3:9092",
      "group.id" -> "g1"
    )
    val topics = Set[String]("spark")
    // KafkaManager resumes the direct stream from the offsets stored in ZK
    // for this consumer group.
    val km = new KafkaManager(kafkaParams)
    val data = km.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)
    data.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // Process the batch first...
        processRdd(rdd)
        // ...then commit its offsets to ZK.
        km.updateZKOffsets(rdd)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }

  /** Placeholder batch handler: prints each (key, message) record. */
  def processRdd(rdd: RDD[(String, String)]): Unit = {
    rdd.foreach(println)
  }
}
directCheckpointOffset(基于checkpoint)
package com.ruozedata.bigdata.streaming05
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}
object directCheckpointOffset {
  /**
   * Direct Kafka stream whose offsets are recovered from a Spark checkpoint:
   * StreamingContext.getOrCreate rebuilds the context (including consumed
   * offsets) from the checkpoint directory when one exists, otherwise it
   * creates a fresh context via the factory function.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("CheckpointOffsetApp")
    // Created automatically if it does not exist.
    val checkpointPath = "C:\\Users\\luoyunfan\\Desktop\\aaa"
    val topic = "spark"
    // Base interval in seconds; the DStream checkpoint period is 8x this.
    // (Was dead before: the original hard-coded Duration(8*10.toInt*1000).)
    val interval = 10
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "mini1:9092,mini2:9092,mini3:9092",
      "auto.offset.reset" -> "largest") // or "smallest" to replay from earliest
    val topics = topic.split(",").toSet

    // All DStream lineage MUST be defined inside this factory so it can be
    // reconstructed from the checkpoint on restart.
    def function2CreateStreamingContext() = {
      val ssc = new StreamingContext(conf, Seconds(5))
      // Type params: [key class], [value class], [key decoder], [value decoder]
      // Args: (streamingContext, map of Kafka parameters, set of topics to consume)
      val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topics)
      ssc.checkpoint(checkpointPath)
      // Checkpoint the stream every 8 * interval seconds (80 s — same value
      // as the original expression, now derived from the named constant).
      messages.checkpoint(Duration(8 * interval * 1000))
      messages.map(_._2)
        .flatMap(_.split("\\s+"))
        .filter(_.nonEmpty)
        .map((_, 1))
        .updateStateByKey((nowBatch: Seq[Int], historyResult: Option[Int]) =>
          Some(nowBatch.sum + historyResult.getOrElse(0)))
        .print()
      ssc
    }

    // Rebuild from checkpoint data if present, otherwise build via the factory.
    val ssc = StreamingContext.getOrCreate(checkpointPath, function2CreateStreamingContext)
    ssc.start()
    ssc.awaitTermination()
  }
}