
Flink Runtime Architecture

 1. System Architecture

The Flink runtime has two major components: the JobManager (job manager) and the TaskManager (task manager).

JobManager: the actual manager (master), responsible for management and scheduling. Without high availability there is only one.

TaskManager: can be understood as the worker (slave). There can be one or more.

The system during job submission and task processing looks like this:

[Figure: runtime architecture during job submission and task processing]

  The client is not part of the processing system; it is only responsible for submitting jobs. It calls the program's main method, converts the code into a dataflow graph (Dataflow Graph), finally generates the job graph (JobGraph), and sends it to the JobManager. It can also fetch the execution status and results of a job from the JobManager. After a TaskManager starts, the JobManager establishes a connection with it, converts the job graph (JobGraph) into an executable execution graph (ExecutionGraph), and distributes it to the available TaskManagers.

1. JobManager

    The center for task management and scheduling in a cluster. The JobManager contains three components:

1. JobMaster

  Responsible for handling an individual job. The JobMaster receives the application to execute, including the JAR package, the dataflow graph, and the job graph (JobGraph). The JobMaster converts the JobGraph into a physical dataflow graph called the execution graph (ExecutionGraph), which contains all the tasks that can be executed concurrently. The JobMaster then sends a request to the ResourceManager to apply for the resources necessary to execute the job; once it has obtained enough resources, it distributes the execution graph to the TaskManagers that will actually run it.

  While the job is running, the JobMaster takes care of all operations that require central coordination, such as coordinating checkpoints.

2. ResourceManager

  Mainly responsible for resource allocation and management. "Resources" here primarily means the task slots of the TaskManagers. A task slot is the unit of resource scheduling in a Flink cluster; it comprises a set of CPU and memory resources that a machine uses to perform computation. Every task must be assigned to a slot to execute.

3. Dispatcher

  Mainly provides a REST interface for submitting applications, and starts a new JobMaster component for every newly submitted job. The Dispatcher also starts a Web UI that makes it convenient to display and monitor information about job execution.

2. TaskManager

  The worker process of Flink, also called a worker. A cluster contains one or more TaskManagers, and each TaskManager holds a certain number of task slots. The number of slots limits how many tasks a TaskManager can process in parallel.

  After starting, a TaskManager registers its slots with the ResourceManager; when instructed by the ResourceManager, it offers one or more slots to the JobMaster, which can then assign tasks to them.

  While executing, a TaskManager can buffer data and can exchange data with other TaskManagers running the same application.

2. Job Submission Flow

   It can be illustrated by the diagram below:

[Figure: job submission flow]

(1) The client app submits the job to the JobManager through the REST interface provided by the Dispatcher.

(2) The Dispatcher starts a JobMaster and hands the job (including the JobGraph) over to it.

(3) The JobMaster parses the JobGraph into an ExecutionGraph, works out the number of resources required, and requests the resources (slots) from the ResourceManager.

(4) The ResourceManager coordinates the resources.

(5) After starting, the TaskManagers register their available slots with the ResourceManager.

(6) The ResourceManager instructs the TaskManagers to provide slots for the new job.

(7) The TaskManagers connect to the corresponding JobMaster and offer their slots.

(8) The JobMaster distributes the tasks to be executed to the TaskManagers.

(9) The TaskManagers execute the tasks and can exchange data with each other.

3. Key Concepts

  These core concepts let us answer the following questions:

1) How does a Flink program turn into tasks?

2) How many tasks does a stream processing program actually contain?

3) How many slots are ultimately needed to execute those tasks?

1. Dataflow Graph

  Flink is a stream computation framework. A program's structure is really the definition of a series of operations; every incoming record is passed through each step of the computation in turn. Each operation is called an "operator", so a program can be seen as a pipeline made of a chain of operators, with data flowing through it in order like water.

  Every program consists of three parts: source (source operators, responsible for reading data), transformation (transformation operators, responsible for processing data), and sink (sink operators, responsible for emitting data).

  At runtime, a Flink program is mapped to a graph in which all the operators are stitched together in logical order; this is called the logical dataflow, or dataflow graph. A dataflow graph can be an arbitrary directed acyclic graph (DAG). Every dataflow in the graph starts with one or more sources and ends with one or more sinks.

  In code, apart from sources and sinks, an API call counts as an operator if its return value is SingleOutputStreamOperator; otherwise it does not (it is better understood as an intermediate transformation). For example, keyBy returns a KeyedStream and is therefore not an operator by itself, while org.apache.flink.streaming.api.datastream.KeyedStream#sum(int) is an operator.
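  A minimal sketch of this distinction (host and port are placeholders; imports are the same as in the SocketStreamWordCount example later in this post; the operator names in the comments come from the DataStream/KeyedStream sources pasted below):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.socketTextStream("localhost", 7777); // source operator
KeyedStream<Tuple2<String, Long>, String> keyed = source
        .map(word -> Tuple2.of(word, 1L))                   // "Map" operator
        .returns(Types.TUPLE(Types.STRING, Types.LONG))
        .keyBy(t -> t.f0);                                  // not an operator: returns KeyedStream, only repartitions
SingleOutputStreamOperator<Tuple2<String, Long>> summed = keyed.sum(1); // "Keyed Aggregation" operator
summed.print();                                             // sink operator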

  Common operators:

source: reading text files, sockets, custom inputs, etc.

transformation: flatMap, map, filter, process, and also the aggregations sum, max, maxBy, min, minBy (these all carry the operator name "Keyed Aggregation")

sink: print, printToErr, writeAsText, writeAsCsv, etc.

The source of org.apache.flink.streaming.api.datastream.DataStream shows that each operator has a specific name:

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.datastream;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.io.CsvOutputFormat;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamFilter;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.api.operators.collect.ClientAndIterator;
import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.TimestampsAndWatermarksTransformation;
import org.apache.flink.streaming.api.transformations.UnionTransformation;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter.Strategy;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.util.keys.KeySelectorUtil;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;

@Public
public class DataStream<T> {
    protected final StreamExecutionEnvironment environment;
    protected final Transformation<T> transformation;

    public DataStream(StreamExecutionEnvironment environment, Transformation<T> transformation) {
        this.environment = (StreamExecutionEnvironment)Preconditions.checkNotNull(environment, "Execution Environment must not be null.");
        this.transformation = (Transformation)Preconditions.checkNotNull(transformation, "Stream Transformation must not be null.");
    }

    @Internal
    public int getId() {
        return this.transformation.getId();
    }

    public int getParallelism() {
        return this.transformation.getParallelism();
    }

    @PublicEvolving
    public ResourceSpec getMinResources() {
        return this.transformation.getMinResources();
    }

    @PublicEvolving
    public ResourceSpec getPreferredResources() {
        return this.transformation.getPreferredResources();
    }

    public TypeInformation<T> getType() {
        return this.transformation.getOutputType();
    }

    protected <F> F clean(F f) {
        return this.getExecutionEnvironment().clean(f);
    }

    public StreamExecutionEnvironment getExecutionEnvironment() {
        return this.environment;
    }

    public ExecutionConfig getExecutionConfig() {
        return this.environment.getConfig();
    }

    @SafeVarargs
    public final DataStream<T> union(DataStream<T>... streams) {
        List<Transformation<T>> unionedTransforms = new ArrayList();
        unionedTransforms.add(this.transformation);
        DataStream[] var3 = streams;
        int var4 = streams.length;

        for(int var5 = 0; var5 < var4; ++var5) {
            DataStream<T> newStream = var3[var5];
            if (!this.getType().equals(newStream.getType())) {
                throw new IllegalArgumentException("Cannot union streams of different types: " + this.getType() + " and " + newStream.getType());
            }

            unionedTransforms.add(newStream.getTransformation());
        }

        return new DataStream(this.environment, new UnionTransformation(unionedTransforms));
    }

    public <R> ConnectedStreams<T, R> connect(DataStream<R> dataStream) {
        return new ConnectedStreams(this.environment, this, dataStream);
    }

    @PublicEvolving
    public <R> BroadcastConnectedStream<T, R> connect(BroadcastStream<R> broadcastStream) {
        return new BroadcastConnectedStream(this.environment, this, (BroadcastStream)Preconditions.checkNotNull(broadcastStream), broadcastStream.getBroadcastStateDescriptors());
    }

    public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
        Preconditions.checkNotNull(key);
        return new KeyedStream(this, (KeySelector)this.clean(key));
    }

    public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key, TypeInformation<K> keyType) {
        Preconditions.checkNotNull(key);
        Preconditions.checkNotNull(keyType);
        return new KeyedStream(this, (KeySelector)this.clean(key), keyType);
    }

    /** @deprecated */
    @Deprecated
    public KeyedStream<T, Tuple> keyBy(int... fields) {
        return !(this.getType() instanceof BasicArrayTypeInfo) && !(this.getType() instanceof PrimitiveArrayTypeInfo) ? this.keyBy((Keys)(new ExpressionKeys(fields, this.getType()))) : this.keyBy((KeySelector)KeySelectorUtil.getSelectorForArray(fields, this.getType()));
    }

    /** @deprecated */
    @Deprecated
    public KeyedStream<T, Tuple> keyBy(String... fields) {
        return this.keyBy((Keys)(new ExpressionKeys(fields, this.getType())));
    }

    private KeyedStream<T, Tuple> keyBy(Keys<T> keys) {
        return new KeyedStream(this, (KeySelector)this.clean(KeySelectorUtil.getSelectorForKeys(keys, this.getType(), this.getExecutionConfig())));
    }

    /** @deprecated */
    @Deprecated
    public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, int field) {
        ExpressionKeys<T> outExpressionKeys = new ExpressionKeys(new int[]{field}, this.getType());
        return this.partitionCustom(partitioner, (Keys)outExpressionKeys);
    }

    /** @deprecated */
    @Deprecated
    public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, String field) {
        ExpressionKeys<T> outExpressionKeys = new ExpressionKeys(new String[]{field}, this.getType());
        return this.partitionCustom(partitioner, (Keys)outExpressionKeys);
    }

    public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
        return this.setConnectionType(new CustomPartitionerWrapper((Partitioner)this.clean(partitioner), (KeySelector)this.clean(keySelector)));
    }

    private <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, Keys<T> keys) {
        KeySelector<T, K> keySelector = KeySelectorUtil.getSelectorForOneKey(keys, partitioner, this.getType(), this.getExecutionConfig());
        return this.setConnectionType(new CustomPartitionerWrapper((Partitioner)this.clean(partitioner), (KeySelector)this.clean(keySelector)));
    }

    public DataStream<T> broadcast() {
        return this.setConnectionType(new BroadcastPartitioner());
    }

    @PublicEvolving
    public BroadcastStream<T> broadcast(MapStateDescriptor<?, ?>... broadcastStateDescriptors) {
        Preconditions.checkNotNull(broadcastStateDescriptors);
        DataStream<T> broadcastStream = this.setConnectionType(new BroadcastPartitioner());
        return new BroadcastStream(this.environment, broadcastStream, broadcastStateDescriptors);
    }

    @PublicEvolving
    public DataStream<T> shuffle() {
        return this.setConnectionType(new ShufflePartitioner());
    }

    public DataStream<T> forward() {
        return this.setConnectionType(new ForwardPartitioner());
    }

    public DataStream<T> rebalance() {
        return this.setConnectionType(new RebalancePartitioner());
    }

    @PublicEvolving
    public DataStream<T> rescale() {
        return this.setConnectionType(new RescalePartitioner());
    }

    @PublicEvolving
    public DataStream<T> global() {
        return this.setConnectionType(new GlobalPartitioner());
    }

    @PublicEvolving
    public IterativeStream<T> iterate() {
        return new IterativeStream(this, 0L);
    }

    @PublicEvolving
    public IterativeStream<T> iterate(long maxWaitTimeMillis) {
        return new IterativeStream(this, maxWaitTimeMillis);
    }

    public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
        TypeInformation<R> outType = TypeExtractor.getMapReturnTypes((MapFunction)this.clean(mapper), this.getType(), Utils.getCallLocationName(), true);
        return this.map(mapper, outType);
    }

    public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {
        return this.transform("Map", outputType, (OneInputStreamOperator)(new StreamMap((MapFunction)this.clean(mapper))));
    }

    public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
        TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes((FlatMapFunction)this.clean(flatMapper), this.getType(), Utils.getCallLocationName(), true);
        return this.flatMap(flatMapper, outType);
    }

    public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {
        return this.transform("Flat Map", outputType, (OneInputStreamOperator)(new StreamFlatMap((FlatMapFunction)this.clean(flatMapper))));
    }

    @PublicEvolving
    public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {
        TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(processFunction, ProcessFunction.class, 0, 1, TypeExtractor.NO_INDEX, this.getType(), Utils.getCallLocationName(), true);
        return this.process(processFunction, outType);
    }

    @Internal
    public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction, TypeInformation<R> outputType) {
        ProcessOperator<T, R> operator = new ProcessOperator((ProcessFunction)this.clean(processFunction));
        return this.transform("Process", outputType, (OneInputStreamOperator)operator);
    }

    public SingleOutputStreamOperator<T> filter(FilterFunction<T> filter) {
        return this.transform("Filter", this.getType(), (OneInputStreamOperator)(new StreamFilter((FilterFunction)this.clean(filter))));
    }

    @PublicEvolving
    public <R extends Tuple> SingleOutputStreamOperator<R> project(int... fieldIndexes) {
        return (new StreamProjection(this, fieldIndexes)).projectTupleX();
    }

    public <T2> CoGroupedStreams<T, T2> coGroup(DataStream<T2> otherStream) {
        return new CoGroupedStreams(this, otherStream);
    }

    public <T2> JoinedStreams<T, T2> join(DataStream<T2> otherStream) {
        return new JoinedStreams(this, otherStream);
    }

    /** @deprecated */
    @Deprecated
    public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size) {
        return this.environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime ? this.windowAll(TumblingProcessingTimeWindows.of(size)) : this.windowAll(TumblingEventTimeWindows.of(size));
    }

    /** @deprecated */
    @Deprecated
    public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size, Time slide) {
        return this.environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime ? this.windowAll(SlidingProcessingTimeWindows.of(size, slide)) : this.windowAll(SlidingEventTimeWindows.of(size, slide));
    }

    public AllWindowedStream<T, GlobalWindow> countWindowAll(long size) {
        return this.windowAll(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
    }

    public AllWindowedStream<T, GlobalWindow> countWindowAll(long size, long slide) {
        return this.windowAll(GlobalWindows.create()).evictor(CountEvictor.of(size)).trigger(CountTrigger.of(slide));
    }

    @PublicEvolving
    public <W extends Window> AllWindowedStream<T, W> windowAll(WindowAssigner<? super T, W> assigner) {
        return new AllWindowedStream(this, assigner);
    }

    public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy) {
        WatermarkStrategy<T> cleanedStrategy = (WatermarkStrategy)this.clean(watermarkStrategy);
        int inputParallelism = this.getTransformation().getParallelism();
        TimestampsAndWatermarksTransformation<T> transformation = new TimestampsAndWatermarksTransformation("Timestamps/Watermarks", inputParallelism, this.getTransformation(), cleanedStrategy);
        this.getExecutionEnvironment().addOperator(transformation);
        return new SingleOutputStreamOperator(this.getExecutionEnvironment(), transformation);
    }

    /** @deprecated */
    @Deprecated
    public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {
        AssignerWithPeriodicWatermarks<T> cleanedAssigner = (AssignerWithPeriodicWatermarks)this.clean(timestampAndWatermarkAssigner);
        WatermarkStrategy<T> wms = new Strategy(cleanedAssigner);
        return this.assignTimestampsAndWatermarks((WatermarkStrategy)wms);
    }

    /** @deprecated */
    @Deprecated
    public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> timestampAndWatermarkAssigner) {
        AssignerWithPunctuatedWatermarks<T> cleanedAssigner = (AssignerWithPunctuatedWatermarks)this.clean(timestampAndWatermarkAssigner);
        WatermarkStrategy<T> wms = new org.apache.flink.streaming.runtime.operators.util.AssignerWithPunctuatedWatermarksAdapter.Strategy(cleanedAssigner);
        return this.assignTimestampsAndWatermarks((WatermarkStrategy)wms);
    }

    @PublicEvolving
    public DataStreamSink<T> print() {
        PrintSinkFunction<T> printFunction = new PrintSinkFunction();
        return this.addSink(printFunction).name("Print to Std. Out");
    }

    @PublicEvolving
    public DataStreamSink<T> printToErr() {
        PrintSinkFunction<T> printFunction = new PrintSinkFunction(true);
        return this.addSink(printFunction).name("Print to Std. Err");
    }

    @PublicEvolving
    public DataStreamSink<T> print(String sinkIdentifier) {
        PrintSinkFunction<T> printFunction = new PrintSinkFunction(sinkIdentifier, false);
        return this.addSink(printFunction).name("Print to Std. Out");
    }

    @PublicEvolving
    public DataStreamSink<T> printToErr(String sinkIdentifier) {
        PrintSinkFunction<T> printFunction = new PrintSinkFunction(sinkIdentifier, true);
        return this.addSink(printFunction).name("Print to Std. Err");
    }

    /** @deprecated */
    @Deprecated
    @PublicEvolving
    public DataStreamSink<T> writeAsText(String path) {
        return this.writeUsingOutputFormat(new TextOutputFormat(new Path(path)));
    }

    /** @deprecated */
    @Deprecated
    @PublicEvolving
    public DataStreamSink<T> writeAsText(String path, WriteMode writeMode) {
        TextOutputFormat<T> tof = new TextOutputFormat(new Path(path));
        tof.setWriteMode(writeMode);
        return this.writeUsingOutputFormat(tof);
    }

    /** @deprecated */
    @Deprecated
    @PublicEvolving
    public DataStreamSink<T> writeAsCsv(String path) {
        return this.writeAsCsv(path, (WriteMode)null, "\n", CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
    }

    /** @deprecated */
    @Deprecated
    @PublicEvolving
    public DataStreamSink<T> writeAsCsv(String path, WriteMode writeMode) {
        return this.writeAsCsv(path, writeMode, "\n", CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
    }

    /** @deprecated */
    @Deprecated
    @PublicEvolving
    public <X extends Tuple> DataStreamSink<T> writeAsCsv(String path, WriteMode writeMode, String rowDelimiter, String fieldDelimiter) {
        Preconditions.checkArgument(this.getType().isTupleType(), "The writeAsCsv() method can only be used on data streams of tuples.");
        CsvOutputFormat<X> of = new CsvOutputFormat(new Path(path), rowDelimiter, fieldDelimiter);
        if (writeMode != null) {
            of.setWriteMode(writeMode);
        }

        return this.writeUsingOutputFormat(of);
    }

    @PublicEvolving
    public DataStreamSink<T> writeToSocket(String hostName, int port, SerializationSchema<T> schema) {
        DataStreamSink<T> returnStream = this.addSink(new SocketClientSink(hostName, port, schema, 0));
        returnStream.setParallelism(1);
        return returnStream;
    }

    /** @deprecated */
    @Deprecated
    @PublicEvolving
    public DataStreamSink<T> writeUsingOutputFormat(OutputFormat<T> format) {
        return this.addSink(new OutputFormatSinkFunction(format));
    }

    @PublicEvolving
    public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
        return this.doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
    }

    @PublicEvolving
    public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperatorFactory<T, R> operatorFactory) {
        return this.doTransform(operatorName, outTypeInfo, operatorFactory);
    }

    protected <R> SingleOutputStreamOperator<R> doTransform(String operatorName, TypeInformation<R> outTypeInfo, StreamOperatorFactory<R> operatorFactory) {
        this.transformation.getOutputType();
        OneInputTransformation<T, R> resultTransform = new OneInputTransformation(this.transformation, operatorName, operatorFactory, outTypeInfo, this.environment.getParallelism());
        SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(this.environment, resultTransform);
        this.getExecutionEnvironment().addOperator(resultTransform);
        return returnStream;
    }

    protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
        return new DataStream(this.getExecutionEnvironment(), new PartitionTransformation(this.getTransformation(), partitioner));
    }

    public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
        this.transformation.getOutputType();
        if (sinkFunction instanceof InputTypeConfigurable) {
            ((InputTypeConfigurable)sinkFunction).setInputType(this.getType(), this.getExecutionConfig());
        }

        StreamSink<T> sinkOperator = new StreamSink((SinkFunction)this.clean(sinkFunction));
        DataStreamSink<T> sink = new DataStreamSink(this, sinkOperator);
        this.getExecutionEnvironment().addOperator(sink.getTransformation());
        return sink;
    }

    @Experimental
    public DataStreamSink<T> sinkTo(Sink<T, ?, ?, ?> sink) {
        this.transformation.getOutputType();
        return new DataStreamSink(this, sink);
    }

    public CloseableIterator<T> executeAndCollect() throws Exception {
        return this.executeAndCollect("DataStream Collect");
    }

    public CloseableIterator<T> executeAndCollect(String jobExecutionName) throws Exception {
        return this.executeAndCollectWithClient(jobExecutionName).iterator;
    }

    public List<T> executeAndCollect(int limit) throws Exception {
        return this.executeAndCollect("DataStream Collect", limit);
    }

    public List<T> executeAndCollect(String jobExecutionName, int limit) throws Exception {
        Preconditions.checkState(limit > 0, "Limit must be greater than 0");
        ClientAndIterator<T> clientAndIterator = this.executeAndCollectWithClient(jobExecutionName);
        Throwable var4 = null;

        try {
            ArrayList results;
            for(results = new ArrayList(limit); clientAndIterator.iterator.hasNext() && limit > 0; --limit) {
                results.add(clientAndIterator.iterator.next());
            }

            ArrayList var6 = results;
            return var6;
        } catch (Throwable var15) {
            var4 = var15;
            throw var15;
        } finally {
            if (clientAndIterator != null) {
                if (var4 != null) {
                    try {
                        clientAndIterator.close();
                    } catch (Throwable var14) {
                        var4.addSuppressed(var14);
                    }
                } else {
                    clientAndIterator.close();
                }
            }

        }
    }

    ClientAndIterator<T> executeAndCollectWithClient(String jobExecutionName) throws Exception {
        TypeSerializer<T> serializer = this.getType().createSerializer(this.getExecutionEnvironment().getConfig());
        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID().toString();
        StreamExecutionEnvironment env = this.getExecutionEnvironment();
        CollectSinkOperatorFactory<T> factory = new CollectSinkOperatorFactory(serializer, accumulatorName);
        CollectSinkOperator<T> operator = (CollectSinkOperator)factory.getOperator();
        CollectResultIterator<T> iterator = new CollectResultIterator(operator.getOperatorIdFuture(), serializer, accumulatorName, env.getCheckpointConfig());
        CollectStreamSink<T> sink = new CollectStreamSink(this, factory);
        sink.name("Data stream collect sink");
        env.addOperator(sink.getTransformation());
        JobClient jobClient = env.executeAsync(jobExecutionName);
        iterator.setJobClient(jobClient);
        return new ClientAndIterator(jobClient, iterator);
    }

    @Internal
    public Transformation<T> getTransformation() {
        return this.transformation;
    }
}      


The aggregation operator APIs of org.apache.flink.streaming.api.datastream.KeyedStream:

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.datastream;

import java.util.ArrayList;
import java.util.Stack;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.EnumTypeInfo;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.query.QueryableAppendingStateOperator;
import org.apache.flink.streaming.api.functions.query.QueryableValueStateOperator;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.co.IntervalJoinOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.ReduceTransformation;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.util.Preconditions;

@Public
public class KeyedStream<T, KEY> extends DataStream<T> {
    private final KeySelector<T, KEY> keySelector;
    private final TypeInformation<KEY> keyType;

    public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
        this(dataStream, keySelector, TypeExtractor.getKeySelectorTypes(keySelector, dataStream.getType()));
    }

    public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
        this(dataStream, new PartitionTransformation(dataStream.getTransformation(), new KeyGroupStreamPartitioner(keySelector, 128)), keySelector, keyType);
    }

    @Internal
    KeyedStream(DataStream<T> stream, PartitionTransformation<T> partitionTransformation, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
        super(stream.getExecutionEnvironment(), partitionTransformation);
        this.keySelector = (KeySelector)this.clean(keySelector);
        this.keyType = this.validateKeyType(keyType);
    }

    private TypeInformation<KEY> validateKeyType(TypeInformation<KEY> keyType) {
        Stack<TypeInformation<?>> stack = new Stack();
        stack.push(keyType);
        ArrayList unsupportedTypes = new ArrayList();

        while(true) {
            TypeInformation typeInfo;
            do {
                if (stack.isEmpty()) {
                    if (!unsupportedTypes.isEmpty()) {
                        throw new InvalidProgramException("Type " + keyType + " cannot be used as key. Contained UNSUPPORTED key types: " + StringUtils.join(unsupportedTypes, ", ") + ". Look at the keyBy() documentation for the conditions a type has to satisfy in order to be eligible for a key.");
                    }

                    return keyType;
                }

                typeInfo = (TypeInformation)stack.pop();
                if (!this.validateKeyTypeIsHashable(typeInfo)) {
                    unsupportedTypes.add(typeInfo);
                }
            } while(!(typeInfo instanceof TupleTypeInfoBase));

            for(int i = 0; i < typeInfo.getArity(); ++i) {
                stack.push(((TupleTypeInfoBase)typeInfo).getTypeAt(i));
            }
        }
    }

    private boolean validateKeyTypeIsHashable(TypeInformation<?> type) {
        try {
            return type instanceof PojoTypeInfo ? !type.getTypeClass().getMethod("hashCode").getDeclaringClass().equals(Object.class) : !isArrayType(type) && !isEnumType(type);
        } catch (NoSuchMethodException var3) {
            return false;
        }
    }

    private static boolean isArrayType(TypeInformation<?> type) {
        return type instanceof PrimitiveArrayTypeInfo || type instanceof BasicArrayTypeInfo || type instanceof ObjectArrayTypeInfo;
    }

    private static boolean isEnumType(TypeInformation<?> type) {
        return type instanceof EnumTypeInfo;
    }

    @Internal
    public KeySelector<T, KEY> getKeySelector() {
        return this.keySelector;
    }

    @Internal
    public TypeInformation<KEY> getKeyType() {
        return this.keyType;
    }

    protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
        throw new UnsupportedOperationException("Cannot override partitioning for KeyedStream.");
    }

    protected <R> SingleOutputStreamOperator<R> doTransform(String operatorName, TypeInformation<R> outTypeInfo, StreamOperatorFactory<R> operatorFactory) {
        SingleOutputStreamOperator<R> returnStream = super.doTransform(operatorName, outTypeInfo, operatorFactory);
        OneInputTransformation<T, R> transform = (OneInputTransformation)returnStream.getTransformation();
        transform.setStateKeySelector(this.keySelector);
        transform.setStateKeyType(this.keyType);
        return returnStream;
    }

    public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
        DataStreamSink<T> result = super.addSink(sinkFunction);
        result.getTransformation().setStateKeySelector(this.keySelector);
        result.getTransformation().setStateKeyType(this.keyType);
        return result;
    }

    /** @deprecated */
    @Deprecated
    @PublicEvolving
    public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {
        TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(processFunction, ProcessFunction.class, 0, 1, TypeExtractor.NO_INDEX, this.getType(), Utils.getCallLocationName(), true);
        return this.process(processFunction, outType);
    }

    /** @deprecated */
    @Deprecated
    @Internal
    public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction, TypeInformation<R> outputType) {
        LegacyKeyedProcessOperator<KEY, T, R> operator = new LegacyKeyedProcessOperator((ProcessFunction)this.clean(processFunction));
        return this.transform("Process", outputType, operator);
    }

    @PublicEvolving
    public <R> SingleOutputStreamOperator<R> process(KeyedProcessFunction<KEY, T, R> keyedProcessFunction) {
        TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(keyedProcessFunction, KeyedProcessFunction.class, 1, 2, TypeExtractor.NO_INDEX, this.getType(), Utils.getCallLocationName(), true);
        return this.process(keyedProcessFunction, outType);
    }

    @Internal
    public <R> SingleOutputStreamOperator<R> process(KeyedProcessFunction<KEY, T, R> keyedProcessFunction, TypeInformation<R> outputType) {
        KeyedProcessOperator<KEY, T, R> operator = new KeyedProcessOperator((KeyedProcessFunction)this.clean(keyedProcessFunction));
        return this.transform("KeyedProcess", outputType, operator);
    }

    @PublicEvolving
    public <T1> KeyedStream.IntervalJoin<T, T1, KEY> intervalJoin(KeyedStream<T1, KEY> otherStream) {
        return new KeyedStream.IntervalJoin(this, otherStream);
    }

    /** @deprecated */
    @Deprecated
    public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
        return this.environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime ? this.window(TumblingProcessingTimeWindows.of(size)) : this.window(TumblingEventTimeWindows.of(size));
    }

    /** @deprecated */
    @Deprecated
    public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
        return this.environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime ? this.window(SlidingProcessingTimeWindows.of(size, slide)) : this.window(SlidingEventTimeWindows.of(size, slide));
    }

    public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
        return this.window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
    }

    public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
        return this.window(GlobalWindows.create()).evictor(CountEvictor.of(size)).trigger(CountTrigger.of(slide));
    }

    @PublicEvolving
    public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
        return new WindowedStream(this, assigner);
    }

    public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reducer) {
        ReduceTransformation<T, KEY> reduce = new ReduceTransformation("Keyed Reduce", this.environment.getParallelism(), this.transformation, (ReduceFunction)this.clean(reducer), this.keySelector, this.getKeyType());
        this.getExecutionEnvironment().addOperator(reduce);
        return new SingleOutputStreamOperator(this.getExecutionEnvironment(), reduce);
    }

    public SingleOutputStreamOperator<T> sum(int positionToSum) {
        return this.aggregate(new SumAggregator(positionToSum, this.getType(), this.getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> sum(String field) {
        return this.aggregate(new SumAggregator(field, this.getType(), this.getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> min(int positionToMin) {
        return this.aggregate(new ComparableAggregator(positionToMin, this.getType(), AggregationType.MIN, this.getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> min(String field) {
        return this.aggregate(new ComparableAggregator(field, this.getType(), AggregationType.MIN, false, this.getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> max(int positionToMax) {
        return this.aggregate(new ComparableAggregator(positionToMax, this.getType(), AggregationType.MAX, this.getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> max(String field) {
        return this.aggregate(new ComparableAggregator(field, this.getType(), AggregationType.MAX, false, this.getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> minBy(String field, boolean first) {
        return this.aggregate(new ComparableAggregator(field, this.getType(), AggregationType.MINBY, first, this.getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> maxBy(String field, boolean first) {
        return this.aggregate(new ComparableAggregator(field, this.getType(), AggregationType.MAXBY, first, this.getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> minBy(int positionToMinBy) {
        return this.minBy(positionToMinBy, true);
    }

    public SingleOutputStreamOperator<T> minBy(String positionToMinBy) {
        return this.minBy(positionToMinBy, true);
    }

    public SingleOutputStreamOperator<T> minBy(int positionToMinBy, boolean first) {
        return this.aggregate(new ComparableAggregator(positionToMinBy, this.getType(), AggregationType.MINBY, first, this.getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy) {
        return this.maxBy(positionToMaxBy, true);
    }

    public SingleOutputStreamOperator<T> maxBy(String positionToMaxBy) {
        return this.maxBy(positionToMaxBy, true);
    }

    public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy, boolean first) {
        return this.aggregate(new ComparableAggregator(positionToMaxBy, this.getType(), AggregationType.MAXBY, first, this.getExecutionConfig()));
    }

    protected SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregate) {
        return this.reduce(aggregate).name("Keyed Aggregation");
    }

    @PublicEvolving
    public QueryableStateStream<KEY, T> asQueryableState(String queryableStateName) {
        ValueStateDescriptor<T> valueStateDescriptor = new ValueStateDescriptor(UUID.randomUUID().toString(), this.getType());
        return this.asQueryableState(queryableStateName, valueStateDescriptor);
    }

    @PublicEvolving
    public QueryableStateStream<KEY, T> asQueryableState(String queryableStateName, ValueStateDescriptor<T> stateDescriptor) {
        this.transform("Queryable state: " + queryableStateName, this.getType(), new QueryableValueStateOperator(queryableStateName, stateDescriptor));
        stateDescriptor.initializeSerializerUnlessSet(this.getExecutionConfig());
        return new QueryableStateStream(queryableStateName, stateDescriptor, this.getKeyType().createSerializer(this.getExecutionConfig()));
    }

    @PublicEvolving
    public QueryableStateStream<KEY, T> asQueryableState(String queryableStateName, ReducingStateDescriptor<T> stateDescriptor) {
        this.transform("Queryable state: " + queryableStateName, this.getType(), new QueryableAppendingStateOperator(queryableStateName, stateDescriptor));
        stateDescriptor.initializeSerializerUnlessSet(this.getExecutionConfig());
        return new QueryableStateStream(queryableStateName, stateDescriptor, this.getKeyType().createSerializer(this.getExecutionConfig()));
    }

    @PublicEvolving
    public static class IntervalJoined<IN1, IN2, KEY> {
        private final KeyedStream<IN1, KEY> left;
        private final KeyedStream<IN2, KEY> right;
        private final long lowerBound;
        private final long upperBound;
        private final KeySelector<IN1, KEY> keySelector1;
        private final KeySelector<IN2, KEY> keySelector2;
        private boolean lowerBoundInclusive;
        private boolean upperBoundInclusive;

        public IntervalJoined(KeyedStream<IN1, KEY> left, KeyedStream<IN2, KEY> right, long lowerBound, long upperBound, boolean lowerBoundInclusive, boolean upperBoundInclusive) {
            this.left = (KeyedStream)Preconditions.checkNotNull(left);
            this.right = (KeyedStream)Preconditions.checkNotNull(right);
            this.lowerBound = lowerBound;
            this.upperBound = upperBound;
            this.lowerBoundInclusive = lowerBoundInclusive;
            this.upperBoundInclusive = upperBoundInclusive;
            this.keySelector1 = left.getKeySelector();
            this.keySelector2 = right.getKeySelector();
        }

        @PublicEvolving
        public KeyedStream.IntervalJoined<IN1, IN2, KEY> upperBoundExclusive() {
            this.upperBoundInclusive = false;
            return this;
        }

        @PublicEvolving
        public KeyedStream.IntervalJoined<IN1, IN2, KEY> lowerBoundExclusive() {
            this.lowerBoundInclusive = false;
            return this;
        }

        @PublicEvolving
        public <OUT> SingleOutputStreamOperator<OUT> process(ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction) {
            Preconditions.checkNotNull(processJoinFunction);
            TypeInformation<OUT> outputType = TypeExtractor.getBinaryOperatorReturnType(processJoinFunction, ProcessJoinFunction.class, 0, 1, 2, TypeExtractor.NO_INDEX, this.left.getType(), this.right.getType(), Utils.getCallLocationName(), true);
            return this.process(processJoinFunction, outputType);
        }

        @PublicEvolving
        public <OUT> SingleOutputStreamOperator<OUT> process(ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction, TypeInformation<OUT> outputType) {
            Preconditions.checkNotNull(processJoinFunction);
            Preconditions.checkNotNull(outputType);
            ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = (ProcessJoinFunction)this.left.getExecutionEnvironment().clean(processJoinFunction);
            IntervalJoinOperator<KEY, IN1, IN2, OUT> operator = new IntervalJoinOperator(this.lowerBound, this.upperBound, this.lowerBoundInclusive, this.upperBoundInclusive, this.left.getType().createSerializer(this.left.getExecutionConfig()), this.right.getType().createSerializer(this.right.getExecutionConfig()), cleanedUdf);
            return this.left.connect(this.right).keyBy(this.keySelector1, this.keySelector2).transform("Interval Join", outputType, operator);
        }
    }

    @PublicEvolving
    public static class IntervalJoin<T1, T2, KEY> {
        private final KeyedStream<T1, KEY> streamOne;
        private final KeyedStream<T2, KEY> streamTwo;
        private KeyedStream.IntervalJoin.TimeBehaviour timeBehaviour;

        IntervalJoin(KeyedStream<T1, KEY> streamOne, KeyedStream<T2, KEY> streamTwo) {
            this.timeBehaviour = KeyedStream.IntervalJoin.TimeBehaviour.EventTime;
            this.streamOne = (KeyedStream)Preconditions.checkNotNull(streamOne);
            this.streamTwo = (KeyedStream)Preconditions.checkNotNull(streamTwo);
        }

        public KeyedStream.IntervalJoin<T1, T2, KEY> inEventTime() {
            this.timeBehaviour = KeyedStream.IntervalJoin.TimeBehaviour.EventTime;
            return this;
        }

        public KeyedStream.IntervalJoin<T1, T2, KEY> inProcessingTime() {
            this.timeBehaviour = KeyedStream.IntervalJoin.TimeBehaviour.ProcessingTime;
            return this;
        }

        @PublicEvolving
        public KeyedStream.IntervalJoined<T1, T2, KEY> between(Time lowerBound, Time upperBound) {
            if (this.timeBehaviour != KeyedStream.IntervalJoin.TimeBehaviour.EventTime) {
                throw new UnsupportedTimeCharacteristicException("Time-bounded stream joins are only supported in event time");
            } else {
                Preconditions.checkNotNull(lowerBound, "A lower bound needs to be provided for a time-bounded join");
                Preconditions.checkNotNull(upperBound, "An upper bound needs to be provided for a time-bounded join");
                return new KeyedStream.IntervalJoined(this.streamOne, this.streamTwo, lowerBound.toMilliseconds(), upperBound.toMilliseconds(), true, true);
            }
        }

        static enum TimeBehaviour {
            ProcessingTime,
            EventTime;

            private TimeBehaviour() {
            }
        }
    }
}      


  Take the earlier socket example: in the Web UI's "Show Plan", each box can be understood as one task (a task may have multiple subtasks, and the number of subtasks can be understood as the parallelism). Why tasks get merged this way will become clear after reading about merging operator chains (one-to-one operators with the same parallelism are merged into an operator chain).

(1) With the parallelism set to 2, the plan looks like this:

[Figure: show plan with parallelism 2]

(2) With the parallelism set to 1, the plan looks like this:

[Figure: show plan with parallelism 1]

2. Parallelism

1. What is parallel computation?

  What we actually want can be understood as "data parallelism": when multiple records arrive at the same time, we can read them in simultaneously and run flatMap and other operations on them on different nodes.

2. Parallel subtasks and parallelism

  To achieve parallel execution, we replicate an operator into multiple copies on multiple nodes; when data arrives, any one of them can process it. In this way one operator task is split into several parallel subtasks, which are distributed to different nodes, giving true parallel computation.

  While Flink is executing, each operator can have one or more subtasks, and these subtasks run completely independently in different threads, on different physical machines, or in different containers.

  The number of subtasks of a particular operator is called its parallelism. The parallelism of a stream program can be taken to be the largest parallelism among all of its operators. Different operators in the same program may have different parallelism.

As shown in the figure below:

[Figure: parallel dataflow with source, map, window, and sink operators]

  The dataflow here has four operators: source, map, window, and sink. Except for the final sink, every operator has parallelism 2, so the whole program contains 2 + 2 + 2 + 1 = 7 subtasks and needs at least two partitions to execute. The parallelism of this program can be considered to be 2.

3. Setting the parallelism

Settings follow a nearest-wins principle: the setting closest to the operator takes effect first.

(1) In code

// Global setting
executionEnvironment.setParallelism(3);

// Setting for a single operator
txtDataSource
        .flatMap((String line, Collector<String> words) -> {
            Arrays.stream(line.split(" ")).forEach(words::collect);
        })
        .setParallelism(3);

 (2) At submission time (it can also be set in the Web UI)

./flink-1.13.0/bin/flink run -c cn.qz.SocketStreamWordCount -p 2 ./study-flink-1.0-SNAPSHOT.jar      

(3) Change the default parallelism directly in the cluster configuration file flink-conf.yaml:

parallelism.default: 1      

  None of these settings is mandatory; they are matched from nearest to farthest (per-operator setting takes precedence over the env-level setting, which takes precedence over -p at submission, which takes precedence over the config default). Note that for some operators a configured parallelism does not take effect: for example, the operator that reads a socket text stream does not support parallelism at all. In a development environment, the default parallelism is the number of CPU cores of the current machine (the default number of task slots is also the number of CPU cores).
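  A small sketch of this precedence (the executionEnvironment and txtDataSource names are reused from the snippets above):

// Hypothetical illustration: the per-operator setting wins over the env-level one.
executionEnvironment.setParallelism(4);   // job-wide default set in code
txtDataSource
        .map(line -> line.toUpperCase())
        .setParallelism(2);               // this map runs with parallelism 2, not 4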

 4. Test example

  Again using the socket stream as the example.

(1) Submit with a parallelism of 2 and inspect the job:

[Figure: job overview in the Web UI]

   As shown above, "name" is the name of each operator; these are the names assigned to the operators in the source code we saw earlier. After it comes the number of subtasks.

(2) Input on port 7777:

[root@k8smaster01 conf]# nc -l 7777
hello china and beijing
what is your name?
my name is qz.      

(3) Inspect the detailed information of the output task:

[Figure: task details]

 Inspect the subtask information:

[Figure: subtask details]

 (4) Check the standard output on the machines where the two subtasks run. Each output line is prefixed with a number, which can be understood as the partition number (the task slot number):

Output on the machine running the first subtask:

[Figure: stdout on the first subtask's machine]

The second machine:

[Figure: stdout on the second machine]

The maximum parallelism is the largest number of resources (task slots) a job can be given. The tasks are processed in parallel, which here can be understood as parallelism across different machines (how many threads run in parallel on each machine is a question of task slots, studied later; for now each machine has one task slot).

Supplement: a test covering points 1 and 2 above

  Take the following program:

package cn.qz;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class SocketStreamWordCount {

    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment (stream execution environment)
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Read the socket text stream
        DataStreamSource<String> txtDataSource = executionEnvironment.socketTextStream("192.168.13.107", 7777);
        // 3. Transform the data format
        SingleOutputStreamOperator<Tuple2<String, Long>> singleOutputStreamOperator = txtDataSource
                .flatMap((String line, Collector<String> words) -> {
                    Arrays.stream(line.split(" ")).forEach(words::collect);
                })
                .returns(Types.STRING)
                .map(word -> Tuple2.of(word, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG)); // the lambda uses generics; due to type erasure the type information must be declared explicitly
        // 4. Group by key
        KeyedStream<Tuple2<String, Long>, String> tuple2StringKeyedStream = singleOutputStreamOperator.keyBy(t -> t.f0);
        // 5. Sum
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2StringKeyedStream.sum(1);
        // 6. Print
        sum.print();
        System.out.println("========");
        // 7. Max
        SingleOutputStreamOperator<Tuple2<String, Long>> tuple2SingleOutputStreamOperator = singleOutputStreamOperator.keyBy(t -> t.f0).max(1);
        tuple2SingleOutputStreamOperator.printToErr();
        // 8. Execute
        executionEnvironment.execute();
    }
}      

  Debugging and inspecting the related objects shows the default parallelism and the corresponding transformations:

[Figure: debugger view of the default parallelism and transformations]

3. Operator Chains

  Looking at the plan shown by the Web UI, the number of nodes does not match the operators in the code one-to-one. Some nodes connect several tasks together, merging them into one bigger task. The reason is explained below.

1. Data transport between operators

A dataflow can transport data between operators either one-to-one, in forwarding mode, or reshuffled, in redistributing mode. Which mode applies depends on the kind of operator.

[Figure: forwarding vs. redistributing between operators]

(1) One-to-one forwarding

  In this mode, the stream keeps its partitioning and the order of its elements. Take the source and map operators in the figure: once the source has read a record, it can send it straight to map without repartitioning or reordering. This means a map subtask sees exactly the same elements, in the same number and order, as produced by the corresponding source subtask: a one-to-one relationship. Operators such as map, filter, and flatMap all have this one-to-one relationship.

(2) Redistributing

In this mode, the partitioning of the stream changes, for instance between the map operator and the subsequent keyBy/window/apply operators in the figure, and between the keyBy/window operators and the sink.

  Each operator subtask sends data to different downstream target tasks according to its transport strategy. For example, keyBy is a grouping operation that in essence repartitions by the hash of the key; and going from the window operator with parallelism 2 to the sink with parallelism 1, the transport mode is rebalance, which spreads the data evenly across the downstream subtasks. All of these transport modes cause redistribution (redistribute).
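  Besides the implicit repartitioning done by keyBy, the DataStream API (see the DataStream source pasted earlier) exposes the partitioning strategies directly. A minimal sketch, assuming an existing stream named source; each call returns a new DataStream that would normally feed further operators:

// Each call below only changes how records are routed to downstream subtasks;
// none of them transforms the data itself.
source.forward();    // one-to-one forwarding (up- and downstream parallelism must match)
source.rebalance();  // round-robin, spreads records evenly
source.rescale();    // round-robin within a local group of downstream subtasks
source.shuffle();    // random redistribution
source.broadcast();  // replicates every record to all downstream subtasks
source.global();     // sends all records to the first downstream subtask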

2. Merging operator chains

  One-to-one operators with the same parallelism can be linked together directly to form one bigger task (task); the original operators then become parts of this merged task. Each task is executed by one thread. This is operator chaining. After merging, the graph looks like this:

[Figure: dataflow after operator chaining]

After merging there are five tasks, executed in parallel by five threads. Chaining operators reduces switching between threads and improves throughput.

Flink merges according to the operator chaining rules by default. To forbid chaining, or to customize it, apply specific settings to operators in the code:

// Disable chaining for this operator
SingleOutputStreamOperator<String> noChain = txtDataSource
        .flatMap((String line, Collector<String> words) -> {
            Arrays.stream(line.split(" ")).forEach(words::collect);
        })
        .returns(Types.STRING)
        .disableChaining();

// Start a new chain from this operator
SingleOutputStreamOperator<String> newChain = txtDataSource
        .flatMap((String line, Collector<String> words) -> {
            Arrays.stream(line.split(" ")).forEach(words::collect);
        })
        .returns(Types.STRING)
        .startNewChain();

4. Job Graph and Execution Graph

The graphs by which Flink schedules and executes a job form four layers, in order:

logical stream graph -> job graph -> execution graph -> physical graph

Taking the socket program as an example, the transformation proceeds as follows:

[Figure: graph transformation for the socket example]

 1. Logical stream graph

  Its nodes generally correspond to the operator calls. It is produced by the client.
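  The client can also print this logical plan as JSON via StreamExecutionEnvironment#getExecutionPlan(), and the output can be pasted into the Flink plan visualizer. A minimal sketch (host and port are placeholders; imports as in the SocketStreamWordCount example above):

// Call after the pipeline is defined; prints the stream graph as JSON.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.socketTextStream("localhost", 7777)
        .flatMap((String line, Collector<String> words) ->
                Arrays.stream(line.split(" ")).forEach(words::collect))
        .returns(Types.STRING)
        .print();
System.out.println(env.getExecutionPlan());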

2. Job graph

  The dataflow graph after optimization is the job graph (JobGraph). The main optimization is to connect eligible nodes together and merge them into a single task node, forming operator chains. It is also produced by the client and handed to the JobMaster when the job is submitted.

3. Execution graph

  After receiving the JobGraph, the JobMaster uses it to generate the execution graph (ExecutionGraph). The ExecutionGraph is the parallelized version of the JobGraph and the core data structure of scheduling. It differs from the job graph in that tasks are split into subtasks and the way data is passed between tasks is made explicit.

4. Physical graph

  After the JobMaster generates the execution graph, it distributes it to the TaskManagers. The TaskManagers deploy the tasks according to the execution graph, and the final physical execution process forms the physical graph.

  On top of the execution graph, the physical graph further fixes where data is stored and exactly how it is sent and received.

5. Tasks and Task Slots

   In the earlier tests, our three TaskManagers had 3 task slots in total. When the job cn.qz.SocketStreamWordCount was submitted with parallelism 2, it showed 5 tasks (1 + 2 + 2), yet it occupied only two task slots. The reason is explained below.

[Figure: job occupying two task slots]

 1. Task slots

In Flink, a worker (TaskManager) is a JVM process, and as a process it can start several independent threads to execute multiple subtasks (subtask).

The number of independent task-executing threads in a TaskManager is its number of task slots; it defaults to 1 and can be changed. After the following change to flink-conf.yaml, each node has 4 slots, so 3 nodes give 12 slots in total:

taskmanager.numberOfTaskSlots: 4      

Note that slots currently isolate only memory; CPU is not isolated. In practice the number should be tuned to the number of CPU cores.

2. Slot sharing among tasks

By default, Flink allows subtasks to share slots. That is why two slots (the maximum number of subtasks of any one task) are enough to run the job.

  Subtasks of different task nodes can share one slot; put the other way round, the multiple subtasks of one and the same task must be placed in different slots. With parallelism 2, a possible placement is:

[Figure: a possible slot assignment with parallelism 2]

  At this point a question may arise: if we want to make maximal use of computing resources, why process several tasks in parallel inside one task slot (several subtasks sharing one slot's resources)?

  The reason: different tasks consume resources differently. source, map, and sink may take very little processing time, while window and other transformations take long (resource-intensive tasks). If every task had its own slot, the upstream source (blocked waiting for notifications from the downstream window task, effectively backpressure) and the downstream sink could sit idle for long stretches while the window tasks are overwhelmed, an unbalanced use of resources. Hence slot sharing: putting resource-intensive and lightweight tasks in the same slot lets them divide up the slot's resources among themselves.

If you want a task to occupy a slot exclusively, or only some operators to share slots, you can set a slot sharing group: only subtasks belonging to the same slot group share slots, and tasks of different groups must be assigned to different slots.

txtDataSource
        .flatMap((String line, Collector<String> words) -> {
            Arrays.stream(line.split(" ")).forEach(words::collect);
        })
        .slotSharingGroup("1");

3. Task slots and parallelism

  The parallelism of a whole stream program is the largest parallelism among all its operators, and that is the number of slots it needs (when no slot sharing groups are specified).
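  A worked example with assumed parallelisms: if a job's operators run with parallelism 1 (source), 2 (flatMap), 2 (sum), and 1 (sink) and all stay in the default slot sharing group, it needs max(1, 2, 2, 1) = 2 slots; subtasks of different operators pack into the same slot, while the two flatMap subtasks must land in different slots. With custom slot sharing groups, each group needs as many slots as its own largest parallelism, so the requirements add up across groups.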