將Flink中批處理的WordCount轉化為流處理的WordCount
目的:將Flink中批處理的WordCount轉化為流處理的WordCount
作用:感覺毫無用處
如何實作:從批處理環境(ExecutionEnvironment)中註冊的每個sink出發,向前回溯到資料源,再把途中的各個算子依序在流處理環境(StreamExecutionEnvironment)中重寫一遍
代碼如下:
package org.apache.flink.examples.java.maqy;
/**
* 實作Flink中Batch的WordCount到流的WordCount的轉換
* 注意:流的WordCount相同的邏輯,每到來一個新元素都會進行一次輸出,是以輸出結果會不同
*
* Flink版本:1.4.2
* @author maqy
* @date 2018.08.11
*/
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.AggregationFunction;
import org.apache.flink.api.java.aggregation.SumAggregationFunction;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.List;
public class BatchToStream {
/**
 * Describes the batch WordCount plan and then runs its streaming translation.
 *
 * <p>The batch environment is only used to declare the plan (source, flatMap,
 * groupBy/sum, text sink); it is never executed. The plan is handed to
 * {@link #batchToStream(ExecutionEnvironment)} and the returned streaming
 * environment is the one that gets executed.
 *
 * @param args ignored
 * @throws Exception if building or executing the streaming job fails
 */
public static void main(String[] args) throws Exception {
    final ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
    batchEnv.setParallelism(1);

    // text file -> (word, 1) pairs -> grouped on field 0, summed on field 1
    final DataSet<Tuple2<String, Integer>> wordCounts = batchEnv
            .readTextFile("F:\\test.txt")
            .flatMap(new LineSplitter())
            .groupBy(0)
            .sum(1);

    // The sink is a separate statement: writeAsText yields a DataSink, not a DataSet,
    // so it cannot be the tail of the chain assigned above.
    wordCounts.writeAsText("F:\\output\\batchToStream");

    // Translate the declared batch plan and execute only the streaming job.
    final StreamExecutionEnvironment streamEnv = batchToStream(batchEnv);
    streamEnv.execute("StreamJob~~~~~~~~~~~~~");
}
//實作批的環境到流的環境的轉換,傳入envBatch,傳回StreamExecutionEnvironment
/**
 * Rebuilds the plan registered in a batch environment inside a new streaming environment.
 *
 * <p>For every sink of {@code envBatch}, the operator chain feeding the sink is translated
 * by {@link #preVisit(DataSet, StreamExecutionEnvironment, DataStream)}; if the sink writes
 * with a {@link TextOutputFormat}, an equivalent text sink on the same path is attached to
 * the resulting stream. Sinks with any other output format are not translated.
 *
 * <p>NOTE(review): this relies on {@code ExecutionEnvironment.getSinks()}, which does not
 * look like part of the stock Flink API - confirm the Flink build in use exposes it.
 *
 * @param envBatch batch environment whose registered sinks are translated
 * @return a new streaming environment (parallelism 1) holding the translated plan
 * @throws Exception declared for callers; kept from the original signature
 */
@SuppressWarnings({"unchecked", "rawtypes"}) // the translation works on raw DataStream/DataSet types
public static StreamExecutionEnvironment batchToStream(ExecutionEnvironment envBatch) throws Exception {
    // Fresh streaming environment that receives the translated operators and is returned.
    StreamExecutionEnvironment envStream = StreamExecutionEnvironment.getExecutionEnvironment();
    // Parallelism is set here, before any operator is added.
    envStream.setParallelism(1);

    List<DataSink<?>> batchSinks = envBatch.getSinks();
    for (DataSink<?> dataSink : batchSinks) {
        // Translate everything upstream of this sink, starting from the sink's input.
        DataStream stream = preVisit(dataSink.getDataSet(), envStream, null);
        if (stream == null) {
            // preVisit yields null when the source could not be translated;
            // attaching a sink would otherwise fail with a NullPointerException.
            System.out.println("no stream could be built for this sink, sink skipped");
            continue;
        }

        // Translate the sink itself; only text output is supported.
        OutputFormat<?> dataSinkOutputFormat = dataSink.getFormat();
        if (dataSinkOutputFormat instanceof TextOutputFormat) {
            System.out.println("dataSinkOutputFormat is a TextOutputFormat");
            Path path = ((TextOutputFormat<?>) dataSinkOutputFormat).getOutputFilePath();
            stream.writeUsingOutputFormat(new TextOutputFormat(path));
        }
    }
    return envStream;
}
//從尾向前周遊,并轉化
/**
 * Translates the batch operator chain ending at {@code dataSet} into stream operators.
 *
 * <p>The method first recurses to the predecessor (via {@link #getPre(DataSet)}) until it
 * reaches a {@link DataSource}, then applies the streaming counterpart of each node on the
 * way back. Handled nodes:
 * <ul>
 *   <li>{@link DataSource} with a {@link TextInputFormat}: {@code readTextFile} on the same path;</li>
 *   <li>{@link FlatMapOperator}: {@code flatMap} with the same function;</li>
 *   <li>{@link AggregateOperator}: {@code keyBy} for an unsorted grouping on expression keys,
 *       followed by {@code sum} when there is exactly one sum aggregation.</li>
 * </ul>
 * Any other node only produces a diagnostic line and leaves the stream unchanged.
 *
 * @param dataSet   batch node to translate; {@code null} ends the walk
 * @param envStream streaming environment in which the source is created
 * @param first     stream built so far, or {@code null} if none exists yet
 * @return the stream after applying this node, or {@code first} unchanged (possibly
 *         {@code null}) when the node could not be translated
 */
@SuppressWarnings({"unchecked", "rawtypes"}) // element types are not tracked during translation
public static DataStream preVisit(DataSet dataSet, StreamExecutionEnvironment envStream, DataStream first) {
    // Must be checked before recursing: getPre(null) is null again, so a null node
    // would otherwise recurse without end.
    if (dataSet == null) {
        System.out.println("source is null");
        return first;
    }

    // Not at the source yet: translate everything upstream first.
    if (!(dataSet instanceof DataSource)) {
        first = preVisit(getPre(dataSet), envStream, first);
    }

    if (dataSet instanceof DataSource) {
        first = translateSource((DataSource) dataSet, envStream, first);
    } else if (dataSet instanceof SingleInputOperator) {
        System.out.println("SingleInputOperator yes");
        if (dataSet instanceof AggregateOperator) {
            System.out.println("AggregateOperator yes");
            first = translateAggregate((AggregateOperator) dataSet, first);
        } else if (dataSet instanceof SingleInputUdfOperator) {
            System.out.println("SingleInputUdfOperator yes");
            if (dataSet instanceof FlatMapOperator) {
                System.out.println("FlatMapOperator yes");
                FlatMapFunction flatMapFunction = ((FlatMapOperator) dataSet).getFlatMapFunction();
                if (first == null) {
                    System.out.println("no upstream stream, flatMap skipped");
                } else {
                    first = first.flatMap(flatMapFunction);
                }
            }
        } else {
            System.out.println("Not sure what SingleInputOperator");
        }
    } else if (dataSet instanceof TwoInputOperator) {
        System.out.println("TwoInputOperator yes");
    } else {
        System.out.println("not sure what Operator");
    }
    return first;
}

/** Creates the stream source for a batch source; only text file input is translated. */
@SuppressWarnings("rawtypes")
private static DataStream translateSource(DataSource source, StreamExecutionEnvironment envStream, DataStream first) {
    InputFormat inputFormat = source.getInputFormat();
    if (inputFormat instanceof TextInputFormat) {
        System.out.println("inputFormat is TextInputFormat");
        String filePath = ((TextInputFormat) inputFormat).getFilePath().toString();
        System.out.println("輸入的檔案路徑為:" + filePath);
        return envStream.readTextFile(filePath);
    }
    // Any other input format is left untranslated; the incoming stream is passed through.
    return first;
}

/** Applies keyBy (for a grouped aggregate) and sum (for a single sum aggregation) to the stream. */
@SuppressWarnings({"unchecked", "rawtypes"})
private static DataStream translateAggregate(AggregateOperator aggregate, DataStream first) {
    List<AggregationFunction<?>> aggregationFunctions = aggregate.getAggregationFunctions();
    List<Integer> fields = aggregate.getFields();
    // Null when the aggregate was not preceded by groupBy.
    Grouping grouping = aggregate.getGrouping();

    if (grouping != null) {
        if (grouping instanceof SortedGrouping) {
            System.out.println("SortedGrouping yes");
        } else if (grouping instanceof UnsortedGrouping) {
            System.out.println("UnsortedGrouping yes");
            Keys keys = grouping.getKeys();
            if (keys instanceof Keys.ExpressionKeys) {
                System.out.println("Keys.ExpressionKeys yes");
                // Only the single-key case is handled; with several key fields the
                // stream is keyed on field 0, as in the original implementation.
                int position = 0;
                int numOfKeyFields = keys.getNumberOfKeyFields();
                int[] positions = keys.computeLogicalKeyPositions();
                if (numOfKeyFields == 1) {
                    position = positions[0];
                }
                if (first == null) {
                    System.out.println("no upstream stream, keyBy skipped");
                } else {
                    first = first.keyBy(position);
                }
            } else if (keys instanceof Keys.SelectorFunctionKeys) {
                System.out.println("Keys.SelectorFunctionKeys yes");
            }
        }
    }

    // Only one aggregation on one field is handled, and sum needs a keyed stream.
    if (aggregationFunctions.size() == 1 && fields.size() == 1) {
        if (aggregationFunctions.get(0) instanceof SumAggregationFunction) {
            if (first instanceof KeyedStream) {
                first = ((KeyedStream) first).sum(fields.get(0));
            } else {
                System.out.println("Stream中sum的話一定要keyby麼,似乎是。。。");
            }
        }
    }
    return first;
}
//得到一個前驅
public static DataSet getPre(DataSet dataSet) {
if (dataSet instanceof Operator) {
System.out.println("Operator yes");
if (dataSet instanceof DataSource) {
System.out.println("DataSource yes");
return (DataSource) dataSet;
} else if (dataSet instanceof SingleInputOperator) {
System.out.println("SingleInputOperator yes");
//如果是SingleInputOperator,再判斷具體類型,SingleInputOperator中有DataSet 類型的 input。
return ((SingleInputOperator) dataSet).getInput();
} else if (dataSet instanceof TwoInputOperator) {
System.out.println("TwoInputOperator yes");
/
} else {
System.out.println("not sure what Operator");
/
}
} else {
System.out.println("no Operator");
/
}
return null;
}
//
// User Functions
//
/**
 * Tokenizer used by the WordCount plan, implemented as a user-defined FlatMapFunction.
 *
 * <p>Each input line is lower-cased and split on runs of non-word characters
 * ({@code \W+}); every non-empty token is emitted as a pair {@code (word, 1)}
 * of type {@code Tuple2<String, Integer>}.
 */
public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    /**
     * Splits one line into words and emits {@code (word, 1)} for each of them.
     *
     * @param value the input line
     * @param out   collector receiving one pair per non-empty token
     */
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // Locale.ROOT keeps lower-casing independent of the JVM default locale; with a
        // Turkish default locale "I" would become a dotless i, which \W+ treats as a separator.
        String[] tokens = value.toLowerCase(java.util.Locale.ROOT).split("\\W+");
        for (String token : tokens) {
            // split() yields an empty first token when the line starts with a separator
            if (!token.isEmpty()) {
                out.collect(new Tuple2<>(token, 1));
            }
        }
    }
}
}
輸入文本為:
a b c d a a b
a a a
輸出(可以看到每來一個新單詞都會進行一次輸出):
(a,1)
(b,1)
(c,1)
(d,1)
(a,2)
(a,3)
(b,2)
(a,4)
(a,5)
(a,6)
正常的批處理的輸出:
(a,6)
(b,2)
(c,1)
(d,1)