在spark任務的算子引用了外部變量,那麼spark預設會将被引用的變量拷貝副本發送到stage中的所有task。
如果我們想一種場景,被引用的變量比較大,比如100M。假設spark任務在stage階段使用了10個Executor,每個Executor有10個task,則spark會将變量拷貝100份總計10G。spark為了此變量就要消耗10G的網絡資源和存儲。對于一個spark叢集而言這個很糟糕的的任務。
那有沒有解決方式?其實spark提供了廣播變量。
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHL9MWbiZHbzIGaSJjYxA3MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL3ADN0QzNwMTMzIzNwkTMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
任務運作的時候第一個task會去driver端拉取一份變量資料到自己所在的Executor的BlockManager,
以後其他Executor的task會去存在此變量的BlockManager上拉取或者從Driver上拉取。
在同一個Executor的所有task共享BlockManager資訊
我們使用了廣播變量後情況如何?
10個Executor,每個Executor有10個task 此時,變量隻拷貝10份即1G,相比之前的10G而言降低了不必要的資源消耗
使用方式:
(1)scala版:
object StandAlone {
def main(args: Array[String]): Unit = {
val sparkConf=new SparkConf().setMaster("local").setAppName(StandAlone.getClass.getSimpleName)
val sparkContext=new SparkContext(sparkConf)
val broadcast=sparkContext.broadcast(List("a","b"))
val rdd=sparkContext.parallelize(List("hello java","hello scala","hello pythton"))
rdd.flatMap(line=>line.split(" ")).map(w=>(w,1)).foreach(e=>{
println(broadcast.value)
})
}
}
(2) java版
public class BroadCastDemo {
public static void main(String[] args) {
SparkConf sparkConf=new SparkConf();
sparkConf.setMaster("local[*]").setAppName(BroadCastDemo.class.getSimpleName());
JavaSparkContext sc=new JavaSparkContext(sparkConf);
List<Integer> list= Arrays.asList(1,2,3);
Broadcast<List<Integer>> broadcast = sc.broadcast(list);
sc.parallelize(Arrays.asList("hello java","hello scala","hello pythton"))
.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
@Override
public Iterator<String> call(Iterator<String> iterator) throws Exception {
List<String> result=new ArrayList<>();
while (iterator.hasNext()){
String line = iterator.next();
String[] s = line.split(" ");
for (int i = 0; i <s.length ; i++) {
result.add(s[i]);
}
}
return result.iterator();
}
}).foreachPartition(e->{
System.out.println(broadcast.getValue());
});
}
}