sparkstreaming 處理kafka資料,幾種資料丢失的情況,
1、雪崩效應導緻的異常 kill掉程序 ,導緻資料丢失
2、程式bug 導緻程序挂了,導緻資料丢失
以上是使用自動送出offset會存在的問題,若要保證資料0丢失,需要使用offset commit api
手動送出offset,自己儲存offset,自己送出處理完的offset。
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html 官方提供幾種儲存offset的方式-
checkpoint的方式
問題:資料和offset并不同步無法保證事物的概念,生成小檔案太多,存在hdfs,會造成namenode和datanode的壓力
-
your own data store :zk、 hbase、。。。
缺點就是需要維護業務,比較麻煩
官網代碼
// begin from the the offsets committed to the database
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Assign
String, String)
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val results = yourCalculation(rdd)
// begin your transaction
// update results
// update offsets where the end of existing offsets matches the beginning of this batch of offsets
// assert that offsets were updated correctly
// end your transaction
}
3.Kafka itself kafka本身提供的api自我維護
設定enable.auto.commit to false
//坑,foreachRDD 之前不能使用map orderby等生成新的rdd,這樣offset資訊會丢失
// 業務處理,異步送出
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
這裡的問題就是如果在業務處理完還沒異步送出offset,其實再次啟動消費會重複處理沒送出offset的資料。
如何在保證資料不丢失的同時,對重複資料做處理呢?
----若澤資料