天天看點

Spark變量的了解1. driver變量副本:2. 累加器 3 廣播變量4.總結

Table of Contents

1. driver變量副本:

2. 累加器

3 廣播變量

為什麼隻能 broadcast 隻讀的變量?

broadcast 到節點而不是 broadcast 到每個 task?

為什麼要使用廣播變量?

4.總結

1. driver變量副本:

可以簡單看做main函數中定義的變量,一般而言,當我們給Spark算子(如 map 或 reduce)傳遞一個函數時,這些函數将會在遠端的叢集節點上運作,并且這些函數所引用的變量都是各個節點上的獨立副本。這些變量都會以副本的形式複制到各個機器

節點上,如果更新這些變量副本的話,這些更新并不會傳回到驅動器(driver)程式。通常來說,支援跨任務的可讀寫共享變量是比較低效的。

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.datanucleus.store.types.backed.ArrayList;

import java.util.Arrays;
import java.util.List;

class Count implements Function<Integer, Integer> {
    int count;   
    public Count(){
        count = 0;
    }
    @Override
    public Integer call(Integer o) throws Exception {
        count = count+1;
        System.out.println(o+":count:"+count);
        return o+1;  //這裡不能自加
    }
}
public class RddAccmulator {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("RddAccmulator");
        JavaSparkContext sc = new JavaSparkContext(conf);
        Count count = new Count();  //這裡屬于驅動器的變量,在每個task都會有一個副本
        JavaRDD<Integer> list = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10)).repartition(2);
        list.map(count).collect().forEach(x-> System.out.println(x));
        System.out.println("driver count:"+count.count);

    }
}
           

可以看到 ,我們在代碼中把資料reparation在了兩個分區,可以從下面的輸出,兩個分區各有一個Count變量副本,而driver端也有自己的Count,更改這些變量的值并不會影響到driver的對應變量。至于下面那個歌為什麼不是5而是6呢?可以這麼猜測,分區1的資料有2,4,5,6,8,10,其他資料在分區2,因為我們分區的時候沒有指定哪些資料在哪個分區。我們可以寫個程式來看資料的分布。

2:count:1
1:count:1
4:count:2
3:count:2
5:count:3
7:count:3
6:count:4
9:count:4
8:count:5
10:count:6

drivercount:0
           

插入以下代碼,驗證我們的猜想,結果證明事實就是那樣

JavaRDD<String> rddRes = list.mapPartitionsWithIndex((x,it)->{
            List<String> sumList = new ArrayList<String>();
            StringBuffer num  = new StringBuffer();
            while(it.hasNext()){
                num = num.append(it.next()+",");
            }
            sumList.add(x+"|"+num);
            return sumList.iterator();
        },false);
        System.out.println(rddRes.count());
        System.out.println(rddRes.collect());

輸出結果:
2
[0|1,3,5,6,8,10,, 1|2,4,7,9,]
           

2. 累加器

1)累加器是一種隻支援滿足結合律的“累加”操作的變量,是以它可以很高效地支援并行計算。利用累加器可以

實作計數(類似MapReduce中的計數器)或者求和。

2)Spark原生支援了數字類型的累加器,開發者也可以自)定義新的累加器。如果建立累加器的時候給了一個名字,那麼這個名字會展示在Spark UI上,這對于了解程式運作處于哪個階段非常有幫助(注意:Python尚不支援該功能)。

3)創捷累加器時需要賦一個初始值v,調用 SparkContext.accumulator(v) 可以建立一個累加器。後續叢集中運作

的任務可以使用 add 方法 或者 += 操作符 (僅Scala和Python支援)來進行累加操作。不過,任務本身并不)能讀取累加器的值,隻有驅動器程式可以用 value 方法通路累加器的值

4)這是有累加器的實作原理決定的:

Driver端 

   Driver端初始化建構Accumulator并初始化 

   同時完成了Accumulator注冊:Accumulators.register(this) 

   同時Accumulator會在序列化後發送到Executor端

   Driver接收到ResultTask完成的狀态更新後,會去更新Value的值 

   然後在Action操作執行後就可以擷取到Accumulator的值了

Executor端 

   Executor端接收到Task之後會進行反序列化操作,反序列化得到RDD和function 

   同時在反序列化的同時也去反序列化Accumulator(在readObject方法中完成) 

   同時也會向TaskContext完成注冊

   完成任務計算之後,随着Task結果一起傳回給Driver

import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;


import java.util.Arrays;

public class RddAccmulater1 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("RddAccmulator");
        JavaSparkContext sc = new JavaSparkContext(conf);

        final Accumulator<Integer> count1 = sc.accumulator(0);

        final Accumulator<Integer> count2 = sc.accumulator(0);

        JavaRDD<Integer> list = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10)).repartition(2);
        list.foreach(x->{count1.add(1);});
        System.out.println(list.collect());
        System.out.println(count1.value());  //這個得在action操作之後,否則lazy求值未action之前transf操作都不執行
///
        JavaRDD<Integer> rdd = list.map(x->{
            count2.add(1);
            return x+1;
        });
        System.out.println(rdd.count());
        System.out.println(count2.value());
        System.out.println(rdd.reduce((x,y)->x+y));
        System.out.println(count2.value());
    }
}
           

上面count1輸出正常:10

但是count2第一次輸出10,第二次輸出20,這是為什麼?

因為在沒有對中間結果緩存的情況下,每一次的計算都是從最原始的rdd開始算起,transform中的定義的變量都是副本,都隻在這個生命周期存在,而累加變量是在整個driver的生命周期,是以count2被累加了兩次。要避免這樣的結果隻需要對第一次的rdd的結果做一次緩存就可以,這樣下一次就會從這個緩存開始計算而不是從頭開始計算。 

3 廣播變量

廣播變量提供了一種隻讀的共享變量,它是在每個機器節點上儲存一個緩存,而不是每個任務儲存一份副

本。通常可以用來在每個節點上儲存一個較大的輸入資料集,這要比正常的變量副本更高效(一般的變量

是每個任務一個副本,一個節點上可能有多個任務)。Spark還會嘗試使用高效的廣播算法來分發廣播變

量,以減少通信開銷。

為什麼隻能 broadcast 隻讀的變量?

這就涉及一緻性的問題,如果變量可以被更新,那麼一旦變量被某個節點更新,其他節點要不要一塊更新?如果多個節點同時在更新,更新順序是什麼?怎麼做同步?還會涉及 fault-tolerance 的問題。為了避免維護資料一緻性問題,Spark 目前隻支援 broadcast 隻讀變量。

broadcast 到節點而不是 broadcast 到每個 task?

因為每個 task 是一個線程,而且同在一個程序運作 tasks 都屬于同一個 application。是以每個節點(executor)上放一份就可以被所有 task 共享。

為什麼要使用廣播變量?

主要用于節約記憶體的開銷。

Driver每次分發任務的時候會把task和計算邏輯的變量發送給Executor,不是使用廣播變量會有多份的變量副本。這樣會導緻消耗大量的記憶體導緻嚴重的後果。

  • 不使用廣播變量:
Spark變量的了解1. driver變量副本:2. 累加器 3 廣播變量4.總結

不使用廣播變量

  • 使用廣播變量:
Spark變量的了解1. driver變量副本:2. 累加器 3 廣播變量4.總結

使用廣播變量 

下面給出一個例子,給出一個List,去掉List中和廣播變量中相同的值

public class RddBroadcast {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("RddBroadcast");
        JavaSparkContext sc = new JavaSparkContext(conf);

        //String[] jarPath = new String[]{"/home/linjiaqin/IdeaProjects/JavaSparkTest/out/artifacts/wordcount/wordcount.jar"};
        //conf.setJars(jarPath);

        String hdfs = "hdfs://localhost:9000";
        JavaRDD<Integer> list = sc.parallelize(Arrays.asList(1,2,3,4,5,6,1,2,3,7,8,9,10)).repartition(3);
        final Broadcast<List> bC = sc.broadcast(Arrays.asList(1,2,3));   
        JavaRDD<Integer> rdd = list.filter(x->{
            return !bC.value().contains(x);
        });
        System.out.println(rdd.collect());

    }
}
           

4.總結

1)driver中定義的一般變量或者action操作裡的function的變量,每個task上都會有其副本,所需要的記憶體最大,更新這些副本的值也不會影響到driver中對應變量

2)廣播變量是每個節點一個變量副本(一個節點上有多個并行的task),是隻讀變量,因為更新當個節點,其它節點沒法更新

3)累加器,提供了将各個task中的值聚合到驅動程式的文法,是隻寫變量,節點上的任務不能讀取累加器的值,因為這個值時時變化,隻有最後driver拿到的是有意義的

繼續閱讀