
Spark: from DStream to Structured Streaming

The previous post covered joins between different topics: link.

Many aggregation operations, such as group by, are not as flexible on DStreams as they are in Spark SQL.

So the idea is to convert the joined topic data into a Dataset.

The official Spark Streaming programming guide has a ready-made demo:

words.foreachRDD((rdd, time) -> {
      SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());

      // Convert JavaRDD[String] to JavaRDD[bean class] to DataFrame
      JavaRDD<JavaRecord> rowRDD = rdd.map(word -> {
        JavaRecord record = new JavaRecord();
        record.setWord(word);
        return record;
      });
      Dataset<Row> wordsDataFrame = spark.createDataFrame(rowRDD, JavaRecord.class);

      // Creates a temporary view using the DataFrame
      wordsDataFrame.createOrReplaceTempView("words");

      // Do word count on table using SQL and print it
      Dataset<Row> wordCountsDataFrame =
          spark.sql("select word, count(*) as total from words group by word");
      System.out.println("========= " + time + "=========");
      wordCountsDataFrame.show();
    });
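For context, the words stream in that demo is built from a socket source earlier in the guide. A rough sketch of that setup (the app name, host, port, and batch interval here are illustrative, not from this post):

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

SparkConf conf = new SparkConf().setAppName("JavaSqlNetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
// Read lines from a TCP socket and split each line into words
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());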

A DataFrame is created for each RDD, reusing the same lazily initialized SparkSession (foreachRDD runs on the driver, so the singleton is shared across batches):

class JavaSparkSessionSingleton {
  private static transient SparkSession instance = null;
  public static SparkSession getInstance(SparkConf sparkConf) {
    if (instance == null) {
      instance = SparkSession
        .builder()
        .config(sparkConf)
        .getOrCreate();
    }
    return instance;
  }
}

So the intermediate step was reworked into:

JavaDStream<CheckDown> joinResult = checkResult.join(downResult).map(
		s -> {
			CheckDown checkDown = new CheckDown();
			Long productId = Long.parseLong(s._1()._2().toString());
			String deviceId = s._1()._3().toString();
			String lac = s._2()._1()._1().toString();
			String cid = s._2()._1()._2().toString();
			Float downRate = s._2()._2();
			checkDown.setProductId(productId);
			checkDown.setDeviceId(deviceId);
			checkDown.setLac(lac);
			checkDown.setCid(cid);
			checkDown.setDownRate(downRate);
			return checkDown;
		}
);

This joins the results of the check topic with the download topic and maps each joined record into the CheckDown class.
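For reference, the tuple accessors above imply input streams shaped roughly like this (the key layout and element types are inferred from the code, not stated in the post):

import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;
import scala.Tuple3;

// Both streams share a Tuple3 key whose second element is productId and
// whose third is deviceId; the first element is not used above.
JavaPairDStream<Tuple3<String, String, String>, Tuple2<String, String>> checkResult; // value: (lac, cid)
JavaPairDStream<Tuple3<String, String, String>, Float> downResult;                   // value: downRate
// checkResult.join(downResult) then emits pairs of type:
// Tuple2<Tuple3<String, String, String>, Tuple2<Tuple2<String, String>, Float>>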

package com.adups.online.streaming.bean.input;
import java.io.Serializable;

/**
 * @author allen
 * Created on 05/12/2017.
 */
public class CheckDown implements Serializable {
	private Long productId;
	private String deviceId;
	private String lac;
	private String cid;
	private Float downRate;
	// getters and setters omitted
}

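Since the stated goal was a Dataset, the same bean also supports a typed Dataset<CheckDown> rather than an untyped DataFrame. A minimal sketch (not from the original post), assuming the same rdd and spark variables as in the foreachRDD below:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;

// Convert the JavaRDD<CheckDown> into a typed Dataset via a bean encoder;
// column names come from the bean's properties, as with createDataFrame.
Dataset<CheckDown> typed = spark.createDataset(rdd.rdd(), Encoders.bean(CheckDown.class));
typed.printSchema();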

Building the DataFrame directly makes it convenient to use the functions from Spark SQL:

import static org.apache.spark.sql.functions.*;

....

	joinResult.foreachRDD((rdd, time) -> {
		SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
		Dataset<Row> checkOrDown = spark.createDataFrame(rdd, CheckDown.class);
		checkOrDown.printSchema();
		String rateTime = DateUtil.getMinuteTimeYmd();

		// Per (productId, lac, cid): summed rate, device count, and average rate
		Dataset<Row> downAvg = checkOrDown.groupBy("productId", "lac", "cid")
				.agg(round(sum("downRate"), 4).as("downRateSum"),
						count("deviceId").as("countNum"),
						round(sum("downRate").divide(count("deviceId")), 4).as("downRate"));
		Dataset<Row> result = downAvg
				.withColumn("createTime", date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
				.withColumn("rateTime", lit(rateTime));
		result.show();

		// Average rate per (lac, cid) only
		Dataset<Row> downRates = checkOrDown.groupBy("lac", "cid")
				.agg(round(sum("downRate").divide(count("deviceId")), 4).as("downRate"));
		downRates.show();
	});

		return jssc;
	}

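Since the motivation was Spark SQL's flexibility, the same aggregation can also be expressed as plain SQL over a temporary view. A sketch, assuming it runs inside the same foreachRDD where checkOrDown and spark are in scope (the view name check_down is illustrative):

		// Register the per-batch DataFrame as a temp view and aggregate with SQL
		checkOrDown.createOrReplaceTempView("check_down");
		Dataset<Row> downAvgSql = spark.sql(
				"select productId, lac, cid, "
						+ "round(sum(downRate), 4) as downRateSum, "
						+ "count(deviceId) as countNum, "
						+ "round(sum(downRate) / count(deviceId), 4) as downRate "
						+ "from check_down group by productId, lac, cid");
		downAvgSql.show();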

The JavaSparkSessionSingleton helper used above:

public class JavaSparkSessionSingleton {

	private static transient SparkSession instance = null;

	public static SparkSession getInstance(SparkConf sparkConf) {
		if (instance == null) {
			instance = SparkSession.builder().config(sparkConf).getOrCreate();
		}
		return instance;
	}
}

