一:Broadcast徹底解析
1,Broadcast就是将資料從一個節點發送到其他的節點上;例如Driver上有一張表,而Executor中的每個并行的Task(100萬個Task)都要查詢這張表,那我們通過Broadcast方式就隻需要往每個Executor把這張表發送一次就行了,Executor中的每個Task查詢這張唯一的表,而不是每次執行的時候都從Driver獲得這張表!
2,這就好像ServletContext的具體作用,隻是Broadcast是分布式的共享資料,預設情況下隻要程式在運作Broadcast變量就會存在,因為Broadcast子底層是通過BroadcastManager管理的!但是你可以手動指定或者配置具體周期來銷毀Broadcast變量!
3,Broadcast一般用于處理共享配置檔案、通用的DataSet、常用的資料結構等等;但是不适合存放太大的資料在Broadcast,Broadcast不會記憶體溢出,因為其資料的儲存的StoreLevel是MEMORY_AND_DISK的方式;雖然如此,我們也不可以放入太大的資料在Broadcast中,因為網絡IO和可能的單點壓力會非常大!
4,Broadcast變量是隻讀變量,最為輕松保持了資料的一緻性!
5,Broadcast的使用:
* {{{
* scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
* broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
*
* scala> broadcastVar.value
* res0: Array[Int] = Array(1, 2, 3)
* }}}
6,HttpBroadcast方式的Broadcast,最開始的時候資料放在Driver的本地檔案系統中,Driver在本地會建立一個檔案夾來存放Broadcast中的data,然後啟動HttpServer來通路檔案夾中的資料,同時寫入到BlockManager(StorageLevel是MEMORY_AND_DISK)中獲得BlockId(BroadcastBlockId),當Executor中的Task要通路Broadcast變量的時候,會向Driver通過HttpServer來通路資料,然後會在Executor中的BlockManager中注冊該Broadcast中的資料,這樣後要的Task需要通路Broadcast的變量的時候會首先查詢BlockManager中有沒有該資料,如果有就直接使用;
7,BroadcastManager是用來管理Broadcast,該執行個體對象是在SparkContext建立SparkEnv的時候建立的:
// Called by SparkContext or Executor before using Broadcast
private def initialize() {
synchronized {
if (!initialized) {
val broadcastFactoryClass =
conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")
broadcastFactory =
Utils.classForName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
// Initialize appropriate BroadcastFactory and BroadcastObject
broadcastFactory.initialize(isDriver, conf, securityManager)
initialized = true
}
}
}
在執行個體化BlockcastManager的時候會建立BlockcastFactory工廠來建構具體實際的Brockcast類型,預設情況下是TorrentBroadcastFactory;