Table of Contents
- 1. Overview of multi-stream transformation operators
- 1.1 Split and Select
- 1.2 Connect and CoMap
- 1.3 Union
- 2. Code implementation
- References:
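1. Overview of multi-stream transformation operators
The multi-stream transformation operators generally include:
Split and Select (removed in newer versions)
Connect and CoMap
Union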
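1.1 Split and Select
Note: newer Flink versions no longer provide the Split and Select APIs (they are absent in Flink 1.12.1, at least).
Split
DataStream -> SplitStream: splits a DataStream into a SplitStream according to some characteristic of its elements;
A SplitStream may look like two streams, but it is actually a single, special stream;
Select
SplitStream -> DataStream: retrieves one or more DataStreams from a SplitStream;
Combining split and select lets us divide one DataStream into multiple DataStreams.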
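To make the tagging idea concrete, here is a minimal plain-Java sketch of what split and select do conceptually. It does not use the Flink API: `SplitSelectSketch`, `tagsFor`, `split`, and `select` are hypothetical names chosen for illustration, and lists stand in for streams. Unlike a real stream, this model groups elements by tag, so it does not reproduce arrival order when several tags are selected. In Flink versions where split/select are gone, side outputs (an `OutputTag` used from a `ProcessFunction`) are the usual replacement.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of split/select; all names here are hypothetical, not Flink classes.
class SplitSelectSketch {

    // Plays the role of an output selector: returns the tag(s) for one element.
    static List<String> tagsFor(double temperature) {
        return temperature > 30 ? Collections.singletonList("high")
                                : Collections.singletonList("low");
    }

    // "split": file every element under its tag(s); order is kept within each tag.
    static Map<String, List<Double>> split(List<Double> temperatures) {
        Map<String, List<Double>> tagged = new LinkedHashMap<>();
        for (double t : temperatures) {
            for (String tag : tagsFor(t)) {
                tagged.computeIfAbsent(tag, k -> new ArrayList<>()).add(t);
            }
        }
        return tagged;
    }

    // "select": collect the elements filed under any of the requested tags.
    static List<Double> select(Map<String, List<Double>> tagged, String... tags) {
        List<Double> out = new ArrayList<>();
        for (String tag : tags) {
            out.addAll(tagged.getOrDefault(tag, Collections.emptyList()));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<Double>> tagged = split(Arrays.asList(35.8, 15.4, 6.7, 38.1));
        System.out.println(select(tagged, "high"));        // [35.8, 38.1]
        System.out.println(select(tagged, "low"));         // [15.4, 6.7]
        System.out.println(select(tagged, "high", "low")); // [35.8, 38.1, 15.4, 6.7]
    }
}
```

Selecting both tags returns every element, which is why the job in section 2 can build an "all" stream from the same SplitStream.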
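1.2 Connect and CoMap
Connect
DataStream, DataStream -> ConnectedStreams: connects two data streams while preserving their types. After being connected, the two streams are merely placed inside one stream; internally each keeps its own data and form unchanged, and the two streams remain independent of each other.
CoMap
ConnectedStreams -> DataStream: operates on ConnectedStreams and works like map and flatMap, applying a separate map or flatMap function to each of the two streams in the ConnectedStreams;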
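The shape of a co-map can be sketched in plain Java as follows. This is only an illustration of the idea, not the Flink API: `CoMapSketch`, `CoMapper`, and `coMap` are hypothetical names, and lists stand in for streams. The point it shows is that the two inputs keep their own types, each gets its own mapping function, and both functions return the same output type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of connect + CoMap; all names here are hypothetical, not Flink classes.
class CoMapSketch {

    // One function per input type, one shared output type.
    interface CoMapper<A, B, R> {
        R map1(A value);
        R map2(B value);
    }

    // Applies map1 to the first input and map2 to the second. A real streaming job
    // interleaves elements as they arrive; this model processes the first input,
    // then the second.
    static <A, B, R> List<R> coMap(List<A> first, List<B> second, CoMapper<A, B, R> mapper) {
        List<R> out = new ArrayList<>();
        for (A a : first) {
            out.add(mapper.map1(a));
        }
        for (B b : second) {
            out.add(mapper.map2(b));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Double> highTemps = Arrays.asList(35.8, 38.1);                 // first input: Double
        List<String> lowSensorIds = Arrays.asList("sensor_6", "sensor_7"); // second input: String
        List<String> result = coMap(highTemps, lowSensorIds,
                new CoMapper<Double, String, String>() {
                    @Override
                    public String map1(Double temp) {
                        return temp + " high temp warning";
                    }

                    @Override
                    public String map2(String id) {
                        return id + " normal";
                    }
                });
        System.out.println(result);
    }
}
```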
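1.3 Union
DataStream -> DataStream: performs a union of two or more DataStreams, producing a new DataStream that contains all the elements of all the input DataStreams.
Question: how does it differ from Connect?
- Connect allows the two streams to have different data types, but it can only combine two streams;
- Union can merge multiple streams, but they must all have the same data type;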
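The type constraint in the comparison above can be seen in a small plain-Java sketch. Again this is a model rather than Flink code: `UnionSketch` and `union` are hypothetical names, and lists stand in for streams. Every input must share the element type `T`, any number of inputs is accepted, and nothing is deduplicated.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of union; the names are hypothetical, not Flink classes.
class UnionSketch {

    // All inputs share the element type T; the varargs parameter allows any number of them.
    // Elements are simply concatenated, so duplicates are kept.
    @SafeVarargs
    static <T> List<T> union(List<T> first, List<T>... others) {
        List<T> out = new ArrayList<>(first);
        for (List<T> other : others) {
            out.addAll(other);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> high = Arrays.asList("sensor_1");
        List<String> low = Arrays.asList("sensor_6", "sensor_7");
        List<String> all = Arrays.asList("sensor_1");
        System.out.println(union(high, low, all)); // [sensor_1, sensor_6, sensor_7, sensor_1]
    }
}
```

Passing a list of a different element type here would force `T` to widen to a common supertype, which is the plain-Java analogue of why a union of differently typed streams is rejected while connect accepts them.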
2. Code implementation
Data preparation:
sensor.txt
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,36.3
sensor_1,1547718209,32.8
sensor_1,1547718212,37.1
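The job in this section imports `org.flink.beans.SensorReading`, which the article does not show. The following is a hypothetical reconstruction, assuming three fields that match the columns of sensor.txt (the field name `timestamp` and the helper `fromLine` are assumptions; only `getId()`, `getTemperature()`, and the three-argument constructor are visible in the job). The package declaration is omitted so the sketch compiles on its own.

```java
// Hypothetical reconstruction of the SensorReading bean; the original class is not shown.
// For use as a Flink POJO it should be declared public in its own file.
class SensorReading {
    private String id;
    private Long timestamp;       // assumed name for the second column
    private Double temperature;

    // No-argument constructor, expected of POJO-style bean classes.
    public SensorReading() {
    }

    public SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public Long getTimestamp() { return timestamp; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }

    public Double getTemperature() { return temperature; }
    public void setTemperature(Double temperature) { this.temperature = temperature; }

    // Hypothetical helper: parses one "id,timestamp,temperature" line of sensor.txt.
    public static SensorReading fromLine(String line) {
        String[] fields = line.split(",");
        return new SensorReading(
                fields[0].trim(),
                Long.valueOf(fields[1].trim()),
                Double.valueOf(fields[2].trim()));
    }

    @Override
    public String toString() {
        return "SensorReading{id='" + id + "', timestamp=" + timestamp
                + ", temperature=" + temperature + "}";
    }

    public static void main(String[] args) {
        System.out.println(fromLine("sensor_1,1547718199,35.8"));
    }
}
```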
Code:
package org.flink.transform;

/**
 * @author 隻是甲
 * @date 2021-08-31
 * @remark Flink basic Transform: MultipleStreams
 */

import org.flink.beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

import java.util.Collections;

public class TransformTest4_MultipleStreams {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read the data from a file
        DataStream<String> inputStream = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkStudy\\src\\main\\resources\\sensor.txt");

        // Convert each line to a SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            // valueOf replaces the deprecated new Long(...) / new Double(...) constructors
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });

        // 1. Split: divide the stream in two at a temperature of 30 degrees
        // (split/select only exist in older Flink versions, see the note in 1.1)
        SplitStream<SensorReading> splitStream = dataStream.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading value) {
                return (value.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
            }
        });

        DataStream<SensorReading> highTempStream = splitStream.select("high");
        DataStream<SensorReading> lowTempStream = splitStream.select("low");
        DataStream<SensorReading> allTempStream = splitStream.select("high", "low");

        highTempStream.print("high");
        lowTempStream.print("low");
        allTempStream.print("all");

        // 2. Connect: map the high-temperature stream to Tuple2, connect it with the
        //    low-temperature stream, and output status information
        DataStream<Tuple2<String, Double>> warningStream = highTempStream.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Double> map(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(), value.getTemperature());
            }
        });

        ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStreams = warningStream.connect(lowTempStream);

        DataStream<Object> resultStream = connectedStreams.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
            @Override
            public Object map1(Tuple2<String, Double> value) throws Exception {
                return new Tuple3<>(value.f0, value.f1, "high temp warning");
            }

            @Override
            public Object map2(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(), "normal");
            }
        });

        resultStream.print();

        // 3. Union: combine multiple streams
        // Does not compile, because the element types differ:
        // warningStream.union(lowTempStream);
        highTempStream.union(lowTempStream, allTempStream);

        env.execute();
    }
}
Test output:
References:
- https://www.bilibili.com/video/BV1qy4y1q728
- https://ashiamd.github.io/docsify-notes/#/study/BigData/Flink/%E5%B0%9A%E7%A1%85%E8%B0%B7Flink%E5%85%A5%E9%97%A8%E5%88%B0%E5%AE%9E%E6%88%98-%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0?id=_521-%e4%bb%8e%e9%9b%86%e5%90%88%e8%af%bb%e5%8f%96%e6%95%b0%e6%8d%ae