天天看点

基于spark的Scala编程—DataFrame操作之分组聚合

本文主要是写关于Scala如何操作spark的DataFrame,本文先介绍分组聚合的用法,详细请看下面的步骤,以下所有的代码都是在IntelliJ Idea里面编写并且远程调试的。

先创建sparksession对象,代码如下:

val conf = new SparkConf().setAppName("LzSparkDatasetExamples").setMaster("local")
    val sparkSession = SparkSession.builder().enableHiveSupport().config(conf).getOrCreate()
           

创建一个DataFrame对象,代码如下:

// 这里一定要加这一行导入,否则toDF会报错
    import sparkSession.implicits._
    val df = sparkSession.createDataset(Seq(
      ("aaa", 1, 2),
      ("bbb", 3, 4),
      ("ccc", 3, 5),
      ("bbb", 4, 6)
    )).toDF("key1", "key2", "key3")
           

分组聚合相关操作代码如下:

// 常见聚合函数
    import org.apache.spark.sql.functions._
    LOGGER.info("--------df.groupBy(\"key1\").count().show()-----------")
    df.groupBy("key1").count().show()
    LOGGER.info("--------df.select(\"key1\").distinct().show()-----------")
    df.select("key1").distinct().show()
    val key1Count = df.select("key1").distinct().count()
    LOGGER.info("--------df.select(\"key1\").distinct().count()-----------" +key1Count)
    LOGGER.info("--------df.groupBy(\"key1\").count().sort(\"key1\").show()-----------")
    df.groupBy("key1").count().sort("key1").show()
    LOGGER.info("--------df.groupBy(\"key1\").count().sort($\"key1\".desc).show()-----------")
    df.groupBy("key1").count().sort($"key1".desc).show()
    LOGGER.info("--------df.groupBy(\"key1\").count.withColumnRenamed(\"count\", \"cnt\").sort($\"cnt\".desc).show-----------")
    df.groupBy("key1").count
      .withColumnRenamed("count", "cnt").sort($"cnt".desc).show()
    LOGGER.info("--------df.groupBy(\"key1\").agg(count(\"key1\").as(\"cnt\")).show-----------")
    df.groupBy("key1").agg(count("key1").as("cnt")).show()

    // 使用agg聚合函数
    df.groupBy("key1").agg(count("key1"), max("key2"), avg("key3")).show
    df.groupBy("key1").agg("key1"->"count", "key2"->"max", "key3"->"avg").show()
    df.groupBy("key1").agg(Map(("key1","count"), ("key2","max"), ("key3","avg"))).show()
    df.groupBy("key1").agg(("key1","count"), ("key2","max"), ("key3","avg")).show
    df.groupBy("key1")
      .agg(count("key1").as("cnt"), max("key2").as("max_key2"), avg("key3").as("avg_key3"))
      .sort($"cnt",$"max_key2".desc).show

           

可以直接在IntelliJ Idea执行或者打包后在远程服务器执行。

继续阅读