天天看点

Flink中的Broadcast广播变量

在Flink中,同一个算子可能存在若干个不同的并行实例,计算过程可能不在同一个Slot中进行,不同算子之间更是如此,因此不同算子的计算数据之间不能像Java数组之间一样互相访问,而广播变量

Broadcast

便是解决这种情况的。如下代码所示:

val env = ExecutionEnvironment.getExecutionEnvironment
    val ds1 = env.fromElements("1", "2", "3", "4", "5")
    val ds2 = env.fromElements("a", "b", "c", "d", "e")

    ds1.map{
      t =>
        (t, ds2)
    }.print()
           

运行上述代码会报

InvalidProgramException

的错,因为在

ds1

map

算子中无法再去调用

ds2

,此时可以使用广播变量将

ds2

这个变量进行广播,使得

ds2

这一被广播的数据集在

ds1

map

算子的所有并行实例中都可用,具体处理方式如下:

object BroadcastTest {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds1 = env.fromElements("1", "2", "3", "4", "5")
    val ds2 = env.fromElements("a", "b", "c", "d", "e")

    ds1.map(new RichMapFunction[String, (String, String)] {
      private var ds2: Traversable[String] = null

      override def open(parameters: Configuration) {
        ds2 = getRuntimeContext.getBroadcastVariable[String]("broadCast").asScala
      }

      def map(t: String): (String, String) = {
        var result = ""
        for (broadVariable <- ds2) {
          result = result + broadVariable + " "
        }
        (t, result)
      }
    }).withBroadcastSet(ds2, "broadCast").print()
  }
}
           

上述代码能够成功运行出如下结果:

(1,a b c d e )

(2,a b c d e )

(3,a b c d e )

(4,a b c d e )

(5,a b c d e )

可以看到,

ds1

map

算子通过访问广播变量成功访问到

ds2

中的数据。该过程分为两步:设置广播变量和获取广播变量。

  1. 设置广播变量

    在某个需要用到该广播变量的算子后调用

    withBroadcastSet(var1, var2)

    进行设置,

    var1

    为需要广播变量的变量名,

    var2

    是自定义变量名,为

    String

    类型。注意,被广播的变量只能为

    DataSet

    类型,不能为

    List

    Int

    String

    等类型。
  2. 获取广播变量

    创建该算子对应的富函数类,例如

    map

    函数的富函数类是

    RichMapFunction

    ,该类有两个构造参数,第一个参数为算子输入数据类型,第二个参数为算子输出数据类型。首先创建一个

    Traversable[_]

    接口用于接收广播变量并初始化为空,接收类型与算子输入数据类型相对应;然后重写

    open

    函数,通过

    getRuntimeContext.getBroadcastVariable[_](var)

    获取到广播变量,var即为设置广播变量时的自定义变量名,类型为

    String

    open

    函数在算子生命周期的初始化阶段便会调用;最后在

    map

    方法中对获取到的广播变量进行访问及其它操作。

注意:只有在某个

Operator

中使用到不属于该

Operator

DataSet

时才需要广播变量,在

iterate

内部可以将某个

DataSet

直接作为起始节点,不需要使用广播变量。

继续阅读