天天看点

Spark Streaming: 理解一下到底 foreachRDD,foreach 在干啥Dstream.foreachRDD:ps:RDD.foreach()

首先Spark算是一个分布式系统(分布式,分布式),对于每一个RDD,可以把他看成里面储存的是一堆指针,这些指针指向每一个RDD里的partition储存的位置。

Dstream.foreachRDD:

首先他操作的是Dstream, Dstream是一个由RDD组成的流,foreachRDD是一个输出的操作,它可以操作RDD,比如把RDD的数据写入的数据库

要是想要操作RDD里面的数据,就要用RDD.foreach

foreeachRDD操作的是Dstream,而foreach操作的是RDD,举个栗子,有一群人排队看中国打波兰的世界杯。你知道排队的都是要进去看比赛的,但是不知道到底有多少人,这样就要用foreachRDD。 而对于每一个人,要查身份证,安检一堆此操作,这就是对每个观众干的活,就是RDD里的elements。

ps:RDD.foreach()

这是一个挺奇怪的方法,他跟map的区别就是,map会返回操作的结果然后生成一个RDD,而foreach就只能操作每一个item,操作完了就没有然后了。

就像这样

def f(x):  print(x+1) 
sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
           

比如对于RDD.map()他是操作每一个element后记录返回的结果,而foreach则会直接丢弃返回的结果

但是。。我发现个问题

A = []
def f(x):  
    print(x+1)
    A.append(x) 
sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
           

这样A里却不会有任何结果

但是对于foreachRDD

min_of_elements  = []
average_per_RDD = []

def aaa(x):
    if x.isEmpty():
        print("empty")
        min_of_elements.append(0)
        average_per_RDD.append(0)
    else:
        min_of_elements.append(x.min())
        average_per_RDD.append(x.sum()/x.count())
           

这个就能把得到的结果存储来

对于一个RDD,他不一定会存在一个地方,可能一个partition在不同的位置,而操作RDD是并行的,所以每一个node都会去操作RDD的一部分,而那个A = 【】 每个node里是没有的,

Spark Streaming: 理解一下到底 foreachRDD,foreach 在干啥Dstream.foreachRDD:ps:RDD.foreach()

能看到其实每个node里是没有A的。

再看看print的效果

Spark Streaming: 理解一下到底 foreachRDD,foreach 在干啥Dstream.foreachRDD:ps:RDD.foreach()

打印时没有顺序的,因为他们时并行输出的。但是对于foreach,只能进行操作,比如把他们都存到mongoDB里,时刻都要记住,他只是在每个node里有你要操作的命令和一部分RDD。最重要的是他是并行的

 那为啥foreachRDD能行

foreachRDD操作的时Dstream。Dstream已经就是一个完整的流,如果一个node里处理这个dstream的,那在处理这个dstream的node里储存的变量就可以一直update。

图有点丑。。

Spark Streaming: 理解一下到底 foreachRDD,foreach 在干啥Dstream.foreachRDD:ps:RDD.foreach()

继续阅读