
Spark: consuming Kafka data and saving the offsets in Redis

What this implements:

Read the saved offsets for a given topic and consumer group from Redis.

Create the Kafka direct stream starting from those offsets.

Save each partition's offset back to Redis after every batch (the key/value layout is sketched right below).
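Offsets are kept in one Redis string per topic and consumer group: the key is kafka:<topic>:<groupId> and the value is a semicolon-separated list of partition:offset pairs. For a first test run you can seed the key by hand; a minimal sketch, assuming a local Redis at localhost:6379 and the plain Jedis client (SeedOffsets is a hypothetical helper, not part of the original code):

import redis.clients.jedis.Jedis

object SeedOffsets {
  def main(args: Array[String]): Unit = {
    // Hypothetical helper: seed starting offsets (partitions 0-2 at offset 0)
    // for topic "datacollection" and consumer group "SparkKafka010"
    val jedis = new Jedis("localhost", 6379)
    jedis.set("kafka:datacollection:SparkKafka010", "0:0;1:0;2:0")
    jedis.close()
  }
}

The driver program that wires everything together is listed next.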

import Kafka010.Utils.{MyKafkaUtils, RedisUtilsDemo}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Test{
  def main(args: Array[String]): Unit = {
    // Create the Spark configuration
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName(s"${this.getClass.getCanonicalName}")
    // Create the StreamingContext with a 2-second batch interval
    val ssc = new StreamingContext(conf, Seconds(2))
    // Kafka consumer group and topic
    val groupId = "SparkKafka010"
    val topics = List("datacollection")
    // Build the Kafka consumer parameters (auto commit disabled); MyKafkaUtils is a custom helper class shown further down
    val kafkaParams = MyKafkaUtils.getKafkaConsumerParams(groupId, "false")

    // Read the saved offsets from Redis
    val offsets: Map[TopicPartition, Long] = RedisUtilsDemo.getOffsetFromRedis("datacollection", groupId)
    // Create the direct stream, starting from the offsets read above
    val ds: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)
    )

    ds.foreachRDD(rdd => {
      val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // Process the batch data (here we just count the records)
      if (! rdd.isEmpty())
        println(rdd.count)

      // Inspect the offset ranges of this batch
      ranges.foreach(offset =>
        println(s"${offset.partition}, ${offset.fromOffset}, ${offset.untilOffset}")
      )
      // Save the offsets to Redis
      RedisUtilsDemo.saveOffsetToRedis(ranges, groupId)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
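The Subscribe call above passes whatever map getOffsetFromRedis returns; an empty map simply means Kafka falls back to any committed group offset or, failing that, the auto.offset.reset policy. If you prefer being explicit about that first run, the Option-returning variant getOffsetFromRedis1 (shown further down) can drive the choice of consumer strategy. A minimal sketch, reusing the imports and names from the Test object above (buildStream is a hypothetical helper):

  def buildStream(ssc: StreamingContext,
                  topics: List[String],
                  kafkaParams: Map[String, String],
                  groupId: String): InputDStream[ConsumerRecord[String, String]] = {
    // Fall back to a plain subscription when no offsets are stored for this group yet
    val strategy = RedisUtilsDemo.getOffsetFromRedis1(topics.head, groupId) match {
      case Some(offsets) => ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)
      case None          => ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    }
    KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, strategy)
  }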
           
The MyKafkaUtils utility class
           
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

object MyKafkaUtils {
  // Common Kafka consumer parameters; auto commit is configurable so the caller
  // can disable it when offsets are managed manually (as in the Redis example)
  def getKafkaConsumerParams(groupId: String = "SparkStreaming010", autoCommit: String = "true"): Map[String, String] = {
    val kafkaParams = Map[String, String](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "mini1:9092,mini2:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer].getName,
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer].getName,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> autoCommit)
    kafkaParams
  }

}
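The parameters above do not set auto.offset.reset, so the consumer default ("latest") applies when neither Redis nor Kafka has an offset for the group. A minimal sketch of setting it explicitly (MyKafkaUtilsWithReset is a hypothetical wrapper, and "earliest" is only an example value):

import org.apache.kafka.clients.consumer.ConsumerConfig

object MyKafkaUtilsWithReset {
  // Hypothetical wrapper: the same parameters as MyKafkaUtils, plus an explicit
  // auto.offset.reset used when no starting offset is known for the group
  def getParams(groupId: String): Map[String, String] =
    MyKafkaUtils.getKafkaConsumerParams(groupId, "false") +
      (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest")
}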
           

The RedisUtilsDemo utility class

import cn.bigdata.antispider.common.util.jedis.JedisConUtil
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange


object RedisUtilsDemo {
  // Get a Jedis connection (JedisConUtil is a project-specific helper; a stand-in is sketched at the end of this post)
  private val jedis = JedisConUtil.getJedisClient()
  // Read the offsets
  // key: kafka:<topic>:<groupid>   value: partition:offset;partition:offset
  // Returns an empty map when the key does not exist yet (e.g. on the very first run)
  def getOffsetFromRedis(topic: String, groupid: String): Map[TopicPartition, Long] = {
    val key = s"kafka:$topic:$groupid"
    val offsetStr: String = jedis.get(key)

      offsetStr.split(";").map(str => {
        val fileds = str.split(":")
        val parition: Int = fileds.head.toInt
        val offset: Long = fileds.last.toLong
        (new TopicPartition(topic, parition) -> offset)
      }).toMap

  }

  // Accepts several topics and returns the saved offsets for each of them
  def getOffsetFromRedis2(topics: Iterator[String], groupid: String): Iterator[Option[Map[TopicPartition, Long]]] = {
    topics.map { topic =>
      val key = s"kafka:$topic:$groupid"
      val offsetStr: String = jedis.get(key)

      if (offsetStr != null && offsetStr.trim.nonEmpty) {
        val offsets = offsetStr.split(";").map { str =>
          val fields = str.split(":")
          val partition: Int = fields.head.toInt
          val offset: Long = fields.last.toLong
          (new TopicPartition(topic, partition) -> offset)
        }.toMap
        Some(offsets)
      }
      else None
    }
  }

  def getOffsetFromRedis1(topic: String, groupid: String): Option[Map[TopicPartition, Long]] = {
    val key = s"kafka:$topic:$groupid"
    val offsetStr: String = jedis.get(key)

    if (offsetStr != null && offsetStr.trim.nonEmpty) {
      val offsets = offsetStr.split(";").map(str => {
        val fields = str.split(":")
        val partition: Int = fields.head.toInt
        val offset: Long = fields.last.toLong
        (new TopicPartition(topic, partition) -> offset)
      }).toMap
      Some(offsets)
    }
    else
      None
  }

  // Save the offsets
  // key: kafka:<topic>:<groupId>   value: partition:offset;partition:offset
  def saveOffsetToRedis(ranges: Array[OffsetRange], groupId: String): Unit = {
    ranges.map(range => (range.topic, (range.partition, range.untilOffset)))
      .groupBy(_._1)
      .foreach { case (topic, buffer) =>
        val key = s"kafka:$topic:$groupId"
        val value = buffer.map { case (_, (partition, untilOffset)) => s"$partition:$untilOffset" }.mkString(";")
        jedis.set(key, value)
      }
  }

  
  
}
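
JedisConUtil comes from the project-specific cn.bigdata.antispider package and is not shown in this post. A minimal stand-in, assuming a local Redis instance and the plain Jedis client, could look like this (a sketch, not the original class):

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object JedisConUtil {
  // Hypothetical stand-in for the project's connection helper:
  // a single pool against a Redis instance on localhost:6379
  private val pool = new JedisPool(new JedisPoolConfig, "localhost", 6379)

  def getJedisClient(): Jedis = pool.getResource
}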