Code test for the receiver-based approach
package spark.SparkStreaming.test
import kafka.serializer.StringDecoder
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import spark.SparkStreaming.kafkaWordCount.updateFunc
/*
 * Connect to Kafka with the receiver-based approach; offsets are tracked in ZooKeeper
 * (consumption may lag slightly behind the latest data).
 */
object receive2Kafka {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafkawc").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("C:\\Users\\luoyunfan\\Desktop\\aaa1")
    val kafkaParams = Map[String, String](
      "zookeeper.connect" -> "mini1:2181,mini2:2181,mini3:2181",
      "key.deserializer" -> classOf[StringDeserializer].getName,
      "value.deserializer" -> classOf[StringDeserializer].getName,
      "group.id" -> "g1",
      //"auto.offset.reset" -> "latest",
      "enable.auto.commit" -> "true"
    )
    // topic name -> number of receiver threads for that topic
    val topics = Map[String, Int]("spark" -> 3)
    val zk = "mini1:2181,mini2:2181,mini3:2181"
    val group = "g1"
    val data = KafkaUtils.createStream(ssc, zk, group, topics)
    // Alternative API for connecting to Kafka
    // val data1 = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    //   ssc,
    //   kafkaParams,
    //   topics,
    //   StorageLevel.MEMORY_ONLY
    // )
    // Maintain a cumulative word count across batches
    val words = data.map(_._2).flatMap(_.split(" "))
    val wordCounts = words.map((_, 1)).updateStateByKey(updateFunc,
      new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
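The updateFunc used above is imported from spark.SparkStreaming.kafkaWordCount, which is not shown here. The three-argument updateStateByKey overload (updateFunc, partitioner, rememberPartitioner) expects an iterator-style function. A minimal sketch of what that helper presumably looks like; the name and body are assumptions, the real definition in kafkaWordCount may differ:

// Hypothetical sketch of the imported updateFunc.
// Required shape: Iterator[(key, values in this batch, previous state)] => Iterator[(key, new state)]
val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
  iter.map { case (word, counts, previous) =>
    // new state = sum of this batch plus whatever was accumulated before
    (word, counts.sum + previous.getOrElse(0))
  }
}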
directNoOffset (direct approach 1: does not resume from previously committed offsets)
package spark.SparkStreaming.test
import kafka.serializer.StringDecoder
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
object directNoOffset {
  def main(args: Array[String]): Unit = {
    // SparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName(directNoOffset.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint("C:\\Users\\luoyunfan\\Desktop\\aaa")
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "mini1:9092,mini2:9092,mini3:9092",
      // "key.deserializer" -> classOf[StringDeserializer].getName,
      // "value.deserializer" -> classOf[StringDeserializer].getName,
      "group.id" -> "g1",
      "auto.offset.reset" -> "largest" // or "smallest"
      // "enable.auto.commit" -> "true"
    )
    val ds: DStream[(String, String)] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc,
        kafkaParams,
        Set("spark")
      )
    ds.map(_._2)
      .flatMap(_.split("\\s+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .updateStateByKey((nowBatch: Seq[Int], historyResult: Option[Int]) =>
        Some(nowBatch.sum + historyResult.getOrElse(0)))
      .print(100)
    // Start the Spark Streaming application
    ssc.start()
    // Wait for termination (required)
    ssc.awaitTermination()
  }
}
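Although this variant does not persist offsets anywhere, the direct stream does expose the offset range of each micro-batch through the public HasOffsetRanges interface. A small illustrative sketch (not part of the original example) of how to log those offsets inside foreachRDD, using the RDDs produced directly by createDirectStream:

// Illustrative only: inspect the offsets consumed in each micro-batch.
// HasOffsetRanges and OffsetRange are public APIs of spark-streaming-kafka (0.8 integration).
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

ds.foreachRDD { rdd =>
  // The cast only works on the RDD returned by createDirectStream, before any transformation.
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"topic=${o.topic} partition=${o.partition} " +
      s"fromOffset=${o.fromOffset} untilOffset=${o.untilOffset}")
  }
}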
directZkOffset (store the last consumed offsets in ZooKeeper)
With this approach, the offsets consumed by the streaming job are written back to the corresponding topic path in ZooKeeper.
package spark.SparkStreaming.test
import kafka.serializer.StringDecoder
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.{KafkaManager, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object directZkOffset {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafkawc").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("C:\\Users\\luoyunfan\\Desktop\\aaa")
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "mini1:9092,mini2:9092,mini3:9092",
      // "key.deserializer" -> classOf[StringDeserializer].getName,
      // "value.deserializer" -> classOf[StringDeserializer].getName,
      "group.id" -> "g1"
      // "auto.offset.reset" -> "smallest"
      // "enable.auto.commit" -> "true"
    )
    val topics = Set[String]("spark")
    val zk = "mini1:2181,mini2:2181,mini3:2181"
    val group = "g1"
    // KafkaManager is a custom helper (not part of the official spark-streaming-kafka API);
    // it reads the starting offsets from ZooKeeper and writes them back after each batch.
    val km = new KafkaManager(kafkaParams)
    val data = km.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)
    data.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // Process the messages first
        processRdd(rdd)
        // Then update the offsets in ZooKeeper
        km.updateZKOffsets(rdd)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }

  def processRdd(rdd: RDD[(String, String)]): Unit = {
    rdd.foreach(println)
    // wordCounts.foreach(println)
  }
}
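KafkaManager is not shipped with Spark; it is usually a small class placed in the org.apache.spark.streaming.kafka package so that it can reach the package-private KafkaCluster helper. A rough sketch of the two methods used above, under those assumptions; error handling and recovery of expired or missing offsets are deliberately omitted and would need to be filled in for real use:

package org.apache.spark.streaming.kafka

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import scala.reflect.ClassTag

// Rough sketch of a ZooKeeper-backed offset manager for the 0.8 direct API.
// It relies on the package-private KafkaCluster class, hence the package declaration above.
class KafkaManager(val kafkaParams: Map[String, String]) extends Serializable {

  private val kc = new KafkaCluster(kafkaParams)

  // Create a direct stream that starts from the offsets stored in ZooKeeper for this
  // consumer group, falling back to auto.offset.reset when no offsets exist yet.
  def createDirectStream[K: ClassTag, V: ClassTag,
                         KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Set[String]): InputDStream[(K, V)] = {
    val groupId = kafkaParams("group.id")
    val partitions = kc.getPartitions(topics).right.get

    kc.getConsumerOffsets(groupId, partitions) match {
      case Right(fromOffsets) =>
        // Resume from the offsets previously written to ZooKeeper.
        val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
        KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
          ssc, kafkaParams, fromOffsets, messageHandler)
      case Left(_) =>
        // No stored offsets yet: let the plain createDirectStream pick the
        // starting point from auto.offset.reset.
        KafkaUtils.createDirectStream[K, V, KD, VD](ssc, kafkaParams, topics)
    }
  }

  // After a batch has been processed, write its end offsets back to ZooKeeper.
  def updateZKOffsets(rdd: RDD[(String, String)]): Unit = {
    val groupId = kafkaParams("group.id")
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    offsetRanges.foreach { o =>
      kc.setConsumerOffsets(groupId,
        Map(TopicAndPartition(o.topic, o.partition) -> o.untilOffset))
    }
  }
}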
directCheckpointOffset (checkpoint-based offsets)
package com.ruozedata.bigdata.streaming05
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}
object directCheckpointOffset {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("CheckpointOffsetApp")
    // Created automatically if it does not exist
    val checkpointPath = "C:\\Users\\luoyunfan\\Desktop\\aaa"
    val topic = "spark"
    val interval = 10
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "mini1:9092,mini2:9092,mini3:9092",
      "auto.offset.reset" -> "largest")
    val topics = topic.split(",").toSet

    def function2CreateStreamingContext() = {
      val ssc = new StreamingContext(conf, Seconds(5))
      // Type parameters: [key class], [value class], [key decoder class], [value decoder class]
      // Arguments: (streamingContext, [map of Kafka parameters], [set of topics to consume])
      val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topics)
      ssc.checkpoint(checkpointPath)
      // Checkpoint the Kafka DStream every 8 * interval seconds (80 s here)
      messages.checkpoint(Duration(8 * interval * 1000))
      messages.map(_._2)
        .flatMap(_.split("\\s+"))
        .filter(_.nonEmpty)
        .map((_, 1))
        .updateStateByKey((nowBatch: Seq[Int], historyResult: Option[Int]) =>
          Some(nowBatch.sum + historyResult.getOrElse(0)))
        .print()
      ssc
    }

    // If checkpoint data exists, rebuild the context from it;
    // otherwise create a new one with the factory function above.
    val ssc = StreamingContext.getOrCreate(checkpointPath, function2CreateStreamingContext)
    ssc.start()
    ssc.awaitTermination()
  }
}
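All four examples use the old 0.8-style integration (kafka.serializer.StringDecoder, org.apache.spark.streaming.kafka.KafkaUtils), so they need the spark-streaming-kafka-0-8 artifact rather than the 0-10 one. A build.sbt sketch, assuming Spark 2.x on Scala 2.11; the version numbers are assumptions and must match your cluster:

// build.sbt sketch; adjust versions to your environment.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                % "2.2.0",
  "org.apache.spark" %% "spark-sql"                 % "2.2.0",  // SparkSession in directNoOffset
  "org.apache.spark" %% "spark-streaming"           % "2.2.0",
  // 0.8-style receiver and direct APIs (KafkaUtils.createStream / createDirectStream)
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0"
)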