我需要使用 Python 在 Spark Structured Streaming 中定期更新广播变量(例如,每隔一段固定的时间间隔刷新一次)。现有的资料大多是用 Scala 或 Java 编写的,因此这里用 Python 实现一个 BroadcastWrapper 类,如下:
import time
from datetime import datetime
from pyspark import SparkConf, SparkContext
# Build the Spark configuration as one parenthesised builder chain:
# master "local", application name "My app", executor memory "1g".
conf = (
    SparkConf()
    .setMaster("local")
    .setAppName("My app")
    .set("spark.executor.memory", "1g")
)

# Module-level SparkContext that the rest of the script uses for broadcasts.
sc = SparkContext(conf=conf)

# Maximum age of a broadcast, in seconds, before it is considered stale.
TIME_OUT = 1
class BroadcastWrapper(object):
    """Hold a Spark broadcast variable and re-create it once it is stale.

    The wrapper remembers when the broadcast was last (re)built.  When more
    than the configured number of seconds has passed,
    ``update_and_get_data`` builds a replacement broadcast and unpersists
    the previous one.

    Args:
        data: Initial payload to broadcast.
        spark_context: Object exposing ``broadcast(value)``.  Defaults to the
            module-level ``sc`` so existing ``BroadcastWrapper(data)`` calls
            keep working.
        timeout: Maximum age of the broadcast in seconds.  Defaults to the
            module-level ``TIME_OUT``, looked up each time staleness is
            checked.

    Attributes:
        broadcast_var: The current broadcast handle.
        last_updated_time: Wall-clock ``datetime`` of the last (re)build.
            Informational only; staleness is measured on a monotonic clock.
    """

    def __init__(self, data, spark_context=None, timeout=None):
        # Fall back to the module-level context only when none is supplied.
        self._context = sc if spark_context is None else spark_context
        self._timeout = timeout
        self.broadcast_var = self._context.broadcast(data)
        self._mark_updated()

    def _mark_updated(self):
        """Record the moment of the latest (re)build."""
        self.last_updated_time = datetime.now()
        # Naive wall-clock differences jump on DST/NTP adjustments, which
        # could force a spurious refresh or block refreshes for a long time.
        # The monotonic clock is immune to such steps.
        self._last_updated_monotonic = time.monotonic()

    def _load_new_data(self):
        """Return the payload for the next broadcast (override as needed)."""
        return ['new data']

    def is_should_be_updated(self, data=None):
        """Return True when the broadcast is missing or older than the timeout.

        Args:
            data: Unused; accepted only for backward compatibility.
        """
        if self.broadcast_var is None:
            return True
        limit = TIME_OUT if self._timeout is None else self._timeout
        elapsed = time.monotonic() - self._last_updated_monotonic
        return elapsed > limit

    def update_and_get_data(self, spark=None):
        """Refresh the broadcast if it is stale and return the current one.

        Args:
            spark: Either an object exposing ``broadcast(value)`` (such as a
                ``SparkContext``) or one exposing it via a ``sparkContext``
                attribute (such as a ``SparkSession``).  Defaults to the
                context used at construction time.

        Returns:
            The current broadcast handle (freshly built if a refresh was due).
        """
        if not self.is_should_be_updated():
            return self.broadcast_var

        if spark is None:
            context = self._context
        else:
            # A SparkSession has no ``broadcast``; unwrap its SparkContext.
            context = getattr(spark, "sparkContext", spark)

        # Build the replacement first so a failure while broadcasting leaves
        # the previous handle untouched, then release the stale copy.
        stale = self.broadcast_var
        self.broadcast_var = context.broadcast(self._load_new_data())
        self._mark_updated()
        if stale is not None:
            stale.unpersist()
        return self.broadcast_var
# Demo driver: broadcast an initial payload, then wait 3 seconds so the
# broadcast is older than TIME_OUT before asking for the current data.
broadcast_wrapper = BroadcastWrapper(["old data"])
time.sleep(3)

# The wrapper is expected to hand back a refreshed broadcast at this point;
# print each entry of its payload on its own line.
latest_broadcast = broadcast_wrapper.update_and_get_data(sc)
for rule in latest_broadcast.value:
    print(rule)