
Spark Streaming Basics - Using the foreachRDD Operator - Operating on Data with Spark SQL for Word Count (Version 1)

1 Send messages with the nc tool
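To feed the stream, start a netcat listener first. A minimal example, assuming the host and port that the code below connects to (158.158.4.49:9998):

nc -lk 9998

Each line typed into this session is sent to the connected Spark Streaming receiver.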

2 The core code is as follows

package cn.taobao;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import java.util.ArrayList;
import java.util.Arrays;

public class Save_2 {
    public static void main(String[] args) throws Exception {

        // JavaStreamingContext is the programming entry point for Spark Streaming
        JavaStreamingContext ssc = new JavaStreamingContext(
                "local[*]",
                "Save_1",
                Durations.seconds(5),
                System.getenv("SPARK_HOME"),
                JavaStreamingContext.jarOfClass(Save_2.class));

        ssc.sparkContext().setLogLevel("ERROR");

        // Data receiver (Receiver)
        // Create a receiver (JavaReceiverInputDStream) that receives and processes data sent over a socket from a port on a given machine
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
                "158.158.4.49", 9998, StorageLevels.MEMORY_AND_DISK_SER);

        /*
        Suppose the input is: aa bb cc aa
         */

        /*
        Returns: aa
                 bb
                 cc
                 aa
         */
        JavaDStream<String> stringJavaDStream = lines.flatMap(xx -> {
            String[] str_split = xx.split(" ");
            return Arrays.asList(str_split).iterator();
        });
        
        // Use Spark SQL to do the word count
        stringJavaDStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
            @Override
            public void call(JavaRDD<String> stringJavaRDD) throws Exception {
                /*
                Not much different from ordinary Spark SQL code
                 */
                // Create the SparkSession (getOrCreate reuses the existing session across batches)
                SparkSession sparkSession = SparkSession.builder().config(stringJavaRDD.context().getConf()).getOrCreate();
                /*
                Prepare the inputs for createDataFrame
                 */
                // Build the data: wrap each word in a Row
                JavaRDD<Row> javaRddRow = stringJavaRDD.map(xx -> {
                    return RowFactory.create(xx);
                });
                // Build the schema: a single nullable string column named "name"
                ArrayList<StructField> structFields = new ArrayList<>();
                structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
                StructType structType = DataTypes.createStructType(structFields);

                Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, structType);

                dataFrame.createOrReplaceTempView("words");

                Dataset<Row> resultOut = sparkSession.sql("select name,count(*) as nameCount from words group by name");
                resultOut.show();
            }
        });

        // Explicitly start receiving data
        ssc.start();
        try {
            // Wait for the computation to terminate
            ssc.awaitTermination();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ssc.close();
        }
    }
}
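As a variation (an illustrative sketch, not part of the original listing), the same per-batch aggregation can be written with the DataFrame API instead of registering a temp view. Assuming the stringJavaRDD and dataFrame variables from the foreachRDD body above:

                // Skip empty batches, then aggregate with the DataFrame API
                if (stringJavaRDD.isEmpty()) {
                    return;
                }
                Dataset<Row> counts = dataFrame.groupBy("name")
                        .count()
                        .withColumnRenamed("count", "nameCount");
                counts.show();

Both forms produce the same table; the SQL form is simply more convenient as the query grows.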

3 Demo

In the nc session, type the following and press Enter:
aa aa aa bb
Then type the following and press Enter:
bb bb

The results are as follows. Note that each 5-second batch is counted independently; counts do not accumulate across batches:
+----+---------+
|name|nameCount|
+----+---------+
|  aa|        3|
|  bb|        1|
+----+---------+

+----+---------+
|name|nameCount|
+----+---------+
|  bb|        2|
+----+---------+
