天天看點

spark調優之廣播大變量

在spark任務的算子引用了外部變量,那麼spark預設會将被引用的變量拷貝副本發送到stage中的所有task。

如果我們想一種場景,被引用的變量比較大,比如100M。假設spark任務在stage階段使用了10個Executor,每個Executor有10個task,則spark會将變量拷貝100份總計10G。spark為了此變量就要消耗10G的網絡資源和存儲。對于一個spark叢集而言這個很糟糕的的任務。

那有沒有解決方式?其實spark提供了廣播變量。

spark調優之廣播大變量

任務運作的時候第一個task會去driver端拉取一份變量資料到自己所在的Executor的BlockManager,

以後其他Executor的task會去存在此變量的BlockManager上拉取或者從Driver上拉取。

在同一個Executor的所有task共享BlockManager資訊

我們使用了廣播變量後情況如何?

10個Executor,每個Executor有10個task 此時,變量隻拷貝10份即1G,相比之前的10G而言降低了不必要的資源消耗

使用方式:

(1)scala版:

object StandAlone {

  def main(args: Array[String]): Unit = {
     val sparkConf=new SparkConf().setMaster("local").setAppName(StandAlone.getClass.getSimpleName)
     val sparkContext=new SparkContext(sparkConf)
     val broadcast=sparkContext.broadcast(List("a","b"))
     val rdd=sparkContext.parallelize(List("hello java","hello scala","hello pythton"))
     rdd.flatMap(line=>line.split(" ")).map(w=>(w,1)).foreach(e=>{
        println(broadcast.value)
     })
  }
}      

(2) java版

public class BroadCastDemo {
    public static void main(String[] args) {
        SparkConf sparkConf=new SparkConf();
        sparkConf.setMaster("local[*]").setAppName(BroadCastDemo.class.getSimpleName());
        JavaSparkContext sc=new JavaSparkContext(sparkConf);
        List<Integer> list= Arrays.asList(1,2,3);
        Broadcast<List<Integer>> broadcast = sc.broadcast(list);
        sc.parallelize(Arrays.asList("hello java","hello scala","hello pythton"))
                .mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
                    @Override
                    public Iterator<String> call(Iterator<String> iterator) throws Exception {
                        List<String> result=new ArrayList<>();
                        while (iterator.hasNext()){
                            String line = iterator.next();
                            String[] s = line.split(" ");
                            for (int i = 0; i <s.length ; i++) {
                                result.add(s[i]);
                            }
                        }
                        return result.iterator();
                    }
                }).foreachPartition(e->{
            System.out.println(broadcast.getValue());
        });
    }
}      

繼續閱讀