天天看点

spark调优之广播大变量

在spark任务的算子引用了外部变量,那么spark默认会将被引用的变量拷贝副本发送到stage中的所有task。

如果我们想一种场景,被引用的变量比较大,比如100M。假设spark任务在stage阶段使用了10个Executor,每个Executor有10个task,则spark会将变量拷贝100份总计10G。spark为了此变量就要消耗10G的网络资源和存储。对于一个spark集群而言这个很糟糕的的任务。

那有没有解决方式?其实spark提供了广播变量。

spark调优之广播大变量

任务运行的时候第一个task会去driver端拉取一份变量数据到自己所在的Executor的BlockManager,

以后其他Executor的task会去存在此变量的BlockManager上拉取或者从Driver上拉取。

在同一个Executor的所有task共享BlockManager信息

我们使用了广播变量后情况如何?

10个Executor,每个Executor有10个task 此时,变量只拷贝10份即1G,相比之前的10G而言降低了不必要的资源消耗

使用方式:

(1)scala版:

object StandAlone {

  def main(args: Array[String]): Unit = {
     val sparkConf=new SparkConf().setMaster("local").setAppName(StandAlone.getClass.getSimpleName)
     val sparkContext=new SparkContext(sparkConf)
     val broadcast=sparkContext.broadcast(List("a","b"))
     val rdd=sparkContext.parallelize(List("hello java","hello scala","hello pythton"))
     rdd.flatMap(line=>line.split(" ")).map(w=>(w,1)).foreach(e=>{
        println(broadcast.value)
     })
  }
}      

(2) java版

public class BroadCastDemo {
    public static void main(String[] args) {
        SparkConf sparkConf=new SparkConf();
        sparkConf.setMaster("local[*]").setAppName(BroadCastDemo.class.getSimpleName());
        JavaSparkContext sc=new JavaSparkContext(sparkConf);
        List<Integer> list= Arrays.asList(1,2,3);
        Broadcast<List<Integer>> broadcast = sc.broadcast(list);
        sc.parallelize(Arrays.asList("hello java","hello scala","hello pythton"))
                .mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
                    @Override
                    public Iterator<String> call(Iterator<String> iterator) throws Exception {
                        List<String> result=new ArrayList<>();
                        while (iterator.hasNext()){
                            String line = iterator.next();
                            String[] s = line.split(" ");
                            for (int i = 0; i <s.length ; i++) {
                                result.add(s[i]);
                            }
                        }
                        return result.iterator();
                    }
                }).foreachPartition(e->{
            System.out.println(broadcast.getValue());
        });
    }
}      

继续阅读