
Flink Basics Series 17: Transform, Multi-Stream Transformation Operators

Table of Contents

  • 1. Overview of Multi-Stream Transformation Operators
    • 1.1 Split and Select
    • 1.2 Connect and CoMap
    • 1.3 Union
  • 2. Code Implementation
  • References:

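1. Overview of Multi-Stream Transformation Operators

Multi-stream transformation operators generally include:

Split and Select (removed in newer versions)

Connect and CoMap

Union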

1.1 Split and Select

Note: newer Flink versions no longer provide the Split and Select APIs (at least Flink 1.12.1 does not have them).

Split

DataStream -> SplitStream: splits a DataStream into a SplitStream according to some characteristic of its elements.

Although a SplitStream looks like two streams, it is in fact a single, special stream whose elements carry tags.

Select

SplitStream -> DataStream: obtains one or more DataStreams from a SplitStream.

By combining split and select, we can divide one DataStream into several DataStreams.
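To make the split-then-select idea concrete, here is a minimal plain-Java sketch that models the semantics with lists. It is not the Flink API: the class `SplitSelectModel`, the `Tagged` holder, and both method names are hypothetical, and the model assigns exactly one tag per element (Flink's `OutputSelector` can return several). In Flink versions where split/select no longer exist, side outputs are the usual replacement.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Plain-Java model of split/select semantics (hypothetical names, no Flink dependency).
class SplitSelectModel {

    // One element of the modeled SplitStream: the original value plus its tag.
    static final class Tagged<T> {
        final String tag;
        final T value;

        Tagged(String tag, T value) {
            this.tag = tag;
            this.value = value;
        }
    }

    // "split": still a single stream, but every element now carries a tag.
    static <T> List<Tagged<T>> split(List<T> input, Function<T, String> selector) {
        List<Tagged<T>> tagged = new ArrayList<>();
        for (T element : input) {
            tagged.add(new Tagged<>(selector.apply(element), element));
        }
        return tagged;
    }

    // "select": keep the elements whose tag is one of the requested tags,
    // preserving the original order.
    static <T> List<T> select(List<Tagged<T>> splitStream, String... tags) {
        Set<String> wanted = new HashSet<>(Arrays.asList(tags));
        List<T> selected = new ArrayList<>();
        for (Tagged<T> element : splitStream) {
            if (wanted.contains(element.tag)) {
                selected.add(element.value);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        // Temperatures taken from the sample sensor data used later in this article.
        List<Double> temperatures = Arrays.asList(35.8, 15.4, 6.7, 38.1, 36.3, 32.8, 37.1);
        List<Tagged<Double>> splitStream = split(temperatures, t -> t > 30 ? "high" : "low");

        System.out.println("high: " + select(splitStream, "high"));
        System.out.println("low:  " + select(splitStream, "low"));
        System.out.println("all:  " + select(splitStream, "high", "low"));
    }
}
```

Selecting both tags returns every element in its original order, which reflects the point that a SplitStream is one tagged stream rather than two separate ones.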

1.2 Connect and CoMap

Connect

DataStream, DataStream -> ConnectedStreams: connects two data streams while preserving the type of each. After two streams are connected, they are merely placed inside one stream; internally each keeps its own data and form unchanged, and the two streams remain independent of each other.

CoMap

ConnectedStreams -> DataStream: operates on ConnectedStreams and works like map and flatMap, applying a separate map or flatMap operation to each of the two streams inside the ConnectedStreams.
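The connect-then-CoMap pattern can be sketched in plain Java as follows. This is only a model of the semantics, not the Flink API: `ConnectCoMapModel` and `Connected` are hypothetical names, and the model emits all results of the first input before those of the second, whereas a real job interleaves them by arrival.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of connect + CoMap (hypothetical names, no Flink dependency).
class ConnectCoMapModel {

    // Models ConnectedStreams: two inputs of different element types, kept side by side.
    static final class Connected<A, B> {
        final List<A> first;
        final List<B> second;

        Connected(List<A> first, List<B> second) {
            this.first = first;
            this.second = second;
        }

        // Models CoMap: map1 handles the first input, map2 the second;
        // both must produce the same result type R.
        <R> List<R> map(Function<A, R> map1, Function<B, R> map2) {
            List<R> out = new ArrayList<>();
            for (A a : first) {
                out.add(map1.apply(a));
            }
            for (B b : second) {
                out.add(map2.apply(b));
            }
            return out;
        }
    }

    public static void main(String[] args) {
        // First input: temperatures (Double); second input: sensor ids (String).
        Connected<Double, String> connected = new Connected<>(
                Arrays.asList(35.8, 38.1),
                Arrays.asList("sensor_6", "sensor_7"));

        List<String> result = connected.map(
                temp -> "high temp warning: " + temp,
                id -> id + ": normal");

        result.forEach(System.out::println);
    }
}
```

The two inputs keep their own types until the two mapping functions bring them to a common output type, which is why a connect can join streams that a union cannot.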

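1.3 Union

DataStream -> DataStream: performs a union of two or more DataStreams, producing a new DataStream that contains all elements of all the input DataStreams.

Question: how does it differ from Connect?

  1. Connect allows the two streams to have different data types, but it can only combine two streams;
  2. Union can merge multiple streams, but all of them must have the same data type.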
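The contrast with Connect can be illustrated by a small plain-Java model of union. Again this is a sketch under assumptions rather than the Flink API: `UnionModel.union` is a hypothetical helper, and it simply concatenates its inputs, while a real job makes no such ordering promise.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of union (hypothetical names, no Flink dependency).
class UnionModel {

    // Any number of inputs is accepted, but the single type parameter T
    // forces all of them to have the same element type.
    @SafeVarargs
    static <T> List<T> union(List<T> first, List<T>... others) {
        List<T> merged = new ArrayList<>(first);
        for (List<T> other : others) {
            merged.addAll(other); // no de-duplication: repeated elements stay repeated
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Double> high = Arrays.asList(35.8, 38.1);
        List<Double> low = Arrays.asList(15.4, 6.7);
        List<Double> all = Arrays.asList(35.8, 15.4, 6.7, 38.1);

        // Three inputs of the same type are merged in one call.
        System.out.println(union(high, low, all));
    }
}
```

Because union does not remove duplicates, merging the high and low streams together with a stream that already contains both yields every reading twice.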

2. Code Implementation

Data preparation:

sensor.txt

sensor_1,1547718199,35.8

sensor_6,1547718201,15.4

sensor_7,1547718202,6.7

sensor_10,1547718205,38.1

sensor_1,1547718207,36.3

sensor_1,1547718209,32.8

sensor_1,1547718212,37.1
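Each line holds a sensor id, a timestamp, and a temperature. The job in this article imports a `SensorReading` bean that is not shown, so the following is only a guess at its shape: the three-argument constructor, `getId()`, and `getTemperature()` match how the job uses it, while the field names, the `timestamp` accessor, and the `fromLine` helper are assumptions added for illustration.

```java
// Hypothetical sketch of the SensorReading bean; not the author's actual class.
// In a Flink project it would be declared public in its own file
// (org.flink.beans.SensorReading) so that Flink can treat it as a POJO.
class SensorReading {
    private String id;
    private Long timestamp;
    private Double temperature;

    // No-argument constructor, as expected for a POJO.
    public SensorReading() {
    }

    public SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public Long getTimestamp() { return timestamp; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }

    public Double getTemperature() { return temperature; }
    public void setTemperature(Double temperature) { this.temperature = temperature; }

    // Hypothetical helper: parses one "id,timestamp,temperature" line of sensor.txt.
    static SensorReading fromLine(String line) {
        String[] fields = line.split(",");
        return new SensorReading(
                fields[0].trim(),
                Long.parseLong(fields[1].trim()),
                Double.parseDouble(fields[2].trim()));
    }

    @Override
    public String toString() {
        return "SensorReading{id='" + id + "', timestamp=" + timestamp
                + ", temperature=" + temperature + "}";
    }

    public static void main(String[] args) {
        System.out.println(fromLine("sensor_1,1547718199,35.8"));
    }
}
```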

Code:

package org.flink.transform;

/**
 * @author 隻是甲
 * @date   2021-08-31
 * @remark Flink basics: Transform, multiple streams
 */

import org.flink.beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

import java.util.Collections;

public class TransformTest4_MultipleStreams {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read the data from a file
        DataStream<String> inputStream = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkStudy\\src\\main\\resources\\sensor.txt");

        // Convert each line into a SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            // parseLong/parseDouble replace the deprecated new Long(...)/new Double(...) constructors
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        });

        // 1. Split: divide the stream in two at a temperature threshold of 30 degrees.
        // split/select and SplitStream exist only in older Flink versions
        // (they are gone in Flink 1.12.1), so this part needs such a version to compile.
        SplitStream<SensorReading> splitStream = dataStream.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading value) {
                return (value.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
            }
        });

        DataStream<SensorReading> highTempStream = splitStream.select("high");
        DataStream<SensorReading> lowTempStream = splitStream.select("low");
        DataStream<SensorReading> allTempStream = splitStream.select("high", "low");

        highTempStream.print("high");
        lowTempStream.print("low");
        allTempStream.print("all");

        // 2. Connect: convert the high-temperature stream into Tuple2 (id, temperature),
        // connect it with the low-temperature stream, and output status information
        DataStream<Tuple2<String, Double>> warningStream = highTempStream.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Double> map(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(), value.getTemperature());
            }
        });

        ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStreams = warningStream.connect(lowTempStream);

        DataStream<Object> resultStream = connectedStreams.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
            // map1 handles elements of the first (high-temperature) stream
            @Override
            public Object map1(Tuple2<String, Double> value) throws Exception {
                return new Tuple3<>(value.f0, value.f1, "high temp warning");
            }

            // map2 handles elements of the second (low-temperature) stream
            @Override
            public Object map2(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(), "normal");
            }
        });

        resultStream.print();

        // 3. Union: merge multiple streams of the same type
        // The next line would not compile, because warningStream (Tuple2) and
        // lowTempStream (SensorReading) have different element types:
//        warningStream.union(lowTempStream);
        DataStream<SensorReading> unionStream = highTempStream.union(lowTempStream, allTempStream);
        // Print the result; without a sink the union would have no visible effect
        unionStream.print("union");

        env.execute();
    }
}

           

Test record:

[Image: test run output]

References:

  1. https://www.bilibili.com/video/BV1qy4y1q728
  2. https://ashiamd.github.io/docsify-notes/#/study/BigData/Flink/%E5%B0%9A%E7%A1%85%E8%B0%B7Flink%E5%85%A5%E9%97%A8%E5%88%B0%E5%AE%9E%E6%88%98-%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0?id=_521-%e4%bb%8e%e9%9b%86%e5%90%88%e8%af%bb%e5%8f%96%e6%95%b0%e6%8d%ae
