天天看點

Sparkstreaming之KafKa持久化offsets到Zookpeer資料讀取

SparkStreaming+Kafka的兩種模式receiver模式和Direct模式

  • Sparkstreming + kafka recevier模式了解
    Sparkstreaming之KafKa持久化offsets到Zookpeer資料讀取

    receiver模式了解:

    在SparkStreaming程式運作起來後,Executor中會有receiver tasks接收kafka推送過來的資料。資料會被持久化,預設級别為MEMORY_AND_DISK_SER_2,這個級别也可以修改。receiver task對接收過來的資料進行存儲和備份,這個過程會有節點之間的資料傳輸。備份完成後去zookeeper中更新消費偏移量,然後向Driver中的receiver tracker彙報資料的位置。最後Driver根據資料本地化将task分發到不同節點上執行。

    receiver模式中存在的問題:

    當Driver程序挂掉後,Driver下的Executor都會被殺掉,當更新完zookeeper消費偏移量的時候,Driver如果挂掉了,就會存在找不到資料的問題,相當于丢失資料。

  • dirct模式了解
    Sparkstreaming之KafKa持久化offsets到Zookpeer資料讀取
  1. 簡化資料處理流程
  2. 自己定義offset存儲,保證資料0丢失,但是會存在重複消費問題。(解決消費等幂問題)
  3. 不用接收資料,自己去kafka中拉取

開發

  • 引入maven依賴
<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>${spark.version}-${cdh.version}</version>
    </dependency>
     <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>${spark.version}-${cdh.version}</version>
    </dependency>
           
  • KafkaManager 類代碼
package org.apache.spark.streaming.kafka


import com.alibaba.fastjson.TypeReference
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.{Decoder, StringDecoder}
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

import scala.reflect.ClassTag

/**
  * 包名說明 :KafkaCluster是私有類,隻能在spark包中使用,
  *           是以包名保持和 KafkaCluster 一緻才能調用
  * @param kafkaParams
  * @param autoUpdateoffset
  */
class KafkaManager(val kafkaParams: Map[String, String],
                   val autoUpdateoffset:Boolean = true) extends Serializable with Logging{

  @transient
  private var cluster = new KafkaCluster(kafkaParams)

  def kc(): KafkaCluster ={
    if(cluster == null){
      cluster =  new KafkaCluster(kafkaParams);
    }
    cluster
  }

  /**
    * 泛型流讀取器
    * @param ssc
    * @param topics  kafka topics,多個topic按","分割
    * @tparam K   泛型 K
    * @tparam V   泛型 V
    * @tparam KD  scala泛型 KD <: Decoder[K] 說明KD 的類型必須是Decoder[K]的子類型  上下界
    * @tparam VD  scala泛型 VD <: Decoder[V] 說明VD 的類型必須是Decoder[V]的子類型  上下界
    * @return
    */
  def createDirectStream[K: ClassTag, V: ClassTag,
  KD <: Decoder[K]: ClassTag,
  VD <: Decoder[V]: ClassTag](ssc: StreamingContext , topics: Set[String]): InputDStream[(K, V)] =  {

    //擷取消費者組
    val groupId = kafkaParams.get("group.id").getOrElse("default")
    // 在zookeeper上讀取offsets前先根據實際情況更新offsets
    setOrUpdateOffsets(topics, groupId)

    //把所有的offsets處理完成,就可以從zookeeper上讀取offset開始消費message
    val messages = {
      //擷取kafka分區資訊  為了列印資訊
      val partitionsE = kc.getPartitions(topics)
      require(partitionsE.isRight,s"擷取 kafka topic ${topics}`s partition 失敗。" )
      val partitions = partitionsE.right.get
      println("列印分區資訊")
      partitions.foreach(println(_))

      //擷取分區的offset
      val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
      require(consumerOffsetsE.isRight,s"擷取 kafka topic ${topics}`s consumer offsets 失敗。" )
      val consumerOffsets = consumerOffsetsE.right.get
      println("列印消費者分區偏移資訊")
      consumerOffsets.foreach(println(_))
      //讀取資料
      KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
        ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
    }

    if(autoUpdateoffset){
      //更新offset
      messages.foreachRDD(rdd => {
        logInfo("RDD 消費成功,開始更新zookeeper上的偏移")
        updateZKOffsets(rdd)
      })
    }
    messages
  }

  /**
    * 建立資料流前,根據實際消費情況更新消費offsets
    *
    * @param topics
    * @param groupId
    */
  private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
    topics.foreach(topic => {

      //擷取kafka  partions的節點資訊
      val partitionsE = kc.getPartitions(Set(topic))
      logInfo(partitionsE+"")
      //檢測
      require(partitionsE.isRight, s"擷取 kafka topic ${topic}`s partition 失敗。")
      val partitions = partitionsE.right.get

      //擷取最早的 partions offsets資訊
      val earliestLeader = kc.getEarliestLeaderOffsets(partitions)
      val earliestLeaderOffsets = earliestLeader.right.get
      println("kafka中最早的消息偏移")
      earliestLeaderOffsets.foreach(println(_))


      //擷取最末的 partions offsets資訊
      val latestLeader = kc.getLatestLeaderOffsets(partitions)
      val latestLeaderOffsets = latestLeader.right.get
      println("kafka中最末的消息偏移")
      latestLeaderOffsets.foreach(println(_))

      //擷取消費者組的 offsets資訊
      val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
      //如果消費者offset存在
      if (consumerOffsetsE.isRight) {
        /**
          * 如果zk上儲存的offsets已經過時了,即kafka的定時清理政策已經将包含該offsets的檔案删除。
          * 針對這種情況,隻要判斷一下zk上的consumerOffsets和earliestLeaderOffsets的大小,
          * 如果consumerOffsets比earliestLeaderOffsets還小的話,說明consumerOffsets已過時,
          * 這時把consumerOffsets更新為earliestLeaderOffsets
          */
        //如果earliestLeader 存在
        if(earliestLeader.isRight) {
          //擷取最早的offset 也就是最小的offset
          val earliestLeaderOffsets = earliestLeader.right.get
          //擷取消費者組的offset
          val consumerOffsets = consumerOffsetsE.right.get
          // 将 consumerOffsets 和 earliestLeaderOffsets 的offsets 做比較
          // 可能隻是存在部分分區consumerOffsets過時,是以隻更新過時分區的consumerOffsets為earliestLeaderOffsets
          var offsets: Map[TopicAndPartition, Long] = Map()

          consumerOffsets.foreach({ case (tp, n) =>
            val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
            //如果消費者的偏移小于 kafka中最早的offset,那麽,將最早的offset更新到zk
            if (n < earliestLeaderOffset) {
              logWarning("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
                " offsets已經過時,更新為" + earliestLeaderOffset)
              offsets += (tp -> earliestLeaderOffset)
            }
          })
          //設定offsets
          setOffsets(groupId, offsets)
        }
      } else {
        // 消費者還沒有消費過  也就是zookeeper中還沒有消費者的資訊
        if(earliestLeader.isLeft)
          logError(s"${topic} hasConsumed but earliestLeaderOffsets is null。")
        //看是從頭消費還是從末開始消費  smallest表示從頭開始消費
        val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase).getOrElse("smallest")
        //建構消費者 偏移
        var leaderOffsets: Map[TopicAndPartition, Long] = Map.empty
        //從頭消費
        if (reset.equals("smallest")) {
          //分為 存在 和 不存在 最早的消費記錄 兩種情況
          //如果kafka 最小偏移存在,則将消費者偏移設定為和kafka偏移一樣
          if(earliestLeader.isRight){
            leaderOffsets = earliestLeader.right.get.map {
              case (tp, offset) => (tp, offset.offset)
            }
          }else{
            // 如果不存在,則從新建構偏移全部為0 offsets
            leaderOffsets = partitions.map(tp => (tp, 0L)).toMap
          }
        } else {
          //直接擷取最新的offset
          leaderOffsets = kc.getLatestLeaderOffsets(partitions).right.get.map {
            case (tp, offset) => (tp, offset.offset)
          }
        }
        //設定offsets
        setOffsets(groupId, leaderOffsets)
      }
    })
  }


  /**
    * 設定消費者組的offsets
    * @param groupId
    * @param offsets
    */
  private def setOffsets(groupId: String, offsets: Map[TopicAndPartition, Long]): Unit ={
    if(offsets.nonEmpty){
      //更新offset
      val o = kc.setConsumerOffsets(groupId, offsets)
      logInfo(s"更新zookeeper中消費組為:${groupId} 的 topic offset資訊為: ${offsets}")
      if (o.isLeft) {
        logError(s"Error updating the offset to Kafka cluster: ${o.left.get}")
      }
    }
  }

  /**
    * 通過spark的RDD 更新zookeeper上的消費offsets
    * @param rdd
    */
  def updateZKOffsets[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) : Unit = {

    //擷取消費者組
    val groupId = kafkaParams.get("group.id").getOrElse("default")
    //spark使用kafka低階API進行消費的時候,每個partion的offset是儲存在 spark的RDD中,是以這裡可以直接在
    //RDD的 HasOffsetRanges 中擷取倒offsets資訊。因為這個資訊spark不會把則個資訊存儲到zookeeper中,是以
    //我們需要自己實作将這部分offsets資訊存儲到zookeeper中
    val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    //列印出spark中儲存的offsets資訊
    offsetsList.foreach(x=>{
      println("擷取spark 中的偏移資訊"+x)
    })

    for (offsets <- offsetsList) {
      //根據topic和partition 建構topicAndPartition
      val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
      logInfo("将SPARK中的 偏移資訊 存到zookeeper中")
      //将消費者組的offsets更新到zookeeper中
      setOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
    }
  }


  //(null,{"rksj":"1558178497","latitude":"24.000000","imsi":"000000000000000"})
  //讀取kafka流,并将json資料轉為map
  def createJsonToJMapObjectDirectStreamWithOffset(ssc:StreamingContext ,
                                                   topicsSet:Set[String]): DStream[java.util.Map[String,Object]] = {


    //一個轉換器
    val converter = {json:String =>

      println(json)

      var res : java.util.Map[String,Object] = null
      try {
        //JSON轉map的操作
        res = com.alibaba.fastjson.JSON.parseObject(json,
          new TypeReference[java.util.Map[String, Object]]() {})
      } catch {
        case e: Exception => logError(s"解析topic ${topicsSet}, 的記錄 ${json} 失敗。", e)
      }
      res
    }
    createDirectStreamWithOffset(ssc, topicsSet, converter).filter(_ != null)
  }


  /**
    * 根據converter建立流資料
    * @param ssc
    * @param topicsSet
    * @param converter
    * @tparam T
    * @return
    */
  def createDirectStreamWithOffset[T:ClassTag](ssc:StreamingContext ,
                                               topicsSet:Set[String], converter:String => T): DStream[T] = {

    createDirectStream[String, String, StringDecoder, StringDecoder](ssc, topicsSet)
      .map(pair =>converter(pair._2))
  }

  def createJsonToJMapDirectStreamWithOffset(ssc:StreamingContext ,
                                             topicsSet:Set[String]): DStream[java.util.Map[String,String]] = {

    val converter = {json:String =>
      var res : java.util.Map[String,String] = null
      try {

        res = com.alibaba.fastjson.JSON.parseObject(json,
          new TypeReference[java.util.Map[String, String]]() {})

      } catch {
        case e: Exception => logError(s"解析topic ${topicsSet}, 的記錄 ${json} 失敗。", e)
      }
      res
    }
    createDirectStreamWithOffset(ssc, topicsSet, converter).filter(_ != null)
  }


  /**
    *
    * @param ssc
    * @param topicsSet
    * @return
    */
  def createJsonToJavaBeanDirectStreamWithOffset(ssc:StreamingContext ,
                                                 topicsSet:Set[String]): DStream[Object] = {

    val converter = {json:String =>
      var res : Object = null
      try {

        res = com.alibaba.fastjson.JSON.parseObject(json,
          new TypeReference[Object]() {})

      } catch {
        case e: Exception => logError(s"解析topic ${topicsSet}, 的記錄 ${json} 失敗。", e)
      }
      res
    }
    createDirectStreamWithOffset(ssc, topicsSet, converter).filter(_ != null)
  }





  def createStringDirectStreamWithOffset(ssc:StreamingContext ,
                                         topicsSet:Set[String]): DStream[String] = {

    val converter = {json:String =>
      json
    }
    createDirectStreamWithOffset(ssc, topicsSet, converter).filter(_ != null)
  }






  /**
    * 讀取JSON的流 并将JSON流 轉為MAP流  并且這個流支援RDD向zookeeper中記錄消費資訊
    * @param ssc   spark ssc
    * @param topicsSet topic 集合 支援從多個kafka topic同時讀取資料
    * @return  DStream[java.util.Map[String,String
    */
  def createJsonToJMapStringDirectStreamWithOffset(ssc:StreamingContext , topicsSet:Set[String]): DStream[java.util.Map[String,String]] = {

    val converter = {json:String =>
      var res : java.util.Map[String,String] = null
      try {
        res = com.alibaba.fastjson.JSON.parseObject(json, new TypeReference[java.util.Map[String, String]]() {})
      } catch {
        case e: Exception => logError(s"解析topic ${topicsSet}, 的記錄 ${json} 失敗。", e)
      }
      res
    }
    createDirectStreamWithOffset(ssc, topicsSet, converter).filter(_ != null)
  }


  /**
    * 讀取JSON的流 并将JSON流 轉為MAP流  并且這個流支援RDD向zookeeper中記錄消費資訊
    * @param ssc   spark ssc
    * @param topicsSet topic 集合 支援從多個kafka topic同時讀取資料
    * @return  DStream[java.util.Map[String,String
    */
  def createJsonToJMapStringDirectStreamWithoutOffset(ssc:StreamingContext , topicsSet:Set[String]): DStream[java.util.Map[String,String]] = {

    val converter = {json:String =>
      var res : java.util.Map[String,String] = null
      try {
        res = com.alibaba.fastjson.JSON.parseObject(json, new TypeReference[java.util.Map[String, String]]() {})
      } catch {
        case e: Exception => logError(s"解析topic ${topicsSet}, 的記錄 ${json} 失敗。", e)
      }
      res
    }
    createDirectStreamWithOffset(ssc, topicsSet, converter).filter(_ != null)
  }


}

object KafkaManager extends Logging{

  def apply(broker:String, groupId:String = "default",
            numFetcher:Int = 1, offset:String = "smallest",
            autoUpdateoffset:Boolean = true): KafkaManager ={
    new KafkaManager(
      createKafkaParam(broker, groupId, numFetcher, offset),
      autoUpdateoffset)
  }

  def createKafkaParam(broker:String, groupId:String = "default",
                       numFetcher:Int = 1, offset:String = "smallest"): Map[String, String] ={

    //建立 stream 時使用的 topic 名字集合
    Map[String, String](
      "metadata.broker.list" -> broker,
      "auto.offset.reset" -> offset,
      "group.id" -> groupId,
      "num.consumer.fetchers" -> numFetcher.toString)
  }

}
           
  • 建構消費入口:
object Kafka2esStreaming extends Serializable with Logging{
  //擷取資料類型
  private val dataTypes: util.Set[String] = DataTypeProperties.dataTypeMap.keySet()

  val kafkaConfig: Properties = ConfigUtil.getInstance().getProperties("kafka/kafka-server-config.properties")

  def main(args: Array[String]): Unit = {

    val topics = args(1).split(",")
     val ssc = SparkContextFactory.newSparkStreamingContext("Kafka2esStreaming", java.lang.Long.valueOf(10))

    //建構kafkaManager
    val kafkaManager = new KafkaManager(
      Spark_Kafka_ConfigUtil.getKafkaParam(kafkaConfig.getProperty("metadata.broker.list"), "TZ3")
    )
    //使用kafkaManager建立DStreaming流
    val kafkaDS = kafkaManager.createJsonToJMapStringDirectStreamWithOffset(ssc, topics.toSet)
                              //添加一個日期分組字段
                              //如果資料其他的轉換,可以先在這裡進行統一轉換
                              .map(map=>{
                                  map.put("index_date",TimeTranstationUtils.Date2yyyyMMddHHmmss(java.lang.Long.valueOf(map.get("collect_time")+"000")))
                                  map
                              }).persist(StorageLevel.MEMORY_AND_DISK)

    //使用par并發集合可以是任務并發執行。在資源充足的情況下
    dataTypes.foreach(datatype=>{
      //過濾出單個類别的資料種類
      val tableDS = kafkaDS.filter(x=>{datatype.equals(x.get("table"))})
      Kafka2EsJob.insertData2Es(datatype,tableDS)
    })

    ssc.start()
    ssc.awaitTermination()
  }
  }
           

繼續閱讀