What this program does:
1. Read the offsets previously saved in Redis for a given topic and consumer group.
2. Create a Kafka direct stream that starts consuming from those offsets.
3. After each batch is processed, save the per-partition offsets back to Redis.
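Offsets live in Redis under one key per topic and consumer group, in the format key = kafka:topic:groupid, value = partition:offset;partition:offset. For example, for the topic and group used below (partition numbers and offset values are illustrative):

    kafka:datacollection:SparkKafka010 -> 0:42;1:37;2:40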
import Kafka010.Utils.{MyKafkaUtils, RedisUtilsDemo}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
object Test {
  def main(args: Array[String]): Unit = {
    // Create the Spark configuration
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName(s"${this.getClass.getCanonicalName}")
    // Create the StreamingContext with a 2-second batch interval
    val ssc = new StreamingContext(conf, Seconds(2))
    // Kafka consumer settings
    val groupId = "SparkKafka010"
    val topics = List("datacollection")
    // Build the consumer config with auto-commit disabled; MyKafkaUtils is a
    // custom helper class, listed below
    val kafkaParams = MyKafkaUtils.getKafkaConsumerParams(groupId, "false")
    // Read the saved offsets from Redis
    val offsets: Map[TopicPartition, Long] = RedisUtilsDemo.getOffsetFromRedis("datacollection", groupId)
    // Create the direct stream, starting from the offsets read above
    val ds: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)
    )
    ds.foreachRDD { rdd =>
      val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // Stand-in for the actual data processing
      if (!rdd.isEmpty())
        println(rdd.count)
      // Stand-in for offset handling: print each partition's range
      ranges.foreach(offset =>
        println(s"${offset.partition}, ${offset.fromOffset}, ${offset.untilOffset}")
      )
      // Save the offsets to Redis only after the batch has been processed
      RedisUtilsDemo.saveOffsetToRedis(ranges, groupId)
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
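One caveat: getOffsetFromRedis above assumes the Redis key already exists and will fail on a first run. A minimal sketch of a more defensive wiring, using the Option-returning variant getOffsetFromRedis1 (defined in the utility class below) and falling back to the two-argument Subscribe overload, which starts from the group's committed offsets or the auto.offset.reset policy:

// A sketch: only seed the stream with offsets when Redis actually had an
// entry for this topic and group.
val maybeOffsets: Option[Map[TopicPartition, Long]] =
  RedisUtilsDemo.getOffsetFromRedis1("datacollection", groupId)

val strategy = maybeOffsets match {
  case Some(offsets) =>
    ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)
  case None =>
    // No saved offsets yet: let Kafka's auto.offset.reset decide where to start
    ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
}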
The MyKafkaUtils utility class:
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
object MyKafkaUtils {
  def getKafkaConsumerParams(groupId: String = "SparkStreaming010",
                             autoCommit: String = "true"): Map[String, String] = {
    val kafkaParams = Map[String, String](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "mini1:9092,mini2:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer].getName,
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer].getName,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> autoCommit)
    kafkaParams
  }

  def main(args: Array[String]): Unit = {
  }
}
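Note that the driver above passes autoCommit = "false", presumably so that offsets are recorded in Redis only after a batch has been processed rather than auto-committed by Kafka. The empty main can double as a quick smoke test, e.g.:

// Print the generated consumer config for a quick sanity check
MyKafkaUtils.getKafkaConsumerParams("SparkKafka010", "false").foreach(println)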
The RedisUtilsDemo utility class:
import cn.bigdata.antispider.common.util.jedis.{JedisConUtil, JedisConnectionUtil}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
object RedisUtilsDemo {
  // Obtain a Jedis connection (created once, on the driver)
  private val jedis = JedisConUtil.getJedisClient()

  // Read offsets.
  // Key:   kafka:topic:groupid
  // Value: partition:offset;partition:offset
  // Does not handle a missing key; for testing, seed the key first
  // (see the sketch after this class).
  def getOffsetFromRedis(topic: String, groupid: String): Map[TopicPartition, Long] = {
    val key = s"kafka:$topic:$groupid"
    val offsetStr: String = jedis.get(key)
    offsetStr.split(";").map { str =>
      val fields = str.split(":")
      val partition: Int = fields.head.toInt
      val offset: Long = fields.last.toLong
      new TopicPartition(topic, partition) -> offset
    }.toMap
  }
  // Accepts multiple topics and returns the stored offsets for each one
  def getOffsetFromRedis2(topics: Iterator[String], groupid: String): Iterator[Option[Map[TopicPartition, Long]]] = {
    topics.map { topic =>
      val key = s"kafka:$topic:$groupid"
      val offsetStr: String = jedis.get(key)
      if (offsetStr != null && offsetStr.trim.nonEmpty) {
        val offsets = offsetStr.split(";").map { str =>
          val fields = str.split(":")
          val partition: Int = fields.head.toInt
          val offset: Long = fields.last.toLong
          new TopicPartition(topic, partition) -> offset
        }.toMap
        Some(offsets)
      } else None
    }
  }
  // Like getOffsetFromRedis, but returns None when the key is absent or empty
  def getOffsetFromRedis1(topic: String, groupid: String): Option[Map[TopicPartition, Long]] = {
    val key = s"kafka:$topic:$groupid"
    val offsetStr: String = jedis.get(key)
    if (offsetStr != null && offsetStr.trim.nonEmpty) {
      val offsets = offsetStr.split(";").map { str =>
        val fields = str.split(":")
        val partition: Int = fields.head.toInt
        val offset: Long = fields.last.toLong
        new TopicPartition(topic, partition) -> offset
      }.toMap
      Some(offsets)
    } else None
  }
  // Save offsets.
  // Key:   kafka:topic:groupid
  // Value: partition:offset;partition:offset
  def saveOffsetToRedis(ranges: Array[OffsetRange], groupId: String): Unit = {
    ranges.map(offsets => (offsets.topic, (offsets.partition, offsets.untilOffset)))
      .groupBy(_._1)
      .foreach { case (topic, buffer) =>
        val key = s"kafka:$topic:$groupId"
        val value = buffer.map { case (_, (partition, untilOffset)) => s"$partition:$untilOffset" }.mkString(";")
        jedis.set(key, value)
      }
  }
}
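Since getOffsetFromRedis assumes its key already exists, the key can be seeded once before the first run, e.g. from the spark-shell or a small driver. A minimal sketch, assuming the topic "datacollection" has three partitions all starting at offset 0:

// Seed the offset key so the first call to getOffsetFromRedis succeeds
val jedis = JedisConUtil.getJedisClient()
jedis.set("kafka:datacollection:SparkKafka010", "0:0;1:0;2:0")
jedis.close()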