首先Spark算是一個分布式系統(分布式,分布式),對于每一個RDD,可以把他看成裡面儲存的是一堆指針,這些指針指向每一個RDD裡的partition儲存的位置。
Dstream.foreachRDD:
首先他操作的是Dstream, Dstream是一個由RDD組成的流,foreachRDD是一個輸出的操作,它可以操作RDD,比如把RDD的資料寫入的資料庫
要是想要操作RDD裡面的資料,就要用RDD.foreach
foreeachRDD操作的是Dstream,而foreach操作的是RDD,舉個栗子,有一群人排隊看中國打波蘭的世界杯。你知道排隊的都是要進去看比賽的,但是不知道到底有多少人,這樣就要用foreachRDD。 而對于每一個人,要查身份證,安檢一堆此操作,這就是對每個觀衆幹的活,就是RDD裡的elements。
ps:RDD.foreach()
這是一個挺奇怪的方法,他跟map的差別就是,map會傳回操作的結果然後生成一個RDD,而foreach就隻能操作每一個item,操作完了就沒有然後了。
就像這樣
def f(x): print(x+1)
sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
比如對于RDD.map()他是操作每一個element後記錄傳回的結果,而foreach則會直接丢棄傳回的結果
但是。。我發現個問題
A = []
def f(x):
print(x+1)
A.append(x)
sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
這樣A裡卻不會有任何結果
但是對于foreachRDD
min_of_elements = []
average_per_RDD = []
def aaa(x):
if x.isEmpty():
print("empty")
min_of_elements.append(0)
average_per_RDD.append(0)
else:
min_of_elements.append(x.min())
average_per_RDD.append(x.sum()/x.count())
這個就能把得到的結果存儲來
對于一個RDD,他不一定會存在一個地方,可能一個partition在不同的位置,而操作RDD是并行的,是以每一個node都會去操作RDD的一部分,而那個A = 【】 每個node裡是沒有的,
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHL2Z1RiNTV61EeNRVT0UkMMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL0YDNxMTN1MDMxITOwkTMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
能看到其實每個node裡是沒有A的。
再看看print的效果
列印時沒有順序的,因為他們時并行輸出的。但是對于foreach,隻能進行操作,比如把他們都存到mongoDB裡,時刻都要記住,他隻是在每個node裡有你要操作的指令和一部分RDD。最重要的是他是并行的
那為啥foreachRDD能行
foreachRDD操作的時Dstream。Dstream已經就是一個完整的流,如果一個node裡處理這個dstream的,那在處理這個dstream的node裡儲存的變量就可以一直update。
圖有點醜。。