天天看點

Flink執行個體(三十七):狀态管理(八)自定義操作符狀态(三)廣播狀态(Broadcast state)(一) KeyedBroadcastProcessFunction

什麼是 Broadcast State

Broadcast State 是 Flink 1.5 引入的新特性。在開發過程中,如果遇到需要下發/廣播配置、規則等低吞吐事件流到下遊所有 task 時,就可以使用 Broadcast State 特性。下遊的 task 接收這些配置、規則并儲存為 BroadcastState, 将這些配置應用到另一個資料流的計算中 。英語好的同學可以直接移步 Flink 官方介紹

Broadcast State 差別于其他 operator state 的地方有:

  • Broadcast State 類似 Map 結構,可以 put get putAll remove 等
  • 必須有一條廣播流和一條非廣播流
  • 符合條件的 operator 可以有多個不同名字的 BroadcastState(疑惑:普通的 operator 也可以有多個不同名字的 state 吧,隻是不是 BroadcastState。這麼想也說得通了)

Broadcast state 示例

下面從一個示例來認識如何使用 Broadcast state. 我們對 wordcount 的例子都很熟悉,就簡單改造下 wordcount吧。我們的改造目标是:實時控制輸出結果中的單詞長度。

首先大體說一下思路,準備兩個流,一個資料流(wordcount 需要統計的流) A,一個配置流(即廣播流,後面有生成方法) B,這兩個流的來源都可以自己定義,這裡我們都用 kafka 作為輸入源;然後用 

A.keyBy(0).connect(B)

, 這裡注意,一定是用資料流[.func()].connect(廣播流),生成一個新的 BroadcastConnectedStream C;最後 C.process(new KeyedBroadcastProcessFunction<…>(…)) 進行邏輯處理。

1.資料流消費者

資料流就是 wordcount 程式統計的普通文本

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("input-topic-data",...);      

2.廣播流消費者

我們定義了廣播流一條消息的格式為 {“length”:n} .其中 n 為數字,表示單詞的最大長度。

FlinkKafkaConsumer<String> consumerBroadcast = new FlinkKafkaConsumer<>("input-topic-config",...);      

3.生成資料流 A

這裡對資料流進行了 wordcount 中的分詞操作,輸出流為 <單詞, 數量, 生成時間>

DataStream<Tuple3<String, Integer, Long>> dataStream = env.addSource(consumer).flatMap(new LineSplitter());      

4.生成廣播流 B 并廣播

我們知道,在 Flink 中,通路 state 前先要定義狀态描述符(StateDescriptor). BroadcastState 的狀态描述符是 MapStateDescriptor. MapStateDescriptor 的 value 類型即是廣播流的元素類型,這個例子裡是 Map<String,Object>。前面我們定義了廣播的原始消息格式為 json 字元串,在這裡我們通過 flatMap 函數轉化成 Map<String,Object> 類型。轉化完成後,調用 broadcast() 将這個流廣播出去,這樣,每個 task 都會收到廣播流資料并在本地儲存一份 BroadcastState,實際上,生成 BroadcastState 是在後續的 process() 中操作的。

// 定義 MapStateDescriptor
final MapStateDescriptor<String,Map<String,Object>> broadCastConfigDescriptor = new MapStateDescriptor<>("broadCastConfig",BasicTypeInfo.STRING_TYPE_INFO, new MapTypeInfo<>(String.class, Object.class));
// i.e. {"length":5}
BroadcastStream<Map<String,Object>> broadcastStream = env.addSource(consumerBroadcast).
         flatMap(new FlatMapFunction<String, Map<String,Object>>() {
                    // 解析 json 資料
                    private final ObjectMapper mapper = new ObjectMapper();
                    @Override
                    public void flatMap(String value, Collector<Map<String,Object>> out) {
                        try {
                             out.collect(mapper.readValue(value, Map.class));
                        } catch (IOException e) {
                             e.printStackTrace();
                             System.out.println(value);
                        }
                    }
               }
        // 這裡需要調用 broadcast 廣播出去,并且隻能是 MapStateDescriptor 類型。可以指定多個
        ).broadcast(broadCastConfigDescriptor); //這裡可以指定多個descriptor      

5.連接配接兩個流

接下來是兩個流的連接配接部分了。前面說過,必須是 資料流.connect(廣播流). 這裡又分成兩種情況

  • noKeyedStream.connect(BroadcastStream).process(new BroadcastProcessFunction<>(…)) 非 KeyedStream 連接配接 BroadcastStream 的,隻能使用 BroadcastProcessFunction 函數處理連接配接邏輯
  • KeyedStream.connect(BroadcastStream).process(new KeyedBroadcastProcessFunction<>(…)) KeyedStream 連接配接 BroadcastStream 的,隻能使用 KeyedBroadcastProcessFunction 函數處理連接配接邏輯

    KeyedBroadcastProcessFunction 比 BroadcastProcessFunction 多了計時器服務和擷取目前 key 接口,當然,這兩個功能不一定能用到。

我們這裡使用的是 KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>:

  KS 是 KeyedStream 中 key 的類型;IN1 是資料流(即非廣播流)的元素類型;IN2 是廣播流的元素類型;OUT 是兩個流連接配接完成後,輸出流的元素類型。

dataStream.keyBy(0).connect(broadcastStream).process(new KeyedBroadcastProcessFunction<String, Tuple3<String, Integer, Long>, Map<String, Object>, Tuple2<String,Integer>>(){...}      

我們單獨把 KeyedBroadcastProcessFunction 摘出來,這個函數用于處理具體的連接配接邏輯和業務邏輯。主要需要實作以下兩個函數:

  • public void processElement(Tuple3<String, Integer, Long> value, ReadOnlyContext ctx, Collector<Tuple2<String, Integer>> out):

     這個函數處理資料流的資料,這裡之隻能擷取到 ReadOnlyBroadcastState,因為 Flink 不允許在這裡修改 BroadcastState 的狀态。value 是資料流中的一個元素;ctx 是上下文,可以提供計時器服務、目前 key和隻讀的 BroadcastState;out 是輸出流收集器。

  • public void processBroadcastElement(Map<String, Object> value, Context ctx, Collector<Tuple2<String,Integer>> out):

    這裡處理廣播流的資料,将廣播流資料儲存到 BroadcastState 中。value 是廣播流中的一個元素;ctx 是上下文,提供 BroadcastState 和修改方法;out 是輸出流收集器。

下面是示例源碼:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;

public class BroadCastWordCountExample {
    public static void main (String[] args) throws Exception {
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
        if (parameterTool.getNumberOfParameters() < 5) {
            System.out.println("Missing parameters!\n" +
                    "Usage: Kafka --input-topic-data <topic> --input-topic-config <topic> --output-topic <topic> " +
                    "--bootstrap.servers <kafka brokers> " +
                    "--group.id <some id> --auto.offset.reset <latest, earliest, none>");
            return;
        }

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(parameterTool.getInt("checkpoint.interval",60000)); // create a checkpoint every n mill seconds

        // set mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // make sure 500 ms of progress happen between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

        // checkpoints have to complete within one minute, or are discarded
        env.getCheckpointConfig().setCheckpointTimeout(60000);

        // allow only one checkpoint to be in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(parameterTool);

        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                parameterTool.getRequired("input-topic-data"),
                new SimpleStringSchema(),
                parameterTool.getProperties());
        FlinkKafkaConsumer<String> consumerBroadcast = new FlinkKafkaConsumer<>(
                parameterTool.getRequired("input-topic-config"),
                new SimpleStringSchema(),
                parameterTool.getProperties());

        DataStream<Tuple3<String, Integer, Long>> dataStream = env.addSource(consumer).flatMap(new LineSplitter());
        final MapStateDescriptor<String,Map<String,Object>> broadCastConfigDescriptor = new MapStateDescriptor<>("broadCastConfig",
                BasicTypeInfo.STRING_TYPE_INFO, new MapTypeInfo<>(String.class, Object.class));
        // e.g. {"length":5}
        BroadcastStream<Map<String,Object>> broadcastStream = env.addSource(consumerBroadcast).
                flatMap(new FlatMapFunction<String, Map<String, Object>>() {
                            // 解析 json 資料
                            private final ObjectMapper mapper = new ObjectMapper();

                            @Override
                            public void flatMap(String value, Collector<Map<String, Object>> out) {
                                try {
                                    out.collect(mapper.readValue(value, Map.class));
                                } catch (IOException e) {
                                    e.printStackTrace();
                                    System.out.println(value);
                                }
                            }
                        }
                ).broadcast(broadCastConfigDescriptor); //這裡可以指定多個descriptor

        dataStream.keyBy(0).connect(broadcastStream).process(new KeyedBroadcastProcessFunction<String, Tuple3<String, Integer, Long>, Map<String, Object>, Tuple2<String,Integer>>() {
            private final Logger logger = LoggerFactory.getLogger(BroadCastWordCountExample.class);
            private transient MapState<String, Integer> counterState;
            int length = 5;
            // 必須和上文的 broadCastConfigDescriptor 一緻,否則報 java.lang.IllegalArgumentException: The requested state does not exist 的錯誤
            private final MapStateDescriptor<String, Map<String,Object>> broadCastConfigDescriptor = new MapStateDescriptor<>("broadCastConfig", BasicTypeInfo.STRING_TYPE_INFO, new MapTypeInfo<>(String.class, Object.class));
            private final MapStateDescriptor<String, Integer> descriptor = new MapStateDescriptor<>("counter",String.class, Integer.class);
            @Override
            public void open(Configuration parameters) throws Exception{
                counterState = getRuntimeContext().getMapState(descriptor);
                logger.info("get counter/globalConfig MapState from checkpoint");
            }
            /**
             * 這裡處理資料流的資料
             * */
            @Override
            public void processElement(Tuple3<String, Integer, Long> value, ReadOnlyContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                /**
                 * 這裡之隻能擷取到 ReadOnlyBroadcastState,因為 Flink 不允許在這裡修改 BroadcastState 的狀态
                 * */
                // 從廣播狀态中擷取規則
                ReadOnlyBroadcastState<String, Map<String,Object>> broadcastState = ctx.getBroadcastState(broadCastConfigDescriptor);
                if (broadcastState.contains("broadcastStateKey")) {
                    length = (Integer) broadcastState.get("broadcastStateKey").get("length");
                }
                if (value.f0.length() > length) {
                    logger.warn("length of str {} > {}, ignored", value.f0, length);
                    return;
                }
                if (counterState.contains(value.f0)) {
                    counterState.put(value.f0, counterState.get(value.f0) + value.f1);
                } else {
                    counterState.put(value.f0, value.f1);
                }
                out.collect(new Tuple2<>(value.f0, counterState.get(value.f0)));
            }
            /**
             * 這裡處理廣播流的資料
             * */
            @Override
            public void processBroadcastElement(Map<String, Object> value, Context ctx, Collector<Tuple2<String,Integer>> out) throws Exception {
                if (!value.containsKey("length")) {
                    logger.error("stream element {} do not contents \"length\"", value);
                    return;
                }

                /*ctx.applyToKeyedState(broadCastConfigDescriptor, (key, state) -> {
                     // 這裡可以修改所有 broadCastConfigDescriptor 描述的 state
                });*/
                /** 這裡擷取 BroadcastState,BroadcastState 包含 Map 結構,可以修改、添加、删除、疊代等
                 * */
                BroadcastState<String, Map<String,Object>> broadcastState = ctx.getBroadcastState(broadCastConfigDescriptor);
                // 前面說過,BroadcastState 類似于 MapState.這裡的 broadcastStateKey 是随意指定的 key, 用于示例
                // 更新廣播流的規則到廣播狀态: BroadcastState
                if (broadcastState.contains("broadcastStateKey")) {
                    Map<String, Object> oldMap = broadcastState.get("broadcastStateKey");
                    logger.info("get State {}, replaced with State {}",oldMap,value);
                } else {
                    logger.info("do not find old State, put first counterState {}",value);
                }
                broadcastState.put("broadcastStateKey",value);
            }
        }).print();

        env.execute("BroadCastWordCountExample");
    }
}      

示例測試:

1.啟動 flink 叢集,并行度為2,運作該 job;

2.資料流輸入:

No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar      

task 列印輸出:

2> (no,1)
1> (for,1)
2> (path,1)
1> (jar,1)
2> (the,1)
2> (flink,1)
2> (using,1)
2> (the,2)
1> (jar,2)
2> (of,1)
2> (class,1)
2> (to,1)
2> (the,3)      

3.廣播流輸入:

{"length":6}      

資料流輸入相同資料:

No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar      
2> (no,2)
2> (path,2)
1> (for,2)
2> (the,4)
1> (jar,3)
2> (flink,2)
2> (using,2)
2> (the,5)
1> (jar,4)
2> (of,2)
2> (class,2)
2> (to,2)
2> (locate,1)
2> (the,6)      

使用 broadcast state 時需要注意的事項