天天看點

Kafka SparkStreaming 保證資料不丢失問題 >0.10版本

sparkstreaming 處理kafka資料,幾種資料丢失的情況,

1、雪崩效應導緻的異常 kill掉程序 ,導緻資料丢失

2、程式bug 導緻程序挂了,導緻資料丢失

以上是使用自動送出offset會存在的問題,若要保證資料0丢失,需要使用offset commit api

手動送出offset,自己儲存offset,自己送出處理完的offset。

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html 官方提供幾種儲存offset的方式

  1. checkpoint的方式

    問題:資料和offset并不同步無法保證事物的概念,生成小檔案太多,存在hdfs,會造成namenode和datanode的壓力

  2. your own data store :zk、 hbase、。。。

    缺點就是需要維護業務,比較麻煩

官網代碼

// begin from the the offsets committed to the database

val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>

new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")

}.toMap

val stream = KafkaUtils.createDirectStream[String, String](

streamingContext,

PreferConsistent,

Assign

String, String

)

stream.foreachRDD { rdd =>

val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

val results = yourCalculation(rdd)

// begin your transaction

// update results

// update offsets where the end of existing offsets matches the beginning of this batch of offsets

// assert that offsets were updated correctly

// end your transaction

}

3.Kafka itself kafka本身提供的api自我維護

設定enable.auto.commit to false

//坑,foreachRDD 之前不能使用map orderby等生成新的rdd,這樣offset資訊會丢失

// 業務處理,異步送出

stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

這裡的問題就是如果在業務處理完還沒異步送出offset,其實再次啟動消費會重複處理沒送出offset的資料。

如何在保證資料不丢失的同時,對重複資料做處理呢?

----若澤資料           

繼續閱讀