天天看點

flink學習筆記java,從socket中擷取資料(三)

1、建立類StreamWordForTool

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @Author ***
 * @email ***@***.com.cn
 * @Date 2021/5/21 11:09
 * @Version 1.0
 */
public class StreamWordForTool {
    public static void main(String[] args) throws Exception{
//        1、建立流處理執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        **設定并行度(線程)**
        env.setParallelism(8);
//    2、從檔案讀取資料
//        String inputPuth = "F:\\project\\learnflink\\src\\main\\resources\\test.txt";
//        DataStreamSource<String> inputDataStream = env.readTextFile(inputPuth);
        //從socket文本流讀取資料
        DataStream<String> inputDataStream = env.socketTextStream("localhost", 7777);

//       3、基于資料流進行轉換計算
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultStream = inputDataStream.flatMap(new WordCount.MyFlatMapper())
                .keyBy(0)
                .sum(1);
//      4、輸出
        resultStream.print();
//       5、執行任務
        env.execute();
    }
}
           

2、打開電腦的linux子系統,cmd==>bash==>nc -lk 7777,執行任務,指令行内輸入資料如下:

flink學習筆記java,從socket中擷取資料(三)

注:安裝子系統可參考:子系統安裝教程

3、測試提取參數,位址和端口改為配置型

//    2、從檔案讀取資料
//        String inputPuth = "F:\\project\\learnflink\\src\\main\\resources\\test.txt";
//        DataStreamSource<String> inputDataStream = env.readTextFile(inputPuth);
//       用parameter tool工具從程式啟動參數中提取配置項
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String host =parameterTool.get("hosts");
        Integer port =parameterTool.getInt("ports");
        DataStream<String> inputDataStream = env.socketTextStream(host, port);
        //從socket文本流讀取資料
//        DataStream<String> inputDataStream = env.socketTextStream("localhost", 7777);
           

運作時需要使用Edit configurations配置傳的參數,如圖:

flink學習筆記java,從socket中擷取資料(三)

未完待續…