
Spark Streaming: Consuming Data from Kafka
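The example below reads ad-click records from a Kafka topic through the spark-streaming-kafka-0-10 direct API and keeps a running click count per day/user/ad with updateStateByKey. To compile it, the project needs roughly the following dependencies (a minimal sketch; the version numbers are an assumption and should match your own Spark and Scala versions):

// build.sbt (sketch) -- align the versions with your cluster
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.4.5",
  "org.apache.spark" %% "spark-streaming" % "2.4.5",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.5"
)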

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object AdReadKafkaSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("as").setMaster("local[3]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    // A checkpoint directory is required by updateStateByKey below
    ssc.checkpoint("D:\\")
    sc.setLogLevel("WARN")

    val topic = List("xpc")
    // Kafka consumer configuration
    val map: scala.collection.immutable.Map[String, Object] = scala.collection.immutable.Map(
      "bootstrap.servers" -> "hadoop101:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "xpc")

    // Subscribe to the topic with the direct (kafka010) API and keep only the record values
    val ds = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topic, map))
    val mapDs = ds.map(_.value())
    // In transform, x is the RDD of each batch; x.map then processes every line of that RDD
    val userDS = mapDs.transform(x => x.map(line => {
      val arr = line.split("\t")
      val day = new Date(arr(0).toLong) // millisecond timestamp, e.g. 2019-12-12
      def da(d: Date) = new SimpleDateFormat("yyyy-MM-dd").format(d)
      val day1 = da(day)
      val userID = arr(3)
      val adid = arr(4)
      // Key: day + userID + adId, value: one click
      (day1 + "+" + userID + "-" + adid, 1)
    }))
    // Count clicks per key within the current batch
    val reduceDS = userDS.reduceByKey(_ + _)
    // Accumulate a running total across batches: current batch sum plus the previous state
    val totalDs = reduceDS.updateStateByKey((nowValues: Seq[Int], bfValues: Option[Int]) => {
      val now = nowValues.sum
      val bf = bfValues.getOrElse(0)
      Option(now + bf)
    })
    totalDs.print()
    // mapDs.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
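For reference, the parsing above assumes tab-separated records with a millisecond timestamp in column 0, a user id in column 3 and an ad id in column 4 (the other columns are not used here). A hypothetical record and the pair it turns into would look like this:

// Hypothetical input line (tab-separated); the field values are assumptions for illustration
val sample = "1576150000000\tfieldA\tfieldB\tuser001\tad42"
// After the map stage this becomes roughly ("2019-12-12+user001-ad42", 1),
// with the exact date depending on the JVM's default timezone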
