在之前的《Flink DataStream API》一文中,我們列舉了一些Flink自帶且常用的transformation算子,例如map、flatMap等。在Flink的程式設計體系中,我們擷取到資料源之後,需要經過一系列的處理即transformation操作,再将最終結果輸出到目的Sink(ES、mysql或者hdfs),使資料落地。是以,除了正确的繼承重寫RichSourceFunction<>和RichSinkFunction<>之外,最終要的就是實時處理這部分,下面的圖介紹了Flink代碼執行流程以及各子產品的組成部分。

在Storm中,我們常常用Bolt的層級關系來表示各個資料的流向關系,組成一個拓撲。在Flink中,Transformation算子就是将一個或多個DataStream轉換為新的DataStream,可以将多個轉換組合成複雜的資料流拓撲。如下圖所示,DataStream會由不同的Transformation操作,轉換、過濾、聚合成其他不同的流,進而完成我們的業務要求。
那麼以《Flink從kafka中讀資料存入Mysql Sink》一文中的業務場景作為基礎,在Flink讀取Kafka的資料之後,進行不同的算子操作來分别詳細介紹一下各個Transformation算子的用法。Flink消費的資料格式依然是JSON格式:{"city":"合肥","loginTime":"2019-04-17 19:04:32","os":"Mac OS","phoneName":"vivo"}
1、map
map:輸入一個元素,輸出一個元素,可以用來做一些清洗工作。
/**
* create by xiax.xpu on @Date 2019/4/11 20:47
*/
public class FlinkSubmitter {
public static void main(String[] args) throws Exception{
//擷取運作時環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//checkpoint配置
//為了能夠使用支援容錯的kafka Consumer,開啟checkpoint機制,支援容錯,儲存某個狀态資訊
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//kafka配置檔案
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.83.129:9092");
props.setProperty("group.id","con1");
props.put("zookeeper.connect","192.168.83.129:2181");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //key 反序列化
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //value 反序列化
System.out.println("ready to print");
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
"kafka_flink_mysql",
new SimpleStringSchema(),
props);
consumer.setStartFromGroupOffsets();//預設消費政策
SingleOutputStreamOperator<Entity> StreamRecord = env.addSource(consumer)
.map(string -> JSON.parseObject(string, Entity.class))
.setParallelism(1);
//融合一些transformation算子進來
//map:輸入一個元素,輸出一個元素,可以用來做一些清洗工作
SingleOutputStreamOperator<Entity> result = StreamRecord.map(new MapFunction<Entity, Entity>() {
@Override
public Entity map(Entity value) throws Exception {
Entity entity1 = new Entity();
entity1.city = value.city+".XPU.Xiax";
entity1.phoneName = value.phoneName.toUpperCase();
entity1.loginTime = value.loginTime;
entity1.os = value.os;
return entity1;
}
});
result.print().setParallelism(1);
env.execute("new one");
}
}
本例中我們将擷取的JSON字元串轉換到Entity object之後,使用map算子讓所有的phoneName程式設計大寫,city後面添加XPU.Xiax字尾。
2、flatMap
flatMap:打平操作,我們可以了解為将輸入的元素壓平,進而對輸出結果的數量不做要求,可以為0、1或者多個都OK。它和Map相似,但引入flatMap的原因是因為一般java方法的傳回值結果都是一個,是以引入flatMap來差別這個。
//flatMap, 輸入一個元素,傳回0個、1個或者多個元素
SingleOutputStreamOperator<Entity> result = StreamRecord
.flatMap(new FlatMapFunction<Entity, Entity>() {
@Override
public void flatMap(Entity entity, Collector<Entity> out) throws Exception {
if (entity.city.equals("北京")) {
out.collect(entity);
}
}
});
這裡我們将所有city是北京的結果集聚合輸出,注意這裡并不是過濾,有些人可能會困惑這不是起了過濾filter的作用嗎,其實不然,隻是這裡的用法剛好相似而已。簡單分析一下,new FlatMapFunction<Entity, Entity>,接收的輸入是Entity實體,發出的也是Entity實體類,看到這就可以與Map對應上了。
3、filter
filter:過濾篩選,将所有符合判斷條件的結果集輸出
//filter 判斷條件輸出
SingleOutputStreamOperator<Entity> result = StreamRecord
.filter(new FilterFunction<Entity>() {
@Override
public boolean filter(Entity entity) throws Exception {
if (entity.phoneName.equals("HUAWEI")) {
return true;
}
return false;
}
});
這裡我們将所有phoneName是HUAWEI的值過濾,在直接輸出。
4、keyBy
keyBy:在邏輯上将Stream根據指定的Key進行分區,是根據key的Hash值進行分區的。
//keyBy 從邏輯上對邏輯分區
KeyedStream<Entity, String> result = StreamRecord
.keyBy(new KeySelector<Entity, String>() {
@Override
public String getKey(Entity entity) throws Exception {
return entity.os;
}
});
這裡隻是對DataStream進行分區而已,按照os進行分區,然而這輸出的效果其實沒什麼變化
由于下面這些操作,在之前模拟生成的資料,去做轉換操作不太适合。是以每個操作附上其他demo
5、reduce
reduce:屬于歸并操作,它能将3的keyedStream轉換成DataStream,Reduce 傳回單個的結果值,并且 reduce 操作每處理每一天新資料時總是建立一個新值。常用聚合操作例如min、max等都可使用reduce方法實作。這裡通過實作一個Socket的wordCount簡單例子,來幫助了解flatMap/keyBy/reduce/window等操作的過程。
package com.bigdata.flink.Stream;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* 滑動視窗的計算
*
* 通過socket模拟産生單詞資料 flink對其進行統計計數
* 實作時間視窗:
* 每隔1秒統計前兩秒的資料
*/
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception{
//定義端口号,通過cli接收
int port;
try{
ParameterTool parameterTool = ParameterTool.fromArgs(args);
port = parameterTool.getInt("port");
}catch(Exception e){
System.err.println("No port Set, use default port---java");
port = 9000;
}
//擷取運作時環境,必須要
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//綁定Source,通過master的nc -l 900 産生單詞
String hostname = "192.168.83.129";
String delimiter = "\n";
//連接配接socket 綁定資料源
DataStreamSource<String> socketWord = env.socketTextStream(hostname, port, delimiter);
DataStream<WordWithCount> windowcounts = socketWord.flatMap(new FlatMapFunction<String, WordWithCount>() {
public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
String[] splits = value.split("\\s");
for (String word : splits) {
out.collect(new WordWithCount(word, 1));
}
}
}).keyBy("word")
//.sum("count");//這裡求聚合 可以用reduce和sum兩種方式
.reduce(new ReduceFunction<WordWithCount>() {
public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
return new WordWithCount(a.word, a.count + b.count);
}
});
windowcounts.print().setParallelism(1);
env.execute("socketWindow");
}
public static class WordWithCount{
public String word;
public int count;
//無參的構造函數
public WordWithCount(){
}
//有參的構造函數
public WordWithCount(String word, int count){
this.count = count;
this.word = word;
}
@Override
public String toString() {
return "WordWithCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}
這裡隻是做單詞計數,至于為什麼有的單詞重複出現,但是請注意它後面的count值都不一樣,我們直接生成了toString方法列印出的結果。
6、aggregations
aggregations:進行一些聚合操作,例如sum(),min(),max()等,這些可以用于keyedStream進而獲得聚合。用法如下
KeyedStream.sum(0)或者KeyedStream.sum(“Key”)
7、unoin
union:可以将多個流合并到一個流中,以便對合并的流進行統一處理,有點類似于Storm中的将上一級的兩個Bolt資料彙聚到這一級同一個Bolt中。注意,合并的流類型需要一緻
//1.擷取執行環境配置資訊
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.定義加載或建立資料源(source),監聽9000端口的socket消息
DataStream<String> textStream9000 = env.socketTextStream("localhost", 9000, "\n");
DataStream<String> textStream9001 = env.socketTextStream("localhost", 9001, "\n");
DataStream<String> textStream9002 = env.socketTextStream("localhost", 9002, "\n");
DataStream<String> mapStream9000=textStream9000.map(s->"來自9000端口:"+s);
DataStream<String> mapStream9001=textStream9001.map(s->"來自9001端口:"+s);
DataStream<String> mapStream9002=textStream9002.map(s->"來自9002端口:"+s);
//3.union用來合并兩個或者多個流的資料,統一到一個流中
DataStream<String> result = mapStream9000.union(mapStream9001,mapStream9002);
//4.列印輸出sink
result.print();
//5.開始執行
env.execute();
8、connect
connect:和union類似,但是隻能連接配接兩個流,兩個流的資料類型可以不同,會對兩個流中的資料應用不同的處理方法。
//擷取Flink運作環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//綁定資料源
DataStreamSource<Long> text1 = env.addSource(new MyParalleSource()).setParallelism(1);
DataStreamSource<Long> text2 = env.addSource(new MyParalleSource()).setParallelism(1);
//為了示範connect的不同,将第二個source的值轉換為string
SingleOutputStreamOperator<String> text2_str = text2.map(new MapFunction<Long, String>() {
@Override
public String map(Long value) throws Exception {
return "str" + value;
}
});
ConnectedStreams<Long, String> connectStream = text1.connect(text2_str);
SingleOutputStreamOperator<Object> result = connectStream.map(new CoMapFunction<Long, String, Object>() {
@Override
public Object map1(Long value) throws Exception {
return value;
}
@Override
public Object map2(String value) throws Exception {
return value;
}
});
//列印到控制台,并行度為1
result.print().setParallelism(1);
env.execute( "StreamingDemoWithMyNoParalleSource");
9、split
split:根據規則吧一個資料流切分成多個流,可能在實際場景中,源資料流中混合了多種類似的資料,多種類型的資料處理規則不一樣,是以就可以根據一定的規則把一個資料流切分成多個資料流。
//擷取Flink運作環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//綁定資料源
DataStreamSource<Long> text = env.addSource(new MyParalleSource()).setParallelism(1);
//對流進行切分 奇數偶數進行區分
SplitStream<Long> splitString = text.split(new OutputSelector<Long>() {
@Override
public Iterable<String> select(Long value) {
ArrayList<String> output = new ArrayList<>();
if (value % 2 == 0) {
output.add("even");//偶數
} else {
output.add("odd");//奇數
}
return output;
}
});
//選擇一個或者多個切分後的流
DataStream<Long> evenStream = splitString.select("even");//選擇偶數
DataStream<Long> oddStream = splitString.select("odd");//選擇奇數
DataStream<Long> moreStream = splitString.select("odd","even");//選擇多個流
//列印到控制台,并行度為1
evenStream.print().setParallelism(1);
env.execute( "StreamingDemoWithMyNoParalleSource");
10、window以及windowAll
window:按時間進行聚合或者其他條件對KeyedStream進行分組,用法:inputStream.keyBy(0).window(Time.seconds(10));
windowAll: 函數允許對正常資料流進行分組。通常,這是非并行資料轉換,因為它在非分區資料流上運作。用法:inputStream.keyBy(0).windowAll(Time.seconds(10));
關于時間視窗,這個我們後期會詳細說一下,敬請關注。