天天看点

Spark变量的理解1. driver变量副本:2. 累加器 3 广播变量4.总结

Table of Contents

1. driver变量副本:

2. 累加器

3 广播变量

为什么只能 broadcast 只读的变量?

broadcast 到节点而不是 broadcast 到每个 task?

为什么要使用广播变量?

4.总结

1. driver变量副本:

可以简单看做main函数中定义的变量,一般而言,当我们给Spark算子(如 map 或 reduce)传递一个函数时,这些函数将会在远程的集群节点上运行,并且这些函数所引用的变量都是各个节点上的独立副本。这些变量都会以副本的形式复制到各个机器

节点上,如果更新这些变量副本的话,这些更新并不会传回到驱动器(driver)程序。通常来说,支持跨任务的可读写共享变量是比较低效的。

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.datanucleus.store.types.backed.ArrayList;

import java.util.Arrays;
import java.util.List;

class Count implements Function<Integer, Integer> {
    int count;   
    public Count(){
        count = 0;
    }
    @Override
    public Integer call(Integer o) throws Exception {
        count = count+1;
        System.out.println(o+":count:"+count);
        return o+1;  //这里不能自加
    }
}
public class RddAccmulator {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("RddAccmulator");
        JavaSparkContext sc = new JavaSparkContext(conf);
        Count count = new Count();  //这里属于驱动器的变量,在每个task都会有一个副本
        JavaRDD<Integer> list = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10)).repartition(2);
        list.map(count).collect().forEach(x-> System.out.println(x));
        System.out.println("driver count:"+count.count);

    }
}
           

可以看到 ,我们在代码中把数据reparation在了两个分区,可以从下面的输出,两个分区各有一个Count变量副本,而driver端也有自己的Count,更改这些变量的值并不会影响到driver的对应变量。至于下面那个歌为什么不是5而是6呢?可以这么猜测,分区1的数据有2,4,5,6,8,10,其他数据在分区2,因为我们分区的时候没有指定哪些数据在哪个分区。我们可以写个程序来看数据的分布。

2:count:1
1:count:1
4:count:2
3:count:2
5:count:3
7:count:3
6:count:4
9:count:4
8:count:5
10:count:6

drivercount:0
           

插入以下代码,验证我们的猜想,结果证明事实就是那样

JavaRDD<String> rddRes = list.mapPartitionsWithIndex((x,it)->{
            List<String> sumList = new ArrayList<String>();
            StringBuffer num  = new StringBuffer();
            while(it.hasNext()){
                num = num.append(it.next()+",");
            }
            sumList.add(x+"|"+num);
            return sumList.iterator();
        },false);
        System.out.println(rddRes.count());
        System.out.println(rddRes.collect());

输出结果:
2
[0|1,3,5,6,8,10,, 1|2,4,7,9,]
           

2. 累加器

1)累加器是一种只支持满足结合律的“累加”操作的变量,因此它可以很高效地支持并行计算。利用累加器可以

实现计数(类似MapReduce中的计数器)或者求和。

2)Spark原生支持了数字类型的累加器,开发者也可以自)定义新的累加器。如果创建累加器的时候给了一个名字,那么这个名字会展示在Spark UI上,这对于了解程序运行处于哪个阶段非常有帮助(注意:Python尚不支持该功能)。

3)创捷累加器时需要赋一个初始值v,调用 SparkContext.accumulator(v) 可以创建一个累加器。后续集群中运行

的任务可以使用 add 方法 或者 += 操作符 (仅Scala和Python支持)来进行累加操作。不过,任务本身并不)能读取累加器的值,只有驱动器程序可以用 value 方法访问累加器的值

4)这是有累加器的实现原理决定的:

Driver端 

   Driver端初始化构建Accumulator并初始化 

   同时完成了Accumulator注册:Accumulators.register(this) 

   同时Accumulator会在序列化后发送到Executor端

   Driver接收到ResultTask完成的状态更新后,会去更新Value的值 

   然后在Action操作执行后就可以获取到Accumulator的值了

Executor端 

   Executor端接收到Task之后会进行反序列化操作,反序列化得到RDD和function 

   同时在反序列化的同时也去反序列化Accumulator(在readObject方法中完成) 

   同时也会向TaskContext完成注册

   完成任务计算之后,随着Task结果一起返回给Driver

import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;


import java.util.Arrays;

public class RddAccmulater1 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("RddAccmulator");
        JavaSparkContext sc = new JavaSparkContext(conf);

        final Accumulator<Integer> count1 = sc.accumulator(0);

        final Accumulator<Integer> count2 = sc.accumulator(0);

        JavaRDD<Integer> list = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10)).repartition(2);
        list.foreach(x->{count1.add(1);});
        System.out.println(list.collect());
        System.out.println(count1.value());  //这个得在action操作之后,否则lazy求值未action之前transf操作都不执行
///
        JavaRDD<Integer> rdd = list.map(x->{
            count2.add(1);
            return x+1;
        });
        System.out.println(rdd.count());
        System.out.println(count2.value());
        System.out.println(rdd.reduce((x,y)->x+y));
        System.out.println(count2.value());
    }
}
           

上面count1输出正常:10

但是count2第一次输出10,第二次输出20,这是为什么?

因为在没有对中间结果缓存的情况下,每一次的计算都是从最原始的rdd开始算起,transform中的定义的变量都是副本,都只在这个生命周期存在,而累加变量是在整个driver的生命周期,所以count2被累加了两次。要避免这样的结果只需要对第一次的rdd的结果做一次缓存就可以,这样下一次就会从这个缓存开始计算而不是从头开始计算。 

3 广播变量

广播变量提供了一种只读的共享变量,它是在每个机器节点上保存一个缓存,而不是每个任务保存一份副

本。通常可以用来在每个节点上保存一个较大的输入数据集,这要比常规的变量副本更高效(一般的变量

是每个任务一个副本,一个节点上可能有多个任务)。Spark还会尝试使用高效的广播算法来分发广播变

量,以减少通信开销。

为什么只能 broadcast 只读的变量?

这就涉及一致性的问题,如果变量可以被更新,那么一旦变量被某个节点更新,其他节点要不要一块更新?如果多个节点同时在更新,更新顺序是什么?怎么做同步?还会涉及 fault-tolerance 的问题。为了避免维护数据一致性问题,Spark 目前只支持 broadcast 只读变量。

broadcast 到节点而不是 broadcast 到每个 task?

因为每个 task 是一个线程,而且同在一个进程运行 tasks 都属于同一个 application。因此每个节点(executor)上放一份就可以被所有 task 共享。

为什么要使用广播变量?

主要用于节约内存的开销。

Driver每次分发任务的时候会把task和计算逻辑的变量发送给Executor,不是使用广播变量会有多份的变量副本。这样会导致消耗大量的内存导致严重的后果。

  • 不使用广播变量:
Spark变量的理解1. driver变量副本:2. 累加器 3 广播变量4.总结

不使用广播变量

  • 使用广播变量:
Spark变量的理解1. driver变量副本:2. 累加器 3 广播变量4.总结

使用广播变量 

下面给出一个例子,给出一个List,去掉List中和广播变量中相同的值

public class RddBroadcast {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("RddBroadcast");
        JavaSparkContext sc = new JavaSparkContext(conf);

        //String[] jarPath = new String[]{"/home/linjiaqin/IdeaProjects/JavaSparkTest/out/artifacts/wordcount/wordcount.jar"};
        //conf.setJars(jarPath);

        String hdfs = "hdfs://localhost:9000";
        JavaRDD<Integer> list = sc.parallelize(Arrays.asList(1,2,3,4,5,6,1,2,3,7,8,9,10)).repartition(3);
        final Broadcast<List> bC = sc.broadcast(Arrays.asList(1,2,3));   
        JavaRDD<Integer> rdd = list.filter(x->{
            return !bC.value().contains(x);
        });
        System.out.println(rdd.collect());

    }
}
           

4.总结

1)driver中定义的一般变量或者action操作里的function的变量,每个task上都会有其副本,所需要的内存最大,更新这些副本的值也不会影响到driver中对应变量

2)广播变量是每个节点一个变量副本(一个节点上有多个并行的task),是只读变量,因为更新当个节点,其它节点没法更新

3)累加器,提供了将各个task中的值聚合到驱动程序的语法,是只写变量,节点上的任务不能读取累加器的值,因为这个值时时变化,只有最后driver拿到的是有意义的

继续阅读