
Spark Streaming: Manually Maintaining Kafka Offsets in MySQL (Example)

Official documentation: http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

Committing offsets manually ensures that data is not lost, especially when the network is unstable. However, if Kafka goes down and is restarted, other problems can appear,

for example the saved offsets may no longer be found on the broker. Those issues have to be analyzed case by case; here is the code.
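Before the Spark code, note that it assumes a kafka_offset table already exists in the bj_pfdh database. The following is a minimal sketch (not part of the original post) of how that table could be created: the column names match the queries used below, while the column types and the unique key are assumptions to adapt as needed.

import java.sql.DriverManager

object CreateOffsetTable {
  def main(args: Array[String]): Unit = {
    // Same connection settings as the example below
    val connection = DriverManager.getConnection(
      "jdbc:mysql://172.31.98.108:3306/bj_pfdh?characterEncoding=UTF-8", "root", "iflytek@web")
    val stmt = connection.createStatement()
    // Column names come from the queries in this post; types and the unique key are assumptions
    stmt.executeUpdate(
      """CREATE TABLE IF NOT EXISTS kafka_offset (
        |  `groupid`   VARCHAR(200) NOT NULL,
        |  `topic`     VARCHAR(200) NOT NULL,
        |  `partition` INT NOT NULL,
        |  `offset`    BIGINT NOT NULL,
        |  UNIQUE KEY uk_group_topic_partition (`groupid`, `topic`, `partition`)
        |)""".stripMargin)
    stmt.close()
    connection.close()
  }
}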

import java.sql.{DriverManager, ResultSet}

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

/**
 * Integrates with Kafka via the Spark-Kafka-0-10 API, commits offsets manually, and maintains them in MySQL.
 */

object SparkKafkaTest2 {

def main(args: Array[String]): Unit = {

//1. Create the StreamingContext
val conf = new SparkConf().setAppName("wc").setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc, Seconds(5))

//Prepare the Kafka connection parameters
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "server1:9092,server2:9092,server3:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "SparkKafkaTest",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("spark_kafka_test").toSet

//Load any offsets previously saved in MySQL for this group and topic
val offsetMap: mutable.Map[TopicPartition, Long] = getOffsetMap("SparkKafkaTest", "spark_kafka_test")

val recordDStream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.nonEmpty) { //offsets were found in MySQL
  println("Offsets recorded in MySQL; resuming consumption from them")
  KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent, //location strategy recommended by the source code; spreads Kafka partitions evenly across Spark executors
    Subscribe[String, String](topics, kafkaParams, offsetMap)) //consumer strategy recommended by the source code
} else { //no offsets recorded yet
  println("No offsets recorded; connecting directly and consuming from latest")
  KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent, //location strategy recommended by the source code; spreads Kafka partitions evenly across Spark executors
    Subscribe[String, String](topics, kafkaParams)) //consumer strategy recommended by the source code
}

recordDStream.foreachRDD {
  messages =>
    if (messages.count() > 0) { //this batch actually contains data
      messages.foreachPartition { messageIter =>
        messageIter.foreach { message =>
          //process each record here, e.g. println(message.toString())
        }
      }
      val offsetRanges: Array[OffsetRange] = messages.asInstanceOf[HasOffsetRanges].offsetRanges
      for (o <- offsetRanges) {
        println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset}")
      }
      //Built-in option: commit the offsets asynchronously back to Kafka itself
      //recordDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      //In practice the offsets can instead be stored in MySQL/Redis
      saveOffsetRanges("SparkKafkaTest", offsetRanges)
    }
}

ssc.start()

ssc.awaitTermination()

}

  • Read the offsets from the database

def getOffsetMap(groupid: String, topic: String) = {

Class.forName("com.mysql.jdbc.Driver")
val connection = DriverManager.getConnection("jdbc:mysql://172.31.98.108:3306/bj_pfdh?characterEncoding=UTF-8", "root", "iflytek@web")
val sqlselect = connection.prepareStatement("""
      select * from kafka_offset 
      where groupid=? and topic =?
     """)
sqlselect.setString(1, groupid)
sqlselect.setString(2, topic)
val rs: ResultSet = sqlselect.executeQuery()
val offsetMap = mutable.Map[TopicPartition, Long]()
while (rs.next()) {
  offsetMap += new TopicPartition(rs.getString("topic"), rs.getInt("partition")) -> rs.getLong("offset")
}
rs.close()
sqlselect.close()
connection.close()
offsetMap
}
  • Save the offsets to the database

def saveOffsetRanges(groupid: String, offsetRange: Array[OffsetRange]) = {

val connection = DriverManager.getConnection("jdbc:mysql://172.31.98.108:3306/bj_pfdh?characterEncoding=UTF-8", "root", "iflytek@web")
//Check whether a row already exists for this (groupid, topic, partition): update it if so, otherwise insert.
//With a unique key defined, a single REPLACE INTO or INSERT ... ON DUPLICATE KEY UPDATE would achieve the same.
val select_ps = connection.prepareStatement("""
  select count(*) as count from kafka_offset
  where  `groupid`=? and `topic`=? and `partition`=?
  """)
val update_ps = connection.prepareStatement("""
  update kafka_offset set  `offset`=?
  where `groupid`=? and `topic`=? and `partition`=?
  """)
val insert_ps = connection.prepareStatement("""
  INSERT INTO kafka_offset(`groupid`, `topic`, `partition`, `offset`) 
  VALUE(?,?,?,?)
  """)
for (o <- offsetRange) {
  select_ps.setString(1, groupid)
  select_ps.setString(2, o.topic)
  select_ps.setInt(3, o.partition)
  val selectResult = select_ps.executeQuery()
  while (selectResult.next()) {
    if (selectResult.getInt("count") > 0) {
      //update
      update_ps.setLong(1, o.untilOffset)
      update_ps.setString(2, groupid)
      update_ps.setString(3, o.topic)
      update_ps.setInt(4, o.partition)
      update_ps.executeUpdate()
    } else {
      //insert
      insert_ps.setString(1, groupid)
      insert_ps.setString(2, o.topic)
      insert_ps.setInt(3, o.partition)
      insert_ps.setLong(4, o.untilOffset)
      insert_ps.executeUpdate()
    }
  }

}
select_ps.close()
update_ps.close()
insert_ps.close()
connection.close()
}
}

If you get an error that the database cannot be reached or the connection to the database URL fails, check whether the MySQL JDBC driver (client JAR) has been added to the classpath.
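For reference, the dependencies this example needs look roughly like the build.sbt fragment below. The version numbers are assumptions and should be aligned with your Spark and Scala versions; when running with spark-submit, the MySQL driver JAR can alternatively be supplied via --jars.

// build.sbt (sketch) — versions are assumptions, align them with your cluster
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % "2.4.8" % "provided",
  "org.apache.spark" %% "spark-streaming"            % "2.4.8" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.8",
  "mysql"            %  "mysql-connector-java"       % "5.1.49"
)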
