天天看点

如何在Python Spark Streaming中更新广播变量?

我需要在 Python 的 Spark Structured Streaming 中定期更新广播变量(例如,每隔一段固定的时间间隔更新一次)。现有资料大多使用 Scala 或 Java 实现,因此下面用 Python 编写了一个 BroadcastWrapper 类:

import time
from datetime import datetime
from pyspark import SparkConf, SparkContext

# Local-mode Spark configuration used by this example.
conf = (
    SparkConf()
    .setMaster("local")
    .setAppName("My app")
    .set("spark.executor.memory", "1g")
)
sc = SparkContext(conf=conf)

# Interval, in seconds, after which BroadcastWrapper re-broadcasts its data.
TIME_OUT = 1


class BroadcastWrapper(object):
    """Hold a Spark broadcast and periodically replace it with fresh data.

    The wrapper "updates" a broadcast by creating a new broadcast from fresh
    data and unpersisting the previous one. Callers should use the object
    returned by ``update_and_get_data`` and read its ``.value``.

    NOTE(review): creating broadcasts needs the SparkContext, so this is
    presumably meant to be called on the driver (e.g. once per micro-batch,
    such as inside ``foreachBatch``) - confirm for your streaming setup.

    Args:
        data: Initial value to broadcast.
        spark_context: Object whose ``broadcast`` method creates the initial
            broadcast. Defaults to the module-level ``sc``.
        timeout: Seconds that must elapse before a refresh is due. Defaults to
            the module-level ``TIME_OUT`` (looked up at check time).
        load_data: Zero-argument callable returning the fresh value to
            broadcast; only called when a refresh is due. Defaults to a
            placeholder that returns ``['new data']``.
    """

    def __init__(self, data, *, spark_context=None, timeout=None, load_data=None):
        if spark_context is None:
            spark_context = sc
        self.timeout = timeout
        self._load_data = (
            load_data if load_data is not None else self._default_load_data
        )
        self.broadcast_var = spark_context.broadcast(data)
        # Wall-clock timestamp kept for callers that inspect or log it.
        self.last_updated_time = datetime.now()
        # Elapsed time is measured on a monotonic clock: datetime.now() is
        # naive local time and can jump (DST changes, NTP corrections),
        # which would delay or force refreshes.
        self._last_refresh = time.monotonic()

    @staticmethod
    def _default_load_data():
        """Return the placeholder payload used by the original example."""
        return ['new data']

    def is_should_be_updated(self, data=None):
        """Return True when the broadcast is missing or older than the timeout.

        Args:
            data: Unused; accepted for backward compatibility.
        """
        if self.broadcast_var is None:
            return True
        limit = TIME_OUT if self.timeout is None else self.timeout
        return time.monotonic() - self._last_refresh > limit

    def update_and_get_data(self, spark):
        """Re-broadcast fresh data if the current broadcast is stale.

        Args:
            spark: Object with a ``broadcast`` method (the SparkContext in
                this example) used to create the replacement broadcast.

        Returns:
            The current broadcast object; read the payload via ``.value``.
        """
        if self.is_should_be_updated():
            new_data = self._load_data()
            # Create the replacement before releasing the old broadcast so a
            # failure in loading/broadcasting leaves the old value usable.
            new_var = spark.broadcast(new_data)
            old_var = self.broadcast_var
            self.broadcast_var = new_var
            if old_var is not None:
                old_var.unpersist()
            self.last_updated_time = datetime.now()
            self._last_refresh = time.monotonic()
        return self.broadcast_var


# Demo: broadcast initial data, wait past the refresh interval, then fetch.
broadcast_wrapper = BroadcastWrapper(["old data"])
# Sleep relative to TIME_OUT so the refresh below is always due, even if the
# interval is changed (previously a hard-coded 3 s would silently stop
# exercising the refresh once TIME_OUT >= 3).
time.sleep(TIME_OUT + 2)

# Prints the refreshed payload returned by update_and_get_data.
for rule in broadcast_wrapper.update_and_get_data(sc).value:
    print(rule)