Receiver
receiver是使用kafka的高级consumer API来实现的,Receiver每个一段batch时间去kafka获取这段时间里最新的相应topic数据,从kafka哪里获取来的数据都存在Spark Excutor的内存中,然后Spark Streaming启动job去处理这些数据。
其中,谁来消费分区不是由SparkStreaming来决定的,而是由高阶消费者决定的,它决定分区向消费者的分配,即高阶消费者Api决定消费者消费哪个分区。高阶消费者会根据配置参数决定,消费者读取数据后什么时候提交offset。
这会引起一个问题,当Spark Streaming中的Receiver读取Kafka分区数据时,假设读取了100条数据,高阶消费者API会执行offset的提交,例如每隔3秒,这100条数据就是RDD,假设此RDD还没有处理完,高阶消费者API执行了offset提交,但是Spark Streaming挂掉了,由于RDD在内存中,那么RDD的数据就丢失了,如果想重新拿数据,从哪里去拿不是由Spark Streaming说了算的,是由高阶API决定的,由于offset已经提交,高阶API认为这个数据Spark Streaming已经拿过了,再拿要拿100条以后的数据,那么之前丢失的100条数据就永远丢失了。
针对这一问题,Spark Streaming设计了一个规则,即Spark Streaming预写日志规则(Write Ahead Log,WAL),每读取一批数据,会写一个WAL文件,在WAL文件中,读了多少条就写多少条,WAL文件存储于HDFS上。假设RDD中有100条数据,那么WAL文件中也有100条数据,此时如果Spark Streaming挂掉,那么回去读取HDFS上的WAL文件,把WAL文件中的100条数据取出再生成RDD,然后再去消费。由于这一设计需要写HDFS,会对整体性能造成影响。
假设有6个分区,高阶消费者的话会在Spark集群的Worker上启动Receiver,有6个分区则会用6个线程去读取分区数据,这是在一个Worker的一个Receiver中有6个线程同时读取6个分区的数据,随着数据量越来越大,数据读取会成为瓶颈,此时可以创建多个Receiver分散读取分区数据,然后每个Receiver创建一个Dstream,再把这些流全部都合并起来,然后进行计算。读取时,一方面把RDD放在内存中,一方面写HDFS中的WAL文件。
根据上面的情景,又要创建多个Receiver,又要进行合并,又要在内存中存储RDD,又要写HDFS上的WAL文件,高级API的缺点还是比较多的。
高阶消费者是由高阶消费者API自己提交offset到ZooKeeper中。
Direct
低阶消费者需要自己维护offset, Spark Streaming从分区里读一部分数据,然后将offset保存到CheckpointPath目录中,比如几秒生成一个Spark Streaming job(每个action操作启动一次job),每个job生成的时候,会写一次CheckpointPath下的文件,Checkpoint中有job信息和offset信息(当然还有RDD依赖关系等其他信息),即保存了未完成的job和分区读取的offset,一旦Spark Streaming挂掉后重启,可以通过从CheckpointPath中的文件中反序列化来读取Checkpoint的数据。