天天看點

《Spark大資料分析:核心概念、技術及實踐》一3.9 共享變量

 本節書摘來自華章出版社《spark大資料分析:核心概念、技術及實踐》一書中的第3章,第3.9節,作者[美] 穆罕默德·古勒(mohammed guller),更多章節内容可以通路雲栖社群“華章計算機”公衆号檢視。

3.9 共享變量

spark使用的架構是無共享的。資料分布在叢集的各個節點上,每個節點都有自己的cpu、記憶體和存儲資源。沒有全局的記憶體空間用于任務間共享。驅動程式和任務之間通過消息共享資料。

舉例來說,如果一個rdd操作的函數參數是驅動程式中變量的引用,spark會将這個變量的副本以及任務一起發送給執行者。每個任務都有一份變量的副本并把它當成隻讀變量使用。任何對這個變量的更新都隻存在任務的内部,改動并不會回傳給驅動程式。而且spark會把這個變量在每一個階段的開始發送給worker節點。

對于一些應用而言,這種預設行為是低效的。在一個實際的使用場景中,驅動程式在作業的任務間共享了一個巨大的查找表。而這個作業由多個階段構成。預設情況下,spark會自動将這個變量及其相關任務發送給每個執行者。然而,spark會在每個階段做這件事。如果這個查找表存儲了100mb的資料,并且這個作業涉及10個階段,那麼spark就會給每個worker節點發送10次100mb的相同資料。

另外一個使用場景是在每個運作在不同節點上的任務中需要更新全局變量。預設情況下,任務中對變量的更新是不會回傳給驅動程式的。

spark通過共享變量的概念來滿足這些使用場景的需求。

3.9.1 廣播變量

廣播變量的使用使得spark應用可以有效地在驅動程式和執行作業的任務之間共享資料。spark隻會給worker節點發送一次廣播變量,并且将它反序列化成隻讀變量存儲在執行者的記憶體中。而且,spark采用一種更高效的算法來釋出廣播變量。

注意,如果一個作業由多個階段構成,且階段中的任務使用同一個驅動程式的變量,那麼使用廣播變量是十分有用的。如果你不想在開始執行每個任務之前反序列化變量,使用廣播變量也是有益的。預設情況下,spark會将傳輸過來的變量以序列化的形式緩存在執行者的記憶體中,在開始執行任務之前再反序列化它。

sparkcontext 類提供了一個叫作broadcast的方法用于建立廣播變量。它把一個待廣播的變量作為參數,傳回一個broadcast類執行個體。一個任務必須使用broadcast對象的value方法才可以擷取廣播變量的值。

考慮這樣一個應用,它根據電商交易資訊生成交易詳情。在現實世界的應用中會有一張顧客表、一張商品表和一張交易表。為了簡化起見,我們直接用一些簡單的資料結構來代替這些表作為輸入資料。

使用廣播變量使得我們可以高效地實作顧客資料、商品資料和交易資料之間的連接配接。我們可以通過使用rdd api來實作連接配接操作,但是這會在網絡間對顧客資料、商品資料和交易資料做shuffle操作。使用廣播變量,我們使得spark隻将顧客資料和商品資料發送給每個節點一次,并且用簡單的map操作來代替耗時的join操作。

3.9.2 累加器

累加器是隻增變量,它可以被運作在不同節點上的任務更改并且被驅動程式讀取。它可以用于計數器和聚合操作。spark提供了數值類型的累加器,也支援建立自定義類型的累加器。

sparkcontext類提供了一個叫作accumulator的方法用于建立累加器變量。它有兩個參數。第一個參數是累加器的初值,第二個是在spark ui中顯示的名字,這是一個可選參數。它傳回一個accumulator類執行個體。這個類執行個體為操作累加器變量提供操作符。任務隻能采用add方法或者+=操作符來增加累加器變量的值。隻有驅動程式可以通過value方法來擷取累加器的值。

考慮這樣一個應用,它需要從顧客表中過濾出不合法的顧客并計數。在現實世界的應用中,我們會從硬碟中讀取資料并将過濾後的資料寫入到硬碟中的另外一個檔案。為簡化起見,我們跳過讀寫硬碟的部分。

在使用累加器的時候需要注意,轉換操作期間對累加器的更新無法保證恰好隻有一次。如果一個任務或一個階段重複執行,每一個任務的更新操作就會多次執行。

而且,對累加器的更新操作并不是在rdd的操作方法被調用時才執行的。rdd的轉換操作是惰性的,轉換操作中對累加器的更新并不會立即執行。是以,如果驅動程式在操作方法被調用之前就使用累加器的值,那麼它将得到一個錯誤的值。

繼續閱讀