1、建立類StreamWordForTool
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @Author ***
* @email ***@***.com.cn
* @Date 2021/5/21 11:09
* @Version 1.0
*/
public class StreamWordForTool {
public static void main(String[] args) throws Exception{
// 1、建立流處理執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// **設定并行度(線程)**
env.setParallelism(8);
// 2、從檔案讀取資料
// String inputPuth = "F:\\project\\learnflink\\src\\main\\resources\\test.txt";
// DataStreamSource<String> inputDataStream = env.readTextFile(inputPuth);
//從socket文本流讀取資料
DataStream<String> inputDataStream = env.socketTextStream("localhost", 7777);
// 3、基于資料流進行轉換計算
SingleOutputStreamOperator<Tuple2<String, Integer>> resultStream = inputDataStream.flatMap(new WordCount.MyFlatMapper())
.keyBy(0)
.sum(1);
// 4、輸出
resultStream.print();
// 5、執行任務
env.execute();
}
}
2、打開電腦的linux子系統,cmd==>bash==>nc -lk 7777,執行任務,指令行内輸入資料如下:

注:安裝子系統可參考:子系統安裝教程
3、測試提取參數,位址和端口改為配置型
// 2、從檔案讀取資料
// String inputPuth = "F:\\project\\learnflink\\src\\main\\resources\\test.txt";
// DataStreamSource<String> inputDataStream = env.readTextFile(inputPuth);
// 用parameter tool工具從程式啟動參數中提取配置項
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String host =parameterTool.get("hosts");
Integer port =parameterTool.getInt("ports");
DataStream<String> inputDataStream = env.socketTextStream(host, port);
//從socket文本流讀取資料
// DataStream<String> inputDataStream = env.socketTextStream("localhost", 7777);
運作時需要使用Edit configurations配置傳的參數,如圖:
未完待續…