SparkStreaming 消費Kafka中資料
import java.text.{DateFormat, SimpleDateFormat}
import java.util.Date
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.Map
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object AdReadKafkaSpark {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("as").setMaster("local[3]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc,Seconds(5))
ssc.checkpoint("D:\\")
sc.setLogLevel(logLevel = "WARN")
val topic = List("xpc")
val map:scala.collection.immutable.Map[String,Object]=scala.collection.immutable.Map("bootstrap.servers"->"hadoop101:9092","key.deserializer"->classOf[StringDeserializer],
"value.deserializer"->classOf[StringDeserializer],"group.id"->"xpc")
val ds = KafkaUtils.createDirectStream(ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](topic,map))
val mapDs= ds.map(_.value())
//x為rdd transfrom是遍歷RDD對rdd進行操作 x.map是對rdd中的每一行進行操作
val userDS=mapDs.transform(x=>x.map(line=>{
val arr = line.split("\t")
val day=new Date(arr(0).toLong)//2019-12-12
def da(d:Date) = new SimpleDateFormat("yyyy-MM-dd").format(d)
val day1=da(day)
val userID=arr(3)
val adid=arr(4)
(day1+"+"+userID+"-"+adid,1)
}))
val reduceDS=userDS.reduceByKey(_+_)
val totalDs=reduceDS.updateStateByKey((nowValues:Seq[Int],bfValues:Option[Int])=>{
val now = nowValues.sum
val bf = bfValues.getOrElse(0)
Option(now+bf)
})
totalDs.print()
// mapDs.print()
ssc.start()
ssc.awaitTermination()
}
}