天天看点

Pyspark中的全局作用域和局部作用域

1.全局作用域和局部作用域

在集群模式下,提交了任务之后,任务被发送给了驱动程序节点(也就是master节点)。由主程序节点为任务创建DAG,并且决定哪一个执行者节点将要运行的特定任务。在驱动程序指示下执行者节点执行任务,并在执行结束后将结果返回给驱动程序。在执行者执行任务的过程中,驱动程序为每一个任务的终止做准备:驱动程序有一组变量和方法,以便执行者节点在RDD上执行任务。

这组变量和方法在执行者节点的上下文中本质上是静态的,也就是说每个执行器从驱动程序中获得一份变量和方法的副本。运行任务时,如果执行者改变这些变量或者覆盖这些方法,并不会对其他执行者的副本或者驱动程序的变量和方法。

RDD在计算的时候,每个分区都有一个执行者程序,所以RDD的分区数目决定了总的执行者程序数目。申请的计算节点数目和每个计算节点的核数,决定了同一时刻可以并行执行的执行者程序。一个节点可拥有的最大核数为该节点的逻辑CPU数量。

2.累加器和广播变量

累加器与广播变量,分别为结果聚合与广播这两种常见的通信模式突破上述限制。

2.1 累加器

累加器提供了将工作节点中的值聚合聚合到驱动器程序中的简单语法。

from pyspark import SparkContext,SparkConf

def extractCallSigns(line):
    global blank
    if line.startswith("hello"):
        blank+=1
    return line.split(' ')

if __name__ == '__main__':
    conf = SparkConf().setMaster("local").setAppName("word_count")
    sc = SparkContext(conf=conf)
    rdd=sc.textFile('word_count.txt,word_count_2.txt',2)
    blank=sc.accumulator(0)
    callSings=rdd.flatMap(extractCallSigns)
    print(callSings.count())
    print(blank.value)
           

上述实验中代码设置了一个累加器用于统计两个文件中以“hello”开头的行数。最后的blank的值为3。需要注意的是, 由于RDD是惰性计算的,所以如果不执行一个action算子的话,blank的值是不会变的仍然为0,所以必须执行一个类似callSings.count()的action操作才能获得正确的值。

但是,工作节点上的任务不能访问累加器的值,从这些任务的角度来看,累加器是一个只写变量。这种模式,累加器的实现可以更加高效,不需要对每次更新操作进行复杂的通信。

2.2 广播变量

第二种共享变量是广播变量。它可以让程序高效地向所有工作节点发送一个较大的只读值,供一个或多个Spark操作使用。广播变量其实是类型为spark.broadcast.Broadcast的一个对象。可以在任务中通过对象Broadcast对象调用value来获取该对象的值。这个值只会被发送到各节点一次。如果不适用广播变量的话,则在程序中每使用该变量一次,就需要向所有节点传送一次。

当广播一个较大的值, 应该选择既快又好的序列化格式。因为如果序列化对象时间很长后很长或者传送花费的时间太久,则很可能成为性能瓶颈。

参考:

1.PySpark实践指南

2.http://spark.apache.org/docs/latest/rdd-programming-guide.html#local-vs-cluster-modes

继续阅读