天天看點

Spark中廣播變量詳解

【前言:Spark目前提供了兩種有限定類型的共享變量:廣播變量和累加器,今天主要介紹一下基于Spark2.4版本的廣播變量。先前的版本比如Spark2.1之前的廣播變量有兩種實作:HttpBroadcast和TorrentBroadcast,但是鑒于HttpBroadcast有各種弊端,目前已經舍棄這種實作,本篇文章也主要闡述TorrentBroadcast】

廣播變量概述

廣播變量是一個隻讀變量,通過它我們可以将一些共享資料集或者大變量緩存在Spark叢集中的各個機器上而不用每個task都需要copy一個副本,後續計算可以重複使用,減少了資料傳輸時網絡帶寬的使用,提高效率。相比于Hadoop的分布式緩存,廣播的内容可以跨作業共享。

廣播變量要求廣播的資料不可變、不能太大但也不能太小(一般幾十M以上)、可被序列化和反序列化、并且必須在driver端聲明廣播變量,适用于廣播多個stage公用的資料,存儲級别目前是MEMORY_AND_DISK。

廣播變量存儲目前基于Spark實作的BlockManager分布式存儲系統,Spark中的shuffle資料、加載HDFS資料時切分過來的block塊都存儲在BlockManager中,不是今天的讨論點,這裡先不做詳述了。

廣播變量的建立方式和擷取

//建立廣播變量

val broadcastVar = sparkSession.sparkContext.broadcast(Array(1, 2, 3))

//擷取廣播變量
broadcastVar.value           

廣播變量執行個體化過程

1.首先調用val broadcastVar = sparkSession.sparkContext.broadcast(Array(1, 2, 3))

2.調用BroadcastManager的newBroadcast方法

val bc = env.broadcastManager.newBroadcast[T](value, isLocal)           

3.通過廣播工廠的newBroadcast方法進行建立

broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())           

在調用BroadcastManager的newBroadcast方法時已完成對廣播工廠的初始化(initialize方法),我們隻需看BroadcastFactory的實作TorrentBroadcastFactory中對TorrentBroadcast的執行個體化過程:

new TorrentBroadcast[T](value_, id)           

4.在建構TorrentBroadcast時,将廣播的資料寫入BlockManager

1)首先會将廣播變量序列化後的對象劃分為多個block塊,存儲在driver端的BlockManager,這樣運作在driver端的task就不用建立廣播變量的副本了(具體可以檢視TorrentBroadcast的writeBlocks方法)

2)每個executor在擷取廣播變量時首先從本地的BlockManager擷取。擷取不到就會從driver或者其他的executor上擷取,擷取之後,會将擷取到的資料儲存在自己的BlockManager中

3)塊的大小預設4M

conf.getSizeAsKb("spark.broadcast.blockSize", "4m").toInt * 1024           

廣播變量初始化過程

1.首先調用broadcastVar.value

2.TorrentBroadcast中lazy變量_value進行初始化,調用readBroadcastBlock()

3.先從緩存中讀取,對結果進行模式比對,比對成功的直接傳回

4.讀取不到通過readBlocks()進行讀取

從driver端或者其他的executor中讀取,将讀取的對象存儲到本地,并存于緩存中

new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)           

Spark兩種廣播變量對比

正如【前言】中所說,HttpBroadcast在Spark後續的版本中已經被廢棄,但考慮到部分公司用的Spark版本較低,面試中仍有可能問到兩種實作的相關問題,這裡簡單介紹一下:

HttpBroadcast會在driver端的BlockManager裡面存儲廣播變量對象,并且将該廣播變量序列化寫入檔案中去。所有擷取廣播資料請求都在driver端,是以存在單點故障和網絡IO性能問題。

TorrentBroadcast會在driver端的BlockManager裡面存儲廣播變量對象,并将廣播對象分割成若幹序列化block塊(預設4M),存儲于BlockManager。小的block存儲位置資訊,存儲于Driver端的BlockManagerMaster。資料請求并非集中于driver端,避免了單點故障和driver端網絡磁盤IO過高。

TorrentBroadcast在executor端存儲一個對象的同時會将擷取的block存儲于BlockManager,并向driver端的BlockManager彙報block的存儲資訊。

請求資料的時候會先擷取block的所有存儲位置資訊,并且是随機的在所有存儲了該executor的BlockManager去擷取,避免了資料請求服務集中于一點。

總之就是HttpBroadcast導緻擷取廣播變量的請求集中于driver端,容易引起driver端單點故障,網絡IO過高影響性能等問題,而TorrentBroadcast擷取廣播變量的請求服務即可以請求到driver端也可以在executor,避免了上述問題,當然這隻是主要的優化點。

繼續閱讀