使用者session随機抽取map示例
使用者session分析,按時間比例随機抽取
task執行過程中,如果使用了外部的局部變量,預設情況下,每個task都會擷取一份變量的副本,有什麼缺點?
首先 map本身是不小,存放資料的一個機關是Entry,還有可能會用連結清單的格式來存放Entry 鍊條。是以map是比較消耗記憶體的資料格式。
比如,map是1M。前面調優都很好的情況下,資源給的到位,配合資源并行度到位,1000個task,大量task都的确在并行運作。
- 這些task裡面都用到了占用1M記憶體的map,那麼 map會拷貝1000份副本,通過網絡傳輸到各個task中去,給task使用。總計有1G資料,會通過網絡傳輸,網絡傳輸開銷,會消耗掉Spark作業運作的總時間一小部分。
-
其次 map副本,傳輸到了各個task之後,是要占用記憶體的。1個map的确不大,1M,1000個map分布在叢集中,一下子耗費掉1G的記憶體。
不必要的記憶體消耗和占用,就導緻了再進行RDD持久化到記憶體,也許就沒法完全在記憶體中放下;就隻能持久化到磁盤,就會導緻後續操作在磁盤IO上消耗性能;
task在建立對象的時候,也許會出現堆記憶體放不下所有對象,也許會導緻頻繁垃圾回收期的回收,GC。GC的時候,一定會導緻工作線程停止,也就是導緻Spark暫停工作那麼一點時間,頻繁GC 會對Spark作業速度有很大影響。
比如随機抽取map ,1M,這種比較小。如果說從某些表讀取一些次元資料,比如說,所有商品品類資訊,在某個算子函數中要用到,大小為100M
100個task, 100G資料,網絡傳輸,叢集瞬間會因為這個消耗掉100G記憶體。
廣播大變量
如果說,task使用大變量(1m~100m),明知道會導緻性能出現惡劣的影響。那麼我們怎麼來解決呢?
廣播,Broadcast,将大變量廣播出去。而不是直接使用。
變量定義
//穿進來 SparkContext
private static void randomExtractSession(
JavaSparkContext sc,
final long taskid,
JavaPairRDD<String,String> filteredSessionid2AggrInfoRDD,
JavaPairRDD<String,Row> sessionid2detailRDD) {
/**
* 廣播變量, SparkContext的 broadcast()方法
*/
final Broadcast<Map<String, Map<String, IntList>>> dateHourExtractMapBroadcast = sc.broadcast(fastutilDateHourExtractMap);
``
變量使用
/**
* 使用廣播變量直接調用Broadcast類型的value ()/getValue()
*/
Map<String, Map<String, IntList>> dateHourExtractMap = dateHourExtractMapBroadcast.value();
For example
50個executor, 1000task。一個map,10M。
那麼按照預設模式,1000個task,1000份副本,1000*10M=10G記憶體,網絡傳輸,耗費10G記憶體資源
那麼如果使用了廣播變量呢? 50個executor, 50個副本,500M的資料,網絡傳輸,而且不一定都是從Driver傳輸到每個節點,還可能是就近從最近節點的executor 的blockManager 上拉取變量副本,網絡傳輸速度大大增加,500M的記憶體消耗。
10000M,500M,20倍。20倍~以上的網絡傳輸性能消耗的降低;20倍的記憶體消耗的減少。
雖然說,不一定會對性能産生決定性的作用。比如運作30分鐘的spark作業,可能做了廣播變量以後,速度快了2分鐘,或者5分鐘。但是一點一滴的調優,積少成多。最後還是會有效果的。
沒有經過任何調優手段的spark作業,16個小時;三闆斧下來,就可以到5個小時;然後非常重要的一個調優,影響特别大,shuffle調優,2~3個小時;應用了10個以上的性能調優的技術點,JVM+廣播,30分鐘。16小時 ~30分鐘。