天天看點

Spark Broadcast内幕解密(簡約版)

一:Broadcast徹底解析

1,Broadcast就是将資料從一個節點發送到其他的節點上;例如Driver上有一張表,而Executor中的每個并行的Task(100萬個Task)都要查詢這張表,那我們通過Broadcast方式就隻需要往每個Executor把這張表發送一次就行了,Executor中的每個Task查詢這張唯一的表,而不是每次執行的時候都從Driver獲得這張表!

2,這就好像ServletContext的具體作用,隻是Broadcast是分布式的共享資料,預設情況下隻要程式在運作Broadcast變量就會存在,因為Broadcast子底層是通過BroadcastManager管理的!但是你可以手動指定或者配置具體周期來銷毀Broadcast變量!

3,Broadcast一般用于處理共享配置檔案、通用的DataSet、常用的資料結構等等;但是不适合存放太大的資料在Broadcast,Broadcast不會記憶體溢出,因為其資料的儲存的StoreLevel是MEMORY_AND_DISK的方式;雖然如此,我們也不可以放入太大的資料在Broadcast中,因為網絡IO和可能的單點壓力會非常大!

4,Broadcast變量是隻讀變量,最為輕松保持了資料的一緻性!

5,Broadcast的使用:

* {{{
* scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
* broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
*
* scala> broadcastVar.value
* res0: Array[Int] = Array(1, 2, 3)
* }}}      

6,HttpBroadcast方式的Broadcast,最開始的時候資料放在Driver的本地檔案系統中,Driver在本地會建立一個檔案夾來存放Broadcast中的data,然後啟動HttpServer來通路檔案夾中的資料,同時寫入到BlockManager(StorageLevel是MEMORY_AND_DISK)中獲得BlockId(BroadcastBlockId),當Executor中的Task要通路Broadcast變量的時候,會向Driver通過HttpServer來通路資料,然後會在Executor中的BlockManager中注冊該Broadcast中的資料,這樣後要的Task需要通路Broadcast的變量的時候會首先查詢BlockManager中有沒有該資料,如果有就直接使用;

7,BroadcastManager是用來管理Broadcast,該執行個體對象是在SparkContext建立SparkEnv的時候建立的:

// Called by SparkContext or Executor before using Broadcast
private def initialize() {
  synchronized {
    if (!initialized) {
      val broadcastFactoryClass =
        conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")

      broadcastFactory =
        Utils.classForName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]

      // Initialize appropriate BroadcastFactory and BroadcastObject
      broadcastFactory.initialize(isDriver, conf, securityManager)

      initialized = true
    }
  }
}      

在執行個體化BlockcastManager的時候會建立BlockcastFactory工廠來建構具體實際的Brockcast類型,預設情況下是TorrentBroadcastFactory;

繼續閱讀