天天看點

11SparkStreaming消費kafka以及offset送出一、Receiver方式消費kafka二、Direct方式消費kafka三、消費kafka時offset送出四、手動維護offset簡單代碼示範維護到zookeeper五、kafka中earliest/latest詳解

park streaming流式處理kafka中的資料,第一步是先把資料接收過來,轉換為spark streaming中的資料結構Dstream。接收資料的方式有兩種:1.利用Receiver接收資料,2.直接從kafka讀取資料。

一、Receiver方式消費kafka

這種方式利用接收器(Receiver)來接收kafka中的資料,其最基本是使用Kafka高階使用者API接口。對于所有的接收器,receiver從kafka接收來的資料會存儲在spark的executor記憶體中(如果突然資料暴增,大量batch堆積,很容易出現記憶體溢出的問題),之後spark streaming送出的job會處理這些資料。

在預設的配置下,這種方式可能會因為底層的失敗而丢失資料。如果要啟用高可靠機制,讓資料零丢失,就必須啟用Spark Streaming的預寫日志機制(Write Ahead Log,WAL)。該機制會同步地将接收到的Kafka資料寫入分布式檔案系統(比如HDFS)上的預寫日志中。是以,即使底層節點出現了失敗,也可以使用預寫日志中的資料進行恢複。

Receiver方式如下圖:

11SparkStreaming消費kafka以及offset送出一、Receiver方式消費kafka二、Direct方式消費kafka三、消費kafka時offset送出四、手動維護offset簡單代碼示範維護到zookeeper五、kafka中earliest/latest詳解

使用Receiver方式消費kafka需要注意的點:

  • 1、Kafka中的topic的partition,與Spark中的RDD的partition是沒有關系的。是以,在KafkaUtils.createStream()中,提高partition的數量,隻會增加一個Receiver中,讀取partition的線程的數量(隻是增加資料拉取的并行度,不是資料處理)。不會增加Spark處理資料的并行度。
  • 2、可以建立多個Kafka輸入DStream,使用不同的consumer group和topic,來通過多個receiver并行接收資料,之後可以利用union來統一成一個Dstream。
  • 3、如果基于容錯的檔案系統,比如HDFS,啟用了預寫日志機制,接收到的資料都會被複制一份到預寫日志中。是以,在KafkaUtils.createStream()中,設定的持久化級别是StorageLevel.MEMORY_AND_DISK_SER。

二、Direct方式消費kafka

在spark1.3之後,引入了Direct方式。不同于Receiver的方式,Direct方式沒有receiver這一層,其會周期性的擷取Kafka中每個topic的每個partition中的最新offsets,進而定義每個batch的offset的範圍。當處理資料的job啟動時,就會使用Kafka的簡單consumer api來擷取Kafka指定offset範圍的資料。

Direct方式如下圖:

11SparkStreaming消費kafka以及offset送出一、Receiver方式消費kafka二、Direct方式消費kafka三、消費kafka時offset送出四、手動維護offset簡單代碼示範維護到zookeeper五、kafka中earliest/latest詳解

使用Direct方式的優勢:

  • 1、簡化并行讀取:如果要讀取多個partition,不需要建立多個輸入DStream然後對它們進行union操作。Spark會建立跟Kafka partition一樣多的RDD partition,并且會并行從Kafka中讀取資料。是以Direct方式在Kafka partition和RDD partition之間,有一個一對一的映射關系。
  • 2、高性能:如果要保證零資料丢失,在基于receiver的方式中,需要開啟WAL機制。這種方式其實效率低下,因為資料實際上被複制了兩份,Kafka自己本身就有高可靠的機制,會對資料複制一份,而這裡又會複制一份到WAL中。而基于direct的方式,不依賴Receiver,不需要開啟WAL機制,隻要Kafka中作了資料的複制,那麼就可以通過Kafka的副本進行恢複。
  • 3、一次且僅一次的事務機制:在Receiver的方式中,使用的是Kafka的高階API接口從Zookeeper中擷取offset值,這也是傳統的從Kafka中讀取資料的方式,但由于Spark Streaming消費的資料和Zookeeper中記錄的offset不同步,這種方式偶爾會造成資料重複消費。而第二種方式,直接使用了簡單的低階Kafka API,Offsets則利用Spark Streaming的checkpoints進行記錄,消除了這種不一緻性。(當然,offset自己記錄的話,可以利用checkpoint、資料庫或檔案記錄或者回寫到zookeeper中或者調用API寫入Kafka的topic中進行記錄)

三、消費kafka時offset送出

首先是思路,有好幾種方式:

  • 1、将offset手動維護到外部媒體中,如Zookeeper、mysql、redis。
  • 2、将offset通過foreachRDD的方式維護到kafka中。
  • 3、通過監聽的方式将offset維護到kafka中,當然也可以是Zookeeper、mysql、redis等外部媒體。
    11SparkStreaming消費kafka以及offset送出一、Receiver方式消費kafka二、Direct方式消費kafka三、消費kafka時offset送出四、手動維護offset簡單代碼示範維護到zookeeper五、kafka中earliest/latest詳解
    11SparkStreaming消費kafka以及offset送出一、Receiver方式消費kafka二、Direct方式消費kafka三、消費kafka時offset送出四、手動維護offset簡單代碼示範維護到zookeeper五、kafka中earliest/latest詳解

    建議使用監聽的方式進行維護,因為kafka和SparkStreaming中維護的分區的對應關系,直接使用foreachRDD的當時也可以,但是這樣會将業務邏輯置于RDD中進行處理,喪失了SparkStreaming特有的算子特性,例如視窗算子之類的,使用監聽方式,繼承Listene中的onBatchCompleted方法,在該方法中實作壽佛那個維護offset即可。

    注意:手動維護offset的時候,最好做一下任務是否有報錯的判斷,防止丢數,當任務有失敗時,不送出offset。

四、手動維護offset簡單代碼示範

維護到zookeeper

  • 使用到了commitAsync() api直接操作Zookeeper

在Kafka 0.10+版本中,offset的預設存儲由ZooKeeper移動到了一個自帶的topic中,名為__consumer_offsets。Spark Streaming也專門提供了commitAsync() API用于送出offset。使用方法如下。

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // 確定結果都已經正确且幂等地輸出了
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
           

維護到zookeeper

  • 使用到了KafkaCluster 中的api直接操作Zookeeper
package com.bigdata.spark
 
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaCluster.Err
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}
 
import scala.collection.mutable
 
/**
  * 
  *   kafka對接sparkstreaming,手動維護offset到zookeeper
  */
object KafkaStreaming {
 
  def main(args: Array[String]): Unit = {
 
    //初始化ssc
    val conf: SparkConf = new SparkConf().setAppName("").setMaster("local[*]")
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))
    ssc.sparkContext.setLogLevel("ERROR")
 
 
    //kafka參數
    val brokers = "linux1:9092,linux2:9092,linux3:9092"
    val topic = "first"
    val group = "bigdata"
    val deserialization = "org.apache.kafka.common.serialization.StringDeserializer"
 
    val kafkaParams = Map(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> group,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> deserialization,
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> deserialization)
 
 
    //建立KafkaCluster對象,維護offset
    val cluster = new KafkaCluster(kafkaParams)
 
    //擷取初始偏移量
    val fromOffset: Map[TopicAndPartition, Long] = getOffset(cluster, group, topic)
 
    //建立流
    val kafkaStream: InputDStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](ssc, kafkaParams, fromOffset, (mess: MessageAndMetadata[String, String]) => mess.message())
 
    //轉換邏輯
    kafkaStream.map((_, 1)).reduceByKey(_ + _).print()
 
    //送出offset
    setOffset(cluster, kafkaStream, group)
 
 
    ssc.start()
    ssc.awaitTermination()
  }
 
  /**
    * 擷取偏移量
    *
    * @param cluster
    * @param group
    * @param topic
    * @return
    */
  def getOffset(cluster: KafkaCluster, group: String, topic: String) = {
    var partitionToLong = new mutable.HashMap[TopicAndPartition, Long]()
 
    //擷取所有主題的分區
    val topicAndPartition: Either[Err, Set[TopicAndPartition]] = cluster.getPartitions(Set(topic))
 
    val partitions: Set[TopicAndPartition] = topicAndPartition.right.get
 
    //擷取偏移量資訊
    val offsetInfo: Either[Err, Map[TopicAndPartition, Long]] = cluster.getConsumerOffsets(group, partitions)
 
    if (offsetInfo.isRight) {
      // 如果有offset資訊則存儲offset
      val offsets: Map[TopicAndPartition, Long] = offsetInfo.right.get
      for (offset <- offsets) {
        partitionToLong += offset
      }
 
    } else {
      //如果沒有則設定為0
      for (p <- partitions) {
        partitionToLong += (p -> 0L)
      }
    }
 
    partitionToLong.toMap
  }
 
  /**
    * 送出偏移量
    *
    * @param cluster
    * @param kafkaStream
    * @param group
    */
  def setOffset(cluster: KafkaCluster, kafkaStream: InputDStream[String], group: String): Unit = {
    kafkaStream.foreachRDD { rdd =>
      val offsetRangeArray = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      for (offset <- offsetRangeArray) {
 
        val ack: Either[Err, Map[TopicAndPartition, Short]] = cluster.setConsumerOffsets(group, Map(offset.topicAndPartition() -> offset.untilOffset))
 
        if (ack.isRight) {
          println(s"成功更新了消費kafka的偏移量:${offset.untilOffset}")
        } else {
          println(s"失敗更新消費kafka的偏移量:${ack.left.get}")
        }
      }
    }
  }
 
}
           

維護到mysql

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{OffsetRange, _}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.kafka.common.TopicPartition
import java.sql.{DriverManager, ResultSet}
import scala.collection.mutable

/**
  *  手動維護偏移量offset到MySQL資料庫中
  */
object SparkKafkaOffset {
  def main(args: Array[String]): Unit = {
  
    //1.準備環境
    val conf = new SparkConf().setAppName("offset").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    
    //五秒中切分一次資料形成一個RDD
    val ssc = new StreamingContext(sc,Seconds(5))
    
    //設定連接配接Kafka的參數
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "cdh01:9092,cdh02:9092,cdh03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "SparkKafkaOffset",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("spark_kafka")
    
    //2.使用KafkaUtil連接配接Kafak擷取資料
    val offsetMap: mutable.Map[TopicPartition, Long] = OffsetUtil.getOffsetMap("SparkKafkaOffset","spark_kafka")
    
    val recordDStream: InputDStream[ConsumerRecord[String, String]] = if(offsetMap.size > 0){
      //有記錄offset,從該offset處開始消費
      KafkaUtils.createDirectStream[String, String](ssc,
      LocationStrategies.PreferConsistent,//位置政策:該政策,會讓Spark的Executor和Kafka的Broker均勻對應
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams,offsetMap))//消費政策
    }else{
      //MySQL中沒有記錄offset,則直接連接配接,從latest開始消費
      KafkaUtils.createDirectStream[String, String](ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
    }
    //3.操作資料
    //注意:我們要自己手動維護偏移量,也就意味着,消費了一小批資料就應該送出一次offset
    //而這一小批資料在DStream的表現形式就是RDD,是以我們需要對DStream中的RDD進行操作
    //而對DStream中的RDD進行操作的API有transform(轉換)和foreachRDD(動作)
    recordDStream.foreachRDD(rdd=>{
      if(rdd.count() > 0){//目前這一時間批次有資料
        rdd.foreach(record => println("接收到的Kafk發送過來的資料為:" + record))
        //接收到的Kafk發送過來的資料為:ConsumerRecord(topic = spark_kafka, partition = 1, offset = 6, CreateTime = 1565400670211, checksum = 1551891492, serialized key size = -1, serialized value size = 43, key = null, value = hadoop spark ...)
        //注意:通過列印接收到的消息可以看到,裡面有我們需要維護的offset,和要處理的資料
        //接下來可以對資料進行處理....或者使用transform傳回和之前一樣處理
        //維護offset:為了友善我們對offset的維護/管理,spark提供了一個類,幫我們封裝offset的資料
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        for (o <- offsetRanges){
          println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset}")
        }
        //手動送出offset,預設送出到Checkpoint中
        //recordDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        OffsetUtil.saveOffsetRanges("SparkKafkaDemo",offsetRanges)
      }
    })

   /* val lineDStream: DStream[String] = recordDStream.map(_.value())//_指的是ConsumerRecord
    val wrodDStream: DStream[String] = lineDStream.flatMap(_.split(" ")) //_指的是發過來的value,即一行資料
    val wordAndOneDStream: DStream[(String, Int)] = wrodDStream.map((_,1))
    val result: DStream[(String, Int)] = wordAndOneDStream.reduceByKey(_+_)
    result.print()*/
    ssc.start()//開啟
    ssc.awaitTermination()//等待優雅停止
  }

  /*
  手動維護offset的工具類
  首先在MySQL建立如下表
    CREATE TABLE `t_offset` (
      `topic` varchar(255) NOT NULL,
      `partition` int(11) NOT NULL,
      `groupid` varchar(255) NOT NULL,
      `offset` bigint(20) DEFAULT NULL,
      PRIMARY KEY (`topic`,`partition`,`groupid`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
   */
  object OffsetUtil {

    /**
      * 從資料庫讀取偏移量
      */
    def getOffsetMap(groupid: String, topic: String) = {
      val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark?characterEncoding=UTF-8", "root", "123456")
      val pstmt = connection.prepareStatement("select * from t_offset where groupid=? and topic=?")
      pstmt.setString(1, groupid)
      pstmt.setString(2, topic)
      val rs: ResultSet = pstmt.executeQuery()
      val offsetMap = mutable.Map[TopicPartition, Long]()
      while (rs.next()) {
        offsetMap += new TopicPartition(rs.getString("topic"), rs.getInt("partition")) -> rs.getLong("offset")
      }
      rs.close()
      pstmt.close()
      connection.close()
      offsetMap
    }

    /**
      * 将偏移量儲存到資料庫
      */
    def saveOffsetRanges(groupid: String, offsetRange: Array[OffsetRange]) = {
      val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark?characterEncoding=UTF-8", "root", "root")
      //replace into表示之前有就替換,沒有就插入
      val pstmt = connection.prepareStatement("replace into t_offset (`topic`, `partition`, `groupid`, `offset`) values(?,?,?,?)")
      for (o <- offsetRange) {
        pstmt.setString(1, o.topic)
        pstmt.setInt(2, o.partition)
        pstmt.setString(3, groupid)
        pstmt.setLong(4, o.untilOffset)
        pstmt.executeUpdate()
      }
      pstmt.close()
      connection.close()
    }
  }
}
           

維護到redis

  • 使用資料結構string,其中key為topic:partition,value為offset。

    例如bobo這個topic下有3個分區,則key-value結構如下:

    bobo:0的偏移量為10

    bobo:1的偏移量為12

    bobo:2的偏移量為11

代碼如下:

  • 消費時指定offset
/**
  * kakfa參數
  */
private val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "crpprdap25:6667,crpprdap26:6667,crpprdap27:6667",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  // 注意這裡是none。
  "auto.offset.reset" -> "none",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// `bobo`topic下有3個分區
private val topicPartitions = Map[String, Int]("bobo" -> 3)

// 從redis中擷取offsets
def getOffsets: Map[TopicPartition, Long] = {
  val jedis = InternalRedisClient.getResource

  // 設定每個分區起始的offset
  val offsets = mutable.Map[TopicPartition, Long]()

  topicPartitions.foreach { it =>
    val topic = it._1
    val partitions = it._2
    // 周遊分區,設定每個topic下對應partition的offset
    for (partition <- 0 until partitions) {
      val topicPartitionKey = topic + ":" + partition
      var lastOffset = 0L
      val lastSavedOffset = jedis.get(topicPartitionKey)

      if (null != lastSavedOffset) {
        try {
          lastOffset = lastSavedOffset.toLong
        } catch {
          case e: Exception =>
            log.error("get lastSavedOffset error", e)
            System.exit(1)
        }
      }
      log.info("from redis topic: {}, partition: {}, lastOffset: {}", topic, partition, lastOffset)

      // 添加
      offsets += (new TopicPartition(topic, partition) -> lastOffset)
    }
  }

  InternalRedisClient.returnResource(jedis)

  offsets.toMap
}

/**
  * 建立kakfa流
  *
  * @param ssc StreamingContext
  * @return InputDStream
  */
def createKafkaStream(ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
  val offsets = getOffsets

  // 建立kafka stream
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Assign[String, String](offsets.keys.toList, kafkaParams, offsets)
  )
  stream
}
           

其中:核心是通過ConsumerStrategies.Assign方法來指定topic下對應partition的offset資訊。

  • 更新offset到redis
/**
  * 消費
  *
  * @param stream InputDStream
  */
def consume(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
  stream.foreachRDD { rdd =>
    // 擷取offset資訊
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    // 計算相關名額,這裡就統計下條數了
    val total = rdd.count()

    val jedis = InternalRedisClient.getResource
    val pipeline = jedis.pipelined()
    // 會阻塞redis
    pipeline.multi()

    // 更新相關名額
    pipeline.incrBy("totalRecords", total)

    // 更新offset
    offsetRanges.foreach { offsetRange =>
      log.info("save offsets, topic: {}, partition: {}, offset: {}", offsetRange.topic, offsetRange.partition, offsetRange.untilOffset)
      val topicPartitionKey = offsetRange.topic + ":" + offsetRange.partition
      pipeline.set(topicPartitionKey, offsetRange.untilOffset + "")
    }

    // 執行,釋放
    pipeline.exec()
    pipeline.sync()
    pipeline.close()
    InternalRedisClient.returnResource(jedis)
  }
}
           

其中使用到了:jedis.pipelined() 、pipeline.multi()。

五、kafka中earliest/latest詳解

  • earliest

    當各分區下有已送出的offset時,從送出的offset開始消費;無送出的offset時,從頭開始消費

  • latest

    當各分區下有已送出的offset時,從送出的offset開始消費;無送出的offset時,消費新産生的該分區下的資料

繼續閱讀