天天看點

Spark streaming與kafka相結合

版本:Spark streaming 2.11   Kafka:0.9.0.0  scala:2.11.8

Spark streaming消費kafka主要有兩種方式:receiver方式和直連方式。

一、receiver方式:

1、利用kafka高階API,offset由zookeeper維護。

2、方式:

KafkaUtils.createStream(streamingContext, [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
           

3、主要實作邏輯:

Receiver方式接收資料的方式如下圖:

Spark streaming與kafka相結合

Receiver源源不斷的從資料源kafka接收資料,然後發送資料到spark記憶體,最後更新zookeeper中kafka offset的記憶體。

這樣就存在這樣一個問題:

如果receiver接收kafka消息發送到spark記憶體交由spark處理,更新完offset,這個時候如果driver程式挂掉,那麼spark記憶體中正在進行中的資料就會丢失;driver重新開機後,因為kafka中的offset已經更新為最新,那麼會繼續從最新offset處處理資料,就會造成上一次處理資料的丢失。

Receiver方式為了保證資料零丢失,引入了checkpoint和wal機制。

(1)、checkpoint:spark提供的一種容錯機制,主要有兩種:

①元資訊checkpoint:用來儲存DStreamGraph以及相關配置資訊,

以便在Driver崩潰重新開機的時候能夠接着之前進度繼續進行處理;

②消費資料的checkpoint:儲存生成的RDD到一個可靠的存儲系統中,常用的HDFS,通常有狀态的資料橫跨多個batch流的時候,需要做checkpoint.

(2)、WAL預寫日志:receiver接收資料後,先将資料持久化到日志中(HDFS),如果driver重新開機,将會從日志檔案中恢複資料。

啟用checkpoint和wal機制的receiver方式如下圖:

Spark streaming與kafka相結合

Receiver從kafka擷取資料,将資料通過WAL持久化到HDFS上面,然後發送給spark記憶體進行處理,處理過程中将中繼資料資訊checkpoint到hdfs上面,最後更新kafka的offset。

如果在處理過程中driver挂掉,恢複成功後會從checkpoint目錄中尋找未執行的任務元資訊,然後從wal日志中進行恢複,避免了資料丢失。

但是采用這種方式會存在重複消費的問題,如果最後一步更新kafka offset失敗的話,那麼spark下一次batch會重新從上一次的offset處重新拿去資料,造成另一次處理。

綜上,receiver有如下特點:

1、至少處理一次

2、WAL減少了接收器的吞吐量,因為接受到的資料必須儲存到可靠的分布式檔案系統中;而且kafka和HDFS中存在兩份資料,造成了資源的浪費。

為了解決recevier這些問題,spark streaming1.3引入了kafka直連的方式,而在實際生産環境中,大多數都使用的是直連方式。

二、直連方式:

1、利用kafka低階API,offset由spark checkpoint維護。

2、方式:

createDirectStream(StreamingContext ssc, scala.collection.immutable.Map<String,String> kafkaParams, scala.collection.immutable.Set<String> topics, scala.reflect.ClassTag<K> evidence$19, scala.reflect.ClassTag<V> evidence$20, scala.reflect.ClassTag<KD> evidence$21, scala.reflect.ClassTag<VD> evidence$22)
           

3、主要優點:

(1)、不采用wal機制,減少了資料備援存儲。

(2)、建立的DStream的rdd的partition做到了和Kafka中topic的partition一一對應。

(3)、基于direct的方式,使用kafka的簡單api,Spark Streaming自己就負責追蹤消費的offset,并儲存在checkpoint中。Spark自己一定是同步的,是以可以保證資料是消費一次且僅消費一次。

但是直連方式也有一個特點:因為kafka的offset由checkpoint維護,這就導緻了zookeeper無法知道此時kafka的offset資訊,會導緻一些常用的工具如kafkaOffsetMoniter等無法使用,如果想繼續使用這些監控offset的工具,可以在spark處理完資料後手動更新zookeeper的offset。

三、自定義維護kafka offset

在消費kafka過程中,checkpoint起到至關重要的作用,但是checkpoint有個弊端,每次流式程式更新的時候會報錯誤,因為checkpoint第一次持久化的時候會把整個相關的jar給序列化成一個二進制檔案,每次重新開機都會從裡面恢複,但是當你新的程式打包之後序列化加載的仍然是舊的序列化檔案,這就會導緻報錯或者依舊執行舊代碼,為了解決checkpoint這個問題,我們可以廢棄checkpoint,自己維護offset的變化,具體思路如下:

1、首次啟動,先從第三方存儲媒體(可以使redis或者zk)中找是否有上次存儲的偏移量,如果沒有就從最新的消費,然後儲存偏移量至第三方存儲媒體中

2、如果從第三方存儲媒體中找到了偏移量,那麼就從指定的偏移量處開始消費處理,每個批處理處理完畢後,都會更新新的offset到第三方存儲媒體中, 這樣以來無論是程式故障,還是當機,再次啟動後都會從上次的消費的偏移量處繼續開始消費,而且程式的更新或功能改動新版本的釋出都能正常運作 并做到了消息不丢。

 下面看一個具體例子:

val calculateReulstInputDstream = KafkaIO.createCustomDirectKafkaStream(ssc, kafkaParams, calculateResultTopics, dmpRealtimeSequenceUrl,Constants.CROWD_CALCULATE_KAFKA_SEQUENCE_ID)
           

KafkaIO為定義的工具類,createCustomDirectKafkaStream為定義的擷取kafka資訊的方法,它的主要功能為每次從redis中擷取最新的kafka offset,如果擷取到了,那麼根據offset去kafka叢集擷取消息然後更新最新的offset到redis中;如果擷取不到,則從最新offset處開始擷取資料,并更新offset到redis,具體代碼如下:

def createCustomDirectKafkaStream(ssc:  StreamingContext, kafkaParams: Map[String, String], topics: Set[String], key:  String): InputDStream[(String, String)] = {
    val kafkaStream =  readOffsetFromJimDB(dmpRealtimeSequenceUrl, key) match {
        case None =>
            KafkaUtils.createDirectStream[String, String,  StringDecoder, StringDecoder] . 
            (ssc, kafkaParams, topics)
        case Some(offset) => 
            val msgHandler = (mmd:  MessageAndMetadata[String, String]) => 
            (mmd.key, mmd.message)
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,  (String, 
        String)](ssc, kafkaParams, offset, msgHandler)}
    kafkaStream.foreachRDD(rdd => saveOffsetToJimDB(rdd,  dmpRealtimeSequenceUrl, key))
    kafkaStream
}
     
def saveOffsetToJimDB(rdd: RDD[_],key: String): Unit =  {
    var offsets =  rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    var value = offsets.map(f=>s"${f.topic}:${f.partition}:${f.fromOffset}").mkString(",")
    writeOffset(value, key)
}
        
def readOffsetFromJimDB(dmpRealtimeSequenceUrl: String,  key: String): Option[Map[TopicAndPartition, Long]] = {
    var value = readOffset(dmpRealtimeSequenceUrl, key)
    value.map {
        v =>  v.split(",").map(_.split(":")).map {
            case  Array(topic, partition, offset) => TopicAndPartition(topic,  
          partition.toInt) -> offset.toLong
     }.toMap
    }
}
 
def writeOffset(offset: String, key: String): Unit = {
    // 存儲offset資訊到db
}
 
def readOffset(key: String): Option[String] = {
    var offset: String = null
    //  從db中擷取存儲的offset資訊
    Option(offset)
}
           

PS: kafka從0.9版本之後,offset預設不儲存在zk中,而是儲存在broker伺服器上一個名為__consumer_offsets 的Topic中,感興趣的可以了解一下

繼續閱讀