天天看點

FLINK基礎(146): DS算子與視窗(27)任務鍊

0 Flink的任務鍊

  Flink 中的每個算子都可以設定并行度,每個算子的一個并行度執行個體就是一個 subTask。由于 Flink 的 TaskManager 運作 Task 的時候是每個 Task 采用一個單獨的線程,這會帶來很多線程切換和資料交換的開銷,進而影響吞吐量。

  為了避免資料在網絡或線程之間傳輸導緻的開銷,Flink 會在 JobGraph 階段,将代碼中可以優化的算子優化成一個算子鍊(Operator Chains)以放到一個 Task 中執行。

使用者也可以自己指定相應的鍊條,将相關性非常強的轉換操作綁定在一起,這樣能夠讓轉換過程中上下遊的 Task 在同一個 Pipeline 中執行,進而避免因為資料在網絡或者線程間傳輸導緻的開銷,提高整體的吞吐量和延遲。

  一般情況下,Flink 在 Map 操作中預設開啟 TaskChain,以提高 Flink 作業的整體性能。

如圖1,Source 和 Map 在優化後,組成一個算子鍊,作為一個 task 運作在一個線程上,其簡圖如 Condensed view 所示,并行圖如 parellelized view 所示。

FLINK基礎(146): DS算子與視窗(27)任務鍊

Flink提供了更細粒度的任務鍊控制方法,使用者可根據需求建立任務鍊或禁止任務鍊。

1 禁用全局任務鍊

evn.disableOperatorChaining();      

關閉全局任務鍊後,建立對應Operator Chain,需要使用者先指定操作符,然後再調用

startNewChain()

方法建立。

dataStream.keyBy(0).filter().map().startNewChain().map();      

startNewChain

方法建立的鍊條隻對調用方法的前一個操作符和後一個操作符有效,不影響其他的。比如示例中建立的鍊條隻有

map->map

,對前面的

filter

無效。

禁用全局任務鍊會影響整體任務執行的情況,禁用前,要清楚任務執行的流程,否則可能造成非預期的結果。

2 禁用局部任務鍊

如果不想關閉整體算子上的鍊條,隻是想關閉部分算子上鍊條綁定,可以使用

disableChaining()

方法禁用目前操作符上的鍊條。

dataStream.keyBy(0).filter().map().disableChaining()      

上述代碼隻會禁用map操作上的任務鍊,不會影響其他操作符。

3 開始新鍊

Begin a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper.

someStream.filter(...).map(...).startNewChain().map(...);      

4 Set Slot Sharing Group

Set the slot sharing group of an operation. Flink will put operations with the same slot sharing group into the same slot while keeping operations that don’t have the slot sharing group in other slots. This can be used to isolate slots. The slot sharing group is inherited from input operations if all input operations are in the same slot sharing group. The name of the default slot sharing group is “default”, operations can explicitly be put into this group by calling slotSharingGroup(“default”).

someStream.filter(...).slotSharingGroup("name");      

5  Operator Chains(操作鍊)

  • Flink出于分布式執行的目的,将operator的subtask連結在一起形成task(類似spark中的管道)。
  • 每個task在一個線程中執行。
  • 将operators連結成task是非常有效的優化:它可以減少線程與線程間的切換和資料緩沖的開銷,并在降低延遲的同時提高整體吞吐量。
  • 連結的行為可以在程式設計API中進行指定,詳情請見代碼OperatorChainTest。
  • 開啟操作鍊 和 禁用操作鍊的對比圖(預設開啟):
  • Flink預設會将多個operator進行串聯,形成任務鍊(task chain)
  • 注意: task chain 可以了解為就是 operator chain 隻是不同場景下,稱呼不同。
  • 我們也可以禁用任務鍊,讓每個operator形成一個task。
  • StreamExecutionEnvironment.disableOperatorChaining() 這個方法會禁用整條工作鍊
  • 操作鍊其實就是類似spark的pipeline管道模式,一個task可以執行同一個窄依賴中的算子操作。
  • 我們也可以細粒度的控制工作鍊的形成,比如調用dataStreamSource.map(...).startNewChain(),但不能使用dataStreamSource.startNewChain()
  • dataStreamSource.filter(...).map(...).startNewChain().map(...),需要注意的是,當這樣寫時相當于source和filter組成一條鍊,兩個map組成一條鍊。
package com.ronnie.flink.stream.test;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 *  開啟與禁用工作鍊時,輸出的結果不一樣。
 *  當開啟工作鍊時(預設啟動),operator map1與map2 組成一個task.
 *     此時task運作時,對于hello,flink 這兩條資料是:
 *     先列印 hello ---- 1 , hello->1 ---- 2
 *     後列印 flink ---- 1 , flink->1 ---- 2
 *  當禁用工作鍊時,operator map1與map2 分别在兩個task中執行
 *     此時task運作時,對于hello,flink 這兩條資料是:
 *     先列印 hello ---- 1 , flink ---- 1
 *     後列印 hello->1 ---- 2  , flink->1 ---- 2
 *
 *  注:操作鍊類似spark的管道,一個task執行多個的算子.
 */
public class OperatorChainTest {

    public static final String[] WORDS = new String[] {
            "hello",
            "flink",
            "spark",
            "hbase"
    };

    public static void main(String[] args) {
        // 設定執行環境, 類似spark中初始化sparkContext一樣
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        // 關閉操作鍊..
        env.disableOperatorChaining();

        DataStreamSource<String> dataStreamSource = env.fromElements(WORDS);

        SingleOutputStreamOperator<String> pairStream = dataStreamSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                System.err.println(value + " ---- 1");
                return value + "->1";
            }
        }).map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                System.err.println(value + " ---- 2");
                return value + "->2";
            }
        });

        // 還可以控制更細粒度的任務鍊,比如指明從哪個operator開始形成一條新的鍊
        // someStream.map(...).startNewChain(),但不能使用someStream.startNewChain()。
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}      

繼續閱讀