天天看點

3.3、Flink流處理(Stream API)- State & Fault Tolerance(狀态和容錯)之 The Broadcast State Pattern(廣播狀态模式)提供的APIs重要内容

目錄

提供的APIs

BroadcastProcessFunction and KeyedBroadcastProcessFunction

重要内容

使用狀态描述算子狀态,該狀态在恢複時均勻地分布在算子的并行任務中,或者統一使用整個狀态來初始化恢複後的并行任務。

第三種受支援的操作符狀态是廣播狀态。廣播狀态被引入以支援這樣的用例:來自一個流的一些資料需要廣播到所有下遊任務,在那裡它被本地存儲,并用于處理另一個流上的所有傳入元素。例如,廣播狀态可以自然地出現,可以想象一個低吞吐量流包含一組規則,我們希望根據來自另一個流的所有元素對這些規則進行評估。

考慮到上述類型的用例,broadcast state 與其他算子狀态的差別在于:

  1. map 格式
  2. 它隻對特定的算子可用,這些算子有一個廣播流和一個非廣播流作為輸入,并且
  3. 這樣的算子可以具有多個具有不同名稱的廣播狀态。

提供的APIs

為了展示所提供的api,我們将從一個示例開始,然後介紹它們的全部功能。作為我們的運作示例,我們将使用這樣一種情況:我們有一個具有不同顔色和形狀的對象流,并且我們希望找到遵循特定模式的相同顔色的對象對,例如,一個矩形後面跟着一個三角形。我們假設一組有趣的模式會随着時間的推移而發展。

在本例中,第一個流将包含帶有顔色和形狀屬性的Item類型的元素。另一個流将包含規則。

從項目流開始,我們隻需要按顔色對它進行 key by,因為我們想要相同顔色的對。這将確定相同顔色的元素最終出現在相同的實體機器上。

// key the shapes by color
KeyedStream<Item, Color> colorPartitionedStream = shapeStream
                        .keyBy(new KeySelector<Shape, Color>(){...});
           

繼續讨論規則,包含它們的流應該廣播到所有下遊任務,這些任務應該将它們存儲在本地,以便它們可以根據所有傳入的 item 對它們進行評估。下面的代碼片段将:

  1. 傳播規則流
  2. 使用提供的MapStateDescriptor,它将建立規則存儲的傳播狀态
// a map descriptor to store the name of the rule (string) and the rule itself.
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
			"RulesBroadcastState",
			BasicTypeInfo.STRING_TYPE_INFO,
			TypeInformation.of(new TypeHint<Rule>() {}));
		
// broadcast the rules and create the broadcast state
BroadcastStream<Rule> ruleBroadcastStream = ruleStream
                        .broadcast(ruleStateDescriptor);
           

 最後,為了針對來自 item 流的輸入元素評估規則,我們需要:

  1. 連接配接兩個流,并且
  2. 指定我們的比對檢測邏輯。

可以通過調用非廣播流上的 connect()來連接配接流(鍵控或非鍵控),參數為 BroadcastStream。這将傳回 BroadcastConnectedStream,我們可以使用特殊類型的 CoProcessFunction 在其上調用 process()。該函數将包含我們的比對邏輯。函數的确切類型取決于非廣播流的類型:

  • 如果它是鍵控的,那麼這個函數就是一個 KeyedBroadcastProcessFunction。
  • 如果它是非鍵控的,則該函數是BroadcastProcessFunction。

鑒于我們的非廣播流是 keyed,以下片段包括上述調用:

注意:連接配接應該在非廣播流上調用,以 BroadcastStream 作為參數。
DataStream<String> output = colorPartitionedStream
                 .connect(ruleBroadcastStream)
                 .process(
                     
                     // type arguments in our KeyedBroadcastProcessFunction represent: 
                     //   1. the key of the keyed stream
                     //   2. the type of elements in the non-broadcast side
                     //   3. the type of elements in the broadcast side
                     //   4. the type of the result, here a string
                     
                     new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
                         // my matching logic
                     }
                 );
           

BroadcastProcessFunction and KeyedBroadcastProcessFunction

對于 CoProcessFunction,這些函數有兩個要實作的過程方法;processBroadcastElement()負責處理廣播流中的傳入元素,而 processElement()用于非廣播流。方法的完整簽名如下:

public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {

    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;

    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}
           
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {

    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;

    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}
           

首先需要注意的是,這兩個函數都需要 processBroadcastElement()方法的實作來處理廣播端中的元素,而 processElement()方法需要實作來處理非廣播端中的元素。

這兩種方法在它們所提供的上下文中是不同的。非廣播方有一個 ReadOnlyContext,而廣播方有一個 Context。

這兩個上下文(以下列舉中的 ctx ):

  1. 允許通路廣播狀态:

    ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)

  2. 允許查詢元素的時間戳:

    ctx.timestamp()

  3. 擷取目前水印:

    ctx.timestamp()

  4. 擷取目前處理時間:

    ctx.currentProcessingTime()

  5. 發出元素:

    ctx.output(OutputTag<X> outputTag, X value)

getBroadcastState() 中的

stateDescriptor 

應該與上面的 .broadcast(ruleStateDescriptor) 中的

stateDescriptor 

相同。

差別在于每個廣播狀态的通路類型。廣播方對其具有讀寫通路權,而非廣播方具有隻讀通路權(即名稱)。原因是在Flink中沒有跨任務通信。為了保證廣播狀态下的内容在算子的所有并行執行個體中是相同的,我們隻允許廣播方進行讀寫通路,在所有任務中看到相同的元素,我們要求這一側的每個輸入元素的計算在所有任務中都是相同的。忽略這條規則會破壞狀态的一緻性保證,導緻不一緻和 debug 結果的困難。

注意:在“processBroadcast()”中實作的邏輯必須在所有并行執行個體中具有相同的确定性行為!

最後,由于KeyedBroadcastProcessFunction是在鍵控流上操作的,是以它公開了BroadcastProcessFunction不可用的一些功能。那就是:

  1. processElement()方法中的ReadOnlyContext允許通路Flink的底層計時器服務,該服務允許注冊事件和/或處理時間計時器。當計時器觸發時,使用OnTimerContext調用onTimer()(如上所示),OnTimerContext公開與ReadOnlyContext plus相同的功能。能夠詢問觸發的計時器是事件還是處理時間;查詢與計時器關聯的 key。
  2. processBroadcastElement() 方法中的 context 包含 applyToKeyedState(StateDescriptor<S, VS> stateDescriptor, KeyedStateFunction<KS, S> function) 方法。

    這允許将 KeyedStateFunction 注冊到與所提供的

    stateDescriptor 

    相關聯的所有鍵的所有狀态。
注意:隻有在' KeyedBroadcastProcessFunction '的' processElement() '處才能注冊計時器,而且隻能在那裡注冊。這在“processBroadcastElement()”方法中是不可能的,因為沒有與廣播元素相關聯的鍵。

回到我們最初的例子,我們的KeyedBroadcastProcessFunction看起來可能如下:

new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {

    // store partial matches, i.e. first elements of the pair waiting for their second element
    // we keep a list as we may have many first elements waiting
    private final MapStateDescriptor<String, List<Item>> mapStateDesc =
        new MapStateDescriptor<>(
            "items",
            BasicTypeInfo.STRING_TYPE_INFO,
            new ListTypeInfo<>(Item.class));

    // identical to our ruleStateDescriptor above
    private final MapStateDescriptor<String, Rule> ruleStateDescriptor = 
        new MapStateDescriptor<>(
            "RulesBroadcastState",
            BasicTypeInfo.STRING_TYPE_INFO,
            TypeInformation.of(new TypeHint<Rule>() {}));

    @Override
    public void processBroadcastElement(Rule value,
                                        Context ctx,
                                        Collector<String> out) throws Exception {
        ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);
    }

    @Override
    public void processElement(Item value,
                               ReadOnlyContext ctx,
                               Collector<String> out) throws Exception {

        final MapState<String, List<Item>> state = getRuntimeContext().getMapState(mapStateDesc);
        final Shape shape = value.getShape();
    
        for (Map.Entry<String, Rule> entry :
                ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
            final String ruleName = entry.getKey();
            final Rule rule = entry.getValue();
    
            List<Item> stored = state.get(ruleName);
            if (stored == null) {
                stored = new ArrayList<>();
            }
    
            if (shape == rule.second && !stored.isEmpty()) {
                for (Item i : stored) {
                    out.collect("MATCH: " + i + " - " + value);
                }
                stored.clear();
            }
    
            // there is no else{} to cover if rule.first == rule.second
            if (shape.equals(rule.first)) {
                stored.add(value);
            }
    
            if (stored.isEmpty()) {
                state.remove(ruleName);
            } else {
                state.put(ruleName, stored);
            }
        }
    }
}
           

重要内容

在描述了所提供的api之後,本節将重點介紹在使用廣播狀态時需要記住的重要事項。這些都是:

  • 沒有跨任務通信:如前所述,這就是為什麼隻有(鍵入)broadcastprocessfunction的廣播端可以修改廣播狀态的内容的原因。此外,使用者必須確定所有任務以相同的方式修改每個傳入元素的廣播狀态的内容。否則,不同的任務可能有不同的内容,導緻結果不一緻。
  • 廣播狀态中事件的順序可能因任務而異:盡管廣播流的元素可以保證所有元素(最終)都将轉到所有下遊任務,但是元素到達每個任務的順序可能不同。是以,每個傳入元素的狀态更新不能依賴于傳入事件的順序。
  • 所有任務檢查它們的廣播狀态:雖然發生檢查點時,所有任務的廣播狀态都具有相同的元素(檢查點屏障不會跳過元素),但是所有任務都将檢查它們的廣播狀态,而不是其中一個。這個設計避免在恢複期間從同一檔案讀取所有任務(進而避免熱點),盡管這是以将檢查點狀态的大小增加p(=并行度)為代價的。Flink 保證在恢複/重新縮放時不會有重複和丢失資料。在并行度相同或更小的情況下進行恢複,每個任務讀取其檢查點狀态。在擴充時,每個任務讀取自己的狀态,其餘的任務(p_new-p_old)以循環方式讀取以前任務的檢查點。
  • 沒有 RocksDB 狀态後端:廣播狀态在運作時儲存在記憶體中,應該相應地執行記憶體供應。這适用于所有的操作符狀态。