天天看点

Kafka SparkStreaming 保证数据不丢失问题 >0.10版本

sparkstreaming 处理kafka数据,几种数据丢失的情况,

1、雪崩效应导致的异常 kill掉进程 ,导致数据丢失

2、程序bug 导致进程挂了,导致数据丢失

以上是使用自动提交offset会存在的问题,若要保证数据0丢失,需要使用offset commit api

手动提交offset,自己保存offset,自己提交处理完的offset。

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html 官方提供几种保存offset的方式

  1. checkpoint的方式

    问题:数据和offset并不同步无法保证事物的概念,生成小文件太多,存在hdfs,会造成namenode和datanode的压力

  2. your own data store :zk、 hbase、。。。

    缺点就是需要维护业务,比较麻烦

官网代码

// begin from the the offsets committed to the database

val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>

new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")

}.toMap

val stream = KafkaUtils.createDirectStream[String, String](

streamingContext,

PreferConsistent,

Assign

String, String

)

stream.foreachRDD { rdd =>

val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

val results = yourCalculation(rdd)

// begin your transaction

// update results

// update offsets where the end of existing offsets matches the beginning of this batch of offsets

// assert that offsets were updated correctly

// end your transaction

}

3.Kafka itself kafka本身提供的api自我维护

设置enable.auto.commit to false

//坑,foreachRDD 之前不能使用map orderby等生成新的rdd,这样offset信息会丢失

// 业务处理,异步提交

stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

这里的问题就是如果在业务处理完还没异步提交offset,其实再次启动消费会重复处理没提交offset的数据。

如何在保证数据不丢失的同时,对重复数据做处理呢?

----若泽数据           

继续阅读