天天看點

Flink_狀态程式設計_2

文章目錄

  • ​​1.算子狀态概述​​
  • ​​1.1 算子狀态分類​​
  • ​​1.2 狀态分析​​
  • ​​1.3 CheckpointedFunction 接口​​
  • ​​2.算子狀态 程式設計案例​​
  • ​​2.1 清單狀态案例​​
  • ​​2.2 廣播機制 案例​​
  • ​​3.狀态持久化和狀态後端​​
  • ​​3.1 檢查點(Checkpoint)​​
  • ​​3.2 狀态後端 state Backends​​
  • ​​4.狀态程式設計總結​​

1.算子狀态概述

1.1 算子狀态分類

  • 算子狀态:

    清單狀态, 聯合清單狀态, 廣播狀态

    ListState, UnionListState, BroadcastState

1.2 狀态分析

  1. 清單狀态: 與 Keyed State 中的 ListState 一樣,将狀态表示為一組資料的清單。與 Keyed State 中的清單狀态的差別是:在算子狀态的上下文中,不會按鍵(key)分别處理狀态,是以每一個并行子任務上隻會保留一個“清單”(list),也就是目前并行子任務上所有狀态項的集合。清單中的狀态項就是可以重新配置設定的最細粒度,彼此之間完全獨立。
  2. 聯合清單狀态: 與 ListState 類似,聯合清單狀态也會将狀态表示為一個清單。它與正常清單狀态的差別在于,算子并行度進行縮放調整時對于狀态的配置設定方式不同。UnionListState 的重點就在于“聯合”(union)。在并行度調整時,正常清單狀态是輪詢配置設定狀态項,而聯合清單狀态的算子則會直接廣播狀态的完整清單。這樣,并行度縮放之後的并行子任務就擷取到了聯合後完整的“大清單”,可以自行選擇要使用的狀态項和要丢棄的狀态項。這種配置設定也叫作“聯合重組”(union redistribution)。如果清單中狀态項數量太多,為資源和效率考慮一般不建議使用聯合重組的方式。
  3. 廣播狀态: 狀态就像被“廣播”到所有分區一樣,這種特殊的算子狀态,就叫作廣播狀态(BroadcastState)。廣播狀态是以類似映射結構(map)的鍵值對(key-value)來儲存的

1.3 CheckpointedFunction 接口

在 Flink 中,對狀态進行持久化儲存的快照機制叫作“檢查點”(Checkpoint)。于是使用算子狀态時,就需要對檢查點的相關操作進行定義,實作一個 CheckpointedFunction 接口。

public interface CheckpointedFunction {
  // 儲存狀态快照到檢查點時,調用這個方法
  void snapshotState(FunctionSnapshotContext context) throws Exception
  // 初始化狀态時調用這個方法,也會在恢複狀态時調用
  void initializeState(FunctionInitializationContext context) throws Exception;
}      

每次應用儲存檢查點做快照時,都會調用.snapshotState()方法,将狀态進行外部持久化。

而在算子任務進行初始化時,會調用. initializeState()方法。

2.算子狀态 程式設計案例

2.1 清單狀态案例

自定義的 SinkFunction 會在CheckpointedFunction 中進行資料緩存,然後統一發送到下遊。這個例子示範了清單狀态的平均分割重組(event-split redistribution)。

緩存機制

public class BufferingSinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        SingleOutputStreamOperator<Event> stream = environment.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event event, long l) {
                                return event.timestamp;
                            }
                        })
                );
        stream.addSink(new BufferingSink(10));
        stream.print("input");

        environment.execute();
    }
    public static class BufferingSink implements SinkFunction<Event>, CheckpointedFunction{
        private final int threshold;

        public BufferingSink(int threshold) {
            this.threshold = threshold;
            this.bufferedElement = new ArrayList<>();
        }

        private List<Event> bufferedElement;

        private ListState<Event> listState;

        @Override
        public void invoke(Event value, Context context) throws Exception {
            bufferedElement.add(value);
            if (bufferedElement.size() == threshold){
                for (Event event : bufferedElement) {
                    System.out.println(event);
                }
                bufferedElement.clear();
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            // 對狀态進行持久化, 複制緩存的清單到 清單狀态
            listState.clear();
            for (Event event : bufferedElement) {
                listState.add(event);
            }
        }

        @Override
        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            // 定義算子狀态
            ListStateDescriptor<Event> listStateDescriptor = new ListStateDescriptor<>("buffered-elements", Event.class);

            listState = functionInitializationContext.getOperatorStateStore().getListState(listStateDescriptor);

            // 如果有故障, 需要将清單狀态中元素 複制到 清單
            if (functionInitializationContext.isRestored()){
                for (Event event : listState.get()) {
                    bufferedElement.add(event);
                }
            }
        }
    }
}      
Flink_狀态程式設計_2

2.2 廣播機制 案例

考慮在電商應用中,往往需要判斷使用者先後發生的行為的“組合模式”,比如“登入-下單”或者“登入-支付”,檢測出這些連續的行為進行統

計,就可以了解平台的運用狀況以及使用者的行為習慣。

public class BehaviorPatternDetectExample {
    // 使用者行為模式
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        // 1.行為的資料流
        DataStream<Action> actionStream = environment.fromElements(
                new Action("Alice", "login"),
                new Action("Alice", "pay"),
                new Action("Bob", "login"),
                new Action("Bob", "order")
        );
        DataStream<Pattern> patternStream = environment.fromElements(
                new Pattern("login", "pay"),
                new Pattern("login", "order")
        );
        // 定義廣播狀态描述器
        MapStateDescriptor<Void, Pattern> stateDescriptor = new MapStateDescriptor<>("pattern", Types.VOID, Types.POJO(Pattern.class));
        BroadcastStream<Pattern> broadcast = patternStream.broadcast(stateDescriptor);

        // 連接配接兩條流 處理
        actionStream.keyBy(data -> data.userId)
                .connect(broadcast)
                .process(new PatternDetector())
                .print();

        environment.execute();

        // 2.行為的模式流, 廣播流

    }
    private static class Action{
        public String userId;
        public String action;

        public Action() {
        }

        public Action(String userId, String action) {
            this.userId = userId;
            this.action = action;
        }

        @Override
        public String toString() {
            return "Action{" +
                    "userId='" + userId + '\'' +
                    ", action='" + action + '\'' +
                    '}';
        }
    }
    public static class Pattern{
        public String active1;
        public String active2;

        public Pattern() {
        }

        public Pattern(String active1, String active2) {
            this.active1 = active1;
            this.active2 = active2;
        }

        @Override
        public String toString() {
            return "Pattern{" +
                    "active1='" + active1 + '\'' +
                    ", active2='" + active2 + '\'' +
                    '}';
        }
    }
    public static class PatternDetector extends KeyedBroadcastProcessFunction<String, Action, Pattern, Tuple2<String, Pattern>>{
        // 定義一個KeyedState, 儲存上一次使用者的行為
        ValueState<String> preActionState;

        @Override
        public void open(Configuration parameters) throws Exception {
            preActionState = getRuntimeContext().getState(new ValueStateDescriptor<String>("last-action", String.class));
        }


        @Override
        public void processElement(Action action, ReadOnlyContext readOnlyContext, Collector<Tuple2<String, Pattern>> collector) throws Exception {
            // 判斷規則
            // 從廣播狀态拿取
            ReadOnlyBroadcastState<Void, Pattern> patternState = readOnlyContext.getBroadcastState(new MapStateDescriptor<>("pattern", Types.VOID, Types.POJO(Pattern.class)));
            // 規則
            Pattern pattern = patternState.get(null);
            // 上一次行為
            String preAction = preActionState.value();
            // 判斷是否比對
            if (pattern != null && preAction != null){
                if (pattern.active1.equals(preAction) && pattern.active2.equals(action.action)){
                    collector.collect(new Tuple2<>(readOnlyContext.getCurrentKey(), pattern));
                }
            }
            preActionState.update(action.action);


        }

        @Override
        public void processBroadcastElement(Pattern pattern, Context context, Collector<Tuple2<String, Pattern>> collector) throws Exception {
            BroadcastState<Void, Pattern> patternState = context.getBroadcastState(new MapStateDescriptor<>("pattern", Types.VOID, Types.POJO(Pattern.class)));
            patternState.put(null, pattern);

        }
    }
}      
Flink_狀态程式設計_2

3.狀态持久化和狀态後端

在 Flink 的狀态管理機制中,很重要的一個功能就是對狀态進行持久化(persistence)儲存,這樣就可以在發生故障後進行重新開機恢複。Flink 對狀态進行持久化的方式,就是将目前所有分布式狀态進行“快照”儲存,寫入一個“檢查點”(checkpoint)或者儲存點(savepoint)

儲存到外部存儲系統中。具體的存儲媒體,一般是分布式檔案系統(distributed file system)。

3.1 檢查點(Checkpoint)

有狀态流應用中的檢查點(checkpoint),其實就是所有任務的狀态在某個時間點的一個快照(一份拷貝)。簡單來講,就是一次“存盤”,讓我們之前處理資料的進度不要丢掉。在一個流應用程式運作時,Flink 會定期儲存檢查點,在檢查點中會記錄每個算子的 id 和狀态;如果發生故障,Flink 就會用最近一次成功儲存的檢查點來恢複應用的狀态,重新啟動處理流程,就如同“讀檔”一樣。

預設情況下,檢查點是被禁用的.

environment.enableCheckpointing(10000L);      

3.2 狀态後端 state Backends

檢查點的儲存離不開 JobManager 和 TaskManager,以及外部存儲系統的協調。在應用進行檢查點儲存時,首先會由 JobManager 向所有 TaskManager 發出觸發檢查點的指令;TaskManger 收到之後,将目前任務的所有狀态進行快照儲存,持久化到遠端的存儲媒體中;

完成之後向 JobManager 傳回确認資訊。這個過程是分布式的,當 JobManger 收到所有TaskManager 的傳回資訊後,就會确認目前檢查點成功儲存

Flink_狀态程式設計_2

上圖為檢查點的儲存的圖

Flink 中,狀态的存儲、通路以及維護,都是由一個可插拔的元件決定的,這個元件就叫作狀态後端(state backend)。狀态後端主要負責兩件事:一是本地的狀态管理,二是将檢查點(checkpoint)寫入遠端的持久化存儲。

哈希表狀态後端(HashMapStateBackend)

把狀态存放在記憶體裡

environment.setStateBackend(new HashMapStateBackend());      

内嵌 RocksDB 狀态後端(EmbeddedRocksDBStateBackend)

RocksDB 是一種内嵌的 key-value 存儲媒體,可以把資料持久化到本地硬碟。

<dependency>
            <groupId>org.apache.flinkgroupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}artifactId>
            <version>1.13.0version>
        dependency>      
environment.setStateBackend(new EmbeddedRocksDBStateBackend());      

如何選擇正确的狀态後端

HashMap 和 RocksDB 兩種狀态後端最大的差別,就在于本地狀态存放在哪裡:前者是記憶體,後者是 RocksDB。在實際應用中,選擇那種狀态後端,主要是需要根據業務需求在處理性能和應用的擴充性上做一個選擇。

HashMapStateBackend 是記憶體計算,讀寫速度非常快;但是,狀态的大小會受到叢集可用記憶體的限制,如果應用的狀态随着時間不停地增長,就會耗盡記憶體資源。

而 RocksDB 是硬碟存儲,是以可以根據可用的磁盤空間進行擴充,而且是唯一支援增量檢查點的狀态後端,是以它非常适合于超級海量狀态的存儲。不過由于每個狀态的讀寫都需要做序列化/反序列化,而且可能需要直接從磁盤讀取資料,這就會導緻性能的降低,平均讀寫

性能要比 HashMapStateBackend 慢一個數量級。

4.狀态程式設計總結

有狀态的流處理是 Flink 的本質,是以狀态可以說是 Flink 中最為重要的概念。之前聚合算子、視窗算子中已經提到了狀态的概念。