在spark任务的算子引用了外部变量,那么spark默认会将被引用的变量拷贝副本发送到stage中的所有task。
如果我们想一种场景,被引用的变量比较大,比如100M。假设spark任务在stage阶段使用了10个Executor,每个Executor有10个task,则spark会将变量拷贝100份总计10G。spark为了此变量就要消耗10G的网络资源和存储。对于一个spark集群而言这个很糟糕的的任务。
那有没有解决方式?其实spark提供了广播变量。
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHL9MWbiZHbzIGaSJjYxA3MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL3ADN0QzNwMTMzIzNwkTMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
任务运行的时候第一个task会去driver端拉取一份变量数据到自己所在的Executor的BlockManager,
以后其他Executor的task会去存在此变量的BlockManager上拉取或者从Driver上拉取。
在同一个Executor的所有task共享BlockManager信息
我们使用了广播变量后情况如何?
10个Executor,每个Executor有10个task 此时,变量只拷贝10份即1G,相比之前的10G而言降低了不必要的资源消耗
使用方式:
(1)scala版:
object StandAlone {
def main(args: Array[String]): Unit = {
val sparkConf=new SparkConf().setMaster("local").setAppName(StandAlone.getClass.getSimpleName)
val sparkContext=new SparkContext(sparkConf)
val broadcast=sparkContext.broadcast(List("a","b"))
val rdd=sparkContext.parallelize(List("hello java","hello scala","hello pythton"))
rdd.flatMap(line=>line.split(" ")).map(w=>(w,1)).foreach(e=>{
println(broadcast.value)
})
}
}
(2) java版
public class BroadCastDemo {
public static void main(String[] args) {
SparkConf sparkConf=new SparkConf();
sparkConf.setMaster("local[*]").setAppName(BroadCastDemo.class.getSimpleName());
JavaSparkContext sc=new JavaSparkContext(sparkConf);
List<Integer> list= Arrays.asList(1,2,3);
Broadcast<List<Integer>> broadcast = sc.broadcast(list);
sc.parallelize(Arrays.asList("hello java","hello scala","hello pythton"))
.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
@Override
public Iterator<String> call(Iterator<String> iterator) throws Exception {
List<String> result=new ArrayList<>();
while (iterator.hasNext()){
String line = iterator.next();
String[] s = line.split(" ");
for (int i = 0; i <s.length ; i++) {
result.add(s[i]);
}
}
return result.iterator();
}
}).foreachPartition(e->{
System.out.println(broadcast.getValue());
});
}
}