天天看點

基于spark的Scala程式設計—DataFrame操作之分組聚合

本文主要是寫關于Scala如何操作spark的DataFrame,本文先介紹分組聚合的用法,詳細請看下面的步驟,以下所有的代碼都是在IntelliJ Idea裡面編寫并且遠端調試的。

先建立sparksession對象,代碼如下:

val conf = new SparkConf().setAppName("LzSparkDatasetExamples").setMaster("local")
    val sparkSession = SparkSession.builder().enableHiveSupport().config(conf).getOrCreate()
           

建立一個DataFrame對象,代碼如下:

// 這裡一定要加這一行導入,否則toDF會報錯
    import sparkSession.implicits._
    val df = sparkSession.createDataset(Seq(
      ("aaa", 1, 2),
      ("bbb", 3, 4),
      ("ccc", 3, 5),
      ("bbb", 4, 6)
    )).toDF("key1", "key2", "key3")
           

分組聚合相關操作代碼如下:

// 常見聚合函數
    import org.apache.spark.sql.functions._
    LOGGER.info("--------df.groupBy(\"key1\").count().show()-----------")
    df.groupBy("key1").count().show()
    LOGGER.info("--------df.select(\"key1\").distinct().show()-----------")
    df.select("key1").distinct().show()
    val key1Count = df.select("key1").distinct().count()
    LOGGER.info("--------df.select(\"key1\").distinct().count()-----------" +key1Count)
    LOGGER.info("--------df.groupBy(\"key1\").count().sort(\"key1\").show()-----------")
    df.groupBy("key1").count().sort("key1").show()
    LOGGER.info("--------df.groupBy(\"key1\").count().sort($\"key1\".desc).show()-----------")
    df.groupBy("key1").count().sort($"key1".desc).show()
    LOGGER.info("--------df.groupBy(\"key1\").count.withColumnRenamed(\"count\", \"cnt\").sort($\"cnt\".desc).show-----------")
    df.groupBy("key1").count
      .withColumnRenamed("count", "cnt").sort($"cnt".desc).show()
    LOGGER.info("--------df.groupBy(\"key1\").agg(count(\"key1\").as(\"cnt\")).show-----------")
    df.groupBy("key1").agg(count("key1").as("cnt")).show()

    // 使用agg聚合函數
    df.groupBy("key1").agg(count("key1"), max("key2"), avg("key3")).show
    df.groupBy("key1").agg("key1"->"count", "key2"->"max", "key3"->"avg").show()
    df.groupBy("key1").agg(Map(("key1","count"), ("key2","max"), ("key3","avg"))).show()
    df.groupBy("key1").agg(("key1","count"), ("key2","max"), ("key3","avg")).show
    df.groupBy("key1")
      .agg(count("key1").as("cnt"), max("key2").as("max_key2"), avg("key3").as("avg_key3"))
      .sort($"cnt",$"max_key2".desc).show

           

可以直接在IntelliJ Idea執行或者打包後在遠端伺服器執行。

繼續閱讀