版本:Spark streaming 2.11 Kafka:0.9.0.0 scala:2.11.8
Spark streaming消費kafka主要有兩種方式:receiver方式和直連方式。
一、receiver方式:
1、利用kafka高階API,offset由zookeeper維護。
2、方式:
KafkaUtils.createStream(streamingContext, [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
3、主要實作邏輯:
Receiver方式接收資料的方式如下圖:

Receiver源源不斷的從資料源kafka接收資料,然後發送資料到spark記憶體,最後更新zookeeper中kafka offset的記憶體。
這樣就存在這樣一個問題:
如果receiver接收kafka消息發送到spark記憶體交由spark處理,更新完offset,這個時候如果driver程式挂掉,那麼spark記憶體中正在進行中的資料就會丢失;driver重新開機後,因為kafka中的offset已經更新為最新,那麼會繼續從最新offset處處理資料,就會造成上一次處理資料的丢失。
Receiver方式為了保證資料零丢失,引入了checkpoint和wal機制。
(1)、checkpoint:spark提供的一種容錯機制,主要有兩種:
①元資訊checkpoint:用來儲存DStreamGraph以及相關配置資訊,
以便在Driver崩潰重新開機的時候能夠接着之前進度繼續進行處理;
②消費資料的checkpoint:儲存生成的RDD到一個可靠的存儲系統中,常用的HDFS,通常有狀态的資料橫跨多個batch流的時候,需要做checkpoint.
(2)、WAL預寫日志:receiver接收資料後,先将資料持久化到日志中(HDFS),如果driver重新開機,将會從日志檔案中恢複資料。
啟用checkpoint和wal機制的receiver方式如下圖:
Receiver從kafka擷取資料,将資料通過WAL持久化到HDFS上面,然後發送給spark記憶體進行處理,處理過程中将中繼資料資訊checkpoint到hdfs上面,最後更新kafka的offset。
如果在處理過程中driver挂掉,恢複成功後會從checkpoint目錄中尋找未執行的任務元資訊,然後從wal日志中進行恢複,避免了資料丢失。
但是采用這種方式會存在重複消費的問題,如果最後一步更新kafka offset失敗的話,那麼spark下一次batch會重新從上一次的offset處重新拿去資料,造成另一次處理。
綜上,receiver有如下特點:
1、至少處理一次
2、WAL減少了接收器的吞吐量,因為接受到的資料必須儲存到可靠的分布式檔案系統中;而且kafka和HDFS中存在兩份資料,造成了資源的浪費。
為了解決recevier這些問題,spark streaming1.3引入了kafka直連的方式,而在實際生産環境中,大多數都使用的是直連方式。
二、直連方式:
1、利用kafka低階API,offset由spark checkpoint維護。
2、方式:
createDirectStream(StreamingContext ssc, scala.collection.immutable.Map<String,String> kafkaParams, scala.collection.immutable.Set<String> topics, scala.reflect.ClassTag<K> evidence$19, scala.reflect.ClassTag<V> evidence$20, scala.reflect.ClassTag<KD> evidence$21, scala.reflect.ClassTag<VD> evidence$22)
3、主要優點:
(1)、不采用wal機制,減少了資料備援存儲。
(2)、建立的DStream的rdd的partition做到了和Kafka中topic的partition一一對應。
(3)、基于direct的方式,使用kafka的簡單api,Spark Streaming自己就負責追蹤消費的offset,并儲存在checkpoint中。Spark自己一定是同步的,是以可以保證資料是消費一次且僅消費一次。
但是直連方式也有一個特點:因為kafka的offset由checkpoint維護,這就導緻了zookeeper無法知道此時kafka的offset資訊,會導緻一些常用的工具如kafkaOffsetMoniter等無法使用,如果想繼續使用這些監控offset的工具,可以在spark處理完資料後手動更新zookeeper的offset。
三、自定義維護kafka offset
在消費kafka過程中,checkpoint起到至關重要的作用,但是checkpoint有個弊端,每次流式程式更新的時候會報錯誤,因為checkpoint第一次持久化的時候會把整個相關的jar給序列化成一個二進制檔案,每次重新開機都會從裡面恢複,但是當你新的程式打包之後序列化加載的仍然是舊的序列化檔案,這就會導緻報錯或者依舊執行舊代碼,為了解決checkpoint這個問題,我們可以廢棄checkpoint,自己維護offset的變化,具體思路如下:
1、首次啟動,先從第三方存儲媒體(可以使redis或者zk)中找是否有上次存儲的偏移量,如果沒有就從最新的消費,然後儲存偏移量至第三方存儲媒體中
2、如果從第三方存儲媒體中找到了偏移量,那麼就從指定的偏移量處開始消費處理,每個批處理處理完畢後,都會更新新的offset到第三方存儲媒體中, 這樣以來無論是程式故障,還是當機,再次啟動後都會從上次的消費的偏移量處繼續開始消費,而且程式的更新或功能改動新版本的釋出都能正常運作 并做到了消息不丢。
下面看一個具體例子:
val calculateReulstInputDstream = KafkaIO.createCustomDirectKafkaStream(ssc, kafkaParams, calculateResultTopics, dmpRealtimeSequenceUrl,Constants.CROWD_CALCULATE_KAFKA_SEQUENCE_ID)
KafkaIO為定義的工具類,createCustomDirectKafkaStream為定義的擷取kafka資訊的方法,它的主要功能為每次從redis中擷取最新的kafka offset,如果擷取到了,那麼根據offset去kafka叢集擷取消息然後更新最新的offset到redis中;如果擷取不到,則從最新offset處開始擷取資料,并更新offset到redis,具體代碼如下:
def createCustomDirectKafkaStream(ssc: StreamingContext, kafkaParams: Map[String, String], topics: Set[String], key: String): InputDStream[(String, String)] = {
val kafkaStream = readOffsetFromJimDB(dmpRealtimeSequenceUrl, key) match {
case None =>
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder] .
(ssc, kafkaParams, topics)
case Some(offset) =>
val msgHandler = (mmd: MessageAndMetadata[String, String]) =>
(mmd.key, mmd.message)
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String,
String)](ssc, kafkaParams, offset, msgHandler)}
kafkaStream.foreachRDD(rdd => saveOffsetToJimDB(rdd, dmpRealtimeSequenceUrl, key))
kafkaStream
}
def saveOffsetToJimDB(rdd: RDD[_],key: String): Unit = {
var offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
var value = offsets.map(f=>s"${f.topic}:${f.partition}:${f.fromOffset}").mkString(",")
writeOffset(value, key)
}
def readOffsetFromJimDB(dmpRealtimeSequenceUrl: String, key: String): Option[Map[TopicAndPartition, Long]] = {
var value = readOffset(dmpRealtimeSequenceUrl, key)
value.map {
v => v.split(",").map(_.split(":")).map {
case Array(topic, partition, offset) => TopicAndPartition(topic,
partition.toInt) -> offset.toLong
}.toMap
}
}
def writeOffset(offset: String, key: String): Unit = {
// 存儲offset資訊到db
}
def readOffset(key: String): Option[String] = {
var offset: String = null
// 從db中擷取存儲的offset資訊
Option(offset)
}
PS: kafka從0.9版本之後,offset預設不儲存在zk中,而是儲存在broker伺服器上一個名為__consumer_offsets 的Topic中,感興趣的可以了解一下