天天看點

Spark Streaming: 了解一下到底 foreachRDD,foreach 在幹啥Dstream.foreachRDD:ps:RDD.foreach()

首先Spark算是一個分布式系統(分布式,分布式),對于每一個RDD,可以把他看成裡面儲存的是一堆指針,這些指針指向每一個RDD裡的partition儲存的位置。

Dstream.foreachRDD:

首先他操作的是Dstream, Dstream是一個由RDD組成的流,foreachRDD是一個輸出的操作,它可以操作RDD,比如把RDD的資料寫入的資料庫

要是想要操作RDD裡面的資料,就要用RDD.foreach

foreeachRDD操作的是Dstream,而foreach操作的是RDD,舉個栗子,有一群人排隊看中國打波蘭的世界杯。你知道排隊的都是要進去看比賽的,但是不知道到底有多少人,這樣就要用foreachRDD。 而對于每一個人,要查身份證,安檢一堆此操作,這就是對每個觀衆幹的活,就是RDD裡的elements。

ps:RDD.foreach()

這是一個挺奇怪的方法,他跟map的差別就是,map會傳回操作的結果然後生成一個RDD,而foreach就隻能操作每一個item,操作完了就沒有然後了。

就像這樣

def f(x):  print(x+1) 
sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
           

比如對于RDD.map()他是操作每一個element後記錄傳回的結果,而foreach則會直接丢棄傳回的結果

但是。。我發現個問題

A = []
def f(x):  
    print(x+1)
    A.append(x) 
sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
           

這樣A裡卻不會有任何結果

但是對于foreachRDD

min_of_elements  = []
average_per_RDD = []

def aaa(x):
    if x.isEmpty():
        print("empty")
        min_of_elements.append(0)
        average_per_RDD.append(0)
    else:
        min_of_elements.append(x.min())
        average_per_RDD.append(x.sum()/x.count())
           

這個就能把得到的結果存儲來

對于一個RDD,他不一定會存在一個地方,可能一個partition在不同的位置,而操作RDD是并行的,是以每一個node都會去操作RDD的一部分,而那個A = 【】 每個node裡是沒有的,

Spark Streaming: 了解一下到底 foreachRDD,foreach 在幹啥Dstream.foreachRDD:ps:RDD.foreach()

能看到其實每個node裡是沒有A的。

再看看print的效果

Spark Streaming: 了解一下到底 foreachRDD,foreach 在幹啥Dstream.foreachRDD:ps:RDD.foreach()

列印時沒有順序的,因為他們時并行輸出的。但是對于foreach,隻能進行操作,比如把他們都存到mongoDB裡,時刻都要記住,他隻是在每個node裡有你要操作的指令和一部分RDD。最重要的是他是并行的

 那為啥foreachRDD能行

foreachRDD操作的時Dstream。Dstream已經就是一個完整的流,如果一個node裡處理這個dstream的,那在處理這個dstream的node裡儲存的變量就可以一直update。

圖有點醜。。

Spark Streaming: 了解一下到底 foreachRDD,foreach 在幹啥Dstream.foreachRDD:ps:RDD.foreach()

繼續閱讀