天天看点

Kafka 数据源、Receiver 和 Direct 方式接收数据_3|学习笔记

开发者学堂课程【大数据实时计算框架 Spark 快速入门:Kafka 数据源、Receiver 和 Direct 方式接收数据_3】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:

https://developer.aliyun.com/learning/course/100/detail/1731

Kafka 数据源、Receiver 和 Direct 方式接收数据_3

不需要 Receivers:

Spark 1.3 引入了这种新的无接收端的“直接”方法,确保了更强的端到端保证。

这种方法不使用接收器来接收数据,而是定期查询 Katka 在每个主题+分区中的最新偏移量,并相应地定义在每个批次中处理的偏移量范围,当处理数据的作业启动时,Kafka 的简单消费者 AP 用于从 Kafka 读取定义的偏移范围(类似于从文件系统读取文件)。

注意,这是 Spark 1.3 中针对 Scala 和 Java AP 引入的实验性特性,在 Spark 1.4 中针对 Python API 引入的实验性特性。  

与基于接收方的方法(即方法 1 )相比,这种方法有以下优点。

简化的并行性:不需要创建多个输入 Katka 流并将它们联合起来。使用directStream. Spark Streaming 将创建与 Katka 分区一样多的 RDD 分区。它们将同时从Katka读取数据。所以 Katka 分区和 RDD 分区之间是一一对应的。这更容易理解和调整。

效率:要在第一种方法中实现零数据丢失,需要将数据存储在 Write Ahead Log中,这将进一步复制数据。

这实际上是低效的,因为数据被有效地复制了两次——一次由 Katka 复制,另一次由 Write Ahead Log 复制。

第二种方法消除了这个问题,因为没有接收器,因此不需要提前写入日志。只要你有足够的 Karka 保留,消息可以从 Kafka 恢复。

Exactly-once-semantics:第一种方法使用 Katka 的高级 APl 在 Zookeeper 中存储消耗的偏移量。

这是传统的来消费来自 Kafka 的数据。而这种方法(结合提前写日志)可以确保零数据丢失(即至少一次语义)。在某些失败情况下,有些记录有可能被消耗两次。

这是因为 Spark Streaming 可靠接收的数据与 Zookeeper 跟踪的偏移量之间不一致。

因此,在第二种方法中,我们使用了不使用 Zookeeper 的简单 Kaka API。

Spark Streaming 在其检查点内跟踪偏移量。这消除了 Spark 流和Zookeeper/Kafka 之间的不一致性,因此每个记录都被 Spark 流有效地精确接收一次,尽管失败。为了实现输出结果的精确一次语义。将数据保存到外部数据存储的输出操作必须是幂等的,或者是保存结果和偏移量的原子事务。

注意,这种方法的一个缺点是它不会在 Zookeeper 中更新偏移量,基于 Zookeeper 的 Kafka 监控不会显示进展。然而,你可以在每批中访问用这种方法处理的偏移量,并自己更新 Zookeeper。代码如下:

directKafikaStream.transformToPair(

new Function<JavaPairRDD<String,String>,3avaPairRDD<String,String>>( ){

@Override

public JavaPairRDDeString,String ca11(3avaPairRDDeString,Strings rdd) throws Exception {

OffsetRange[] offsets - ((HasOffsetRanges) rdd.rdd().offsetRanges();

offsetRanges.set(offsets);

return rdd;

}

}

). map(

).foreachRDD(

new Function<JavaPairRDD<String,String>, void>(){

@Override

public void call(JavaPairRDD-<String,Strings rdd) throws IOException {

for (OffsetRange o : offsetRanges.get({

System. out.peintln(

o.topic( + " " + o.partition() + " " + o.fromOffset( + "  " + o.unti1offset();

);

}

return null;

}

}

);