本文主要是寫關于Scala如何操作spark的DataFrame,本文先介紹分組聚合的用法,詳細請看下面的步驟,以下所有的代碼都是在IntelliJ Idea裡面編寫并且遠端調試的。
先建立sparksession對象,代碼如下:
val conf = new SparkConf().setAppName("LzSparkDatasetExamples").setMaster("local")
val sparkSession = SparkSession.builder().enableHiveSupport().config(conf).getOrCreate()
建立一個DataFrame對象,代碼如下:
// 這裡一定要加這一行導入,否則toDF會報錯
import sparkSession.implicits._
val df = sparkSession.createDataset(Seq(
("aaa", 1, 2),
("bbb", 3, 4),
("ccc", 3, 5),
("bbb", 4, 6)
)).toDF("key1", "key2", "key3")
分組聚合相關操作代碼如下:
// 常見聚合函數
import org.apache.spark.sql.functions._
LOGGER.info("--------df.groupBy(\"key1\").count().show()-----------")
df.groupBy("key1").count().show()
LOGGER.info("--------df.select(\"key1\").distinct().show()-----------")
df.select("key1").distinct().show()
val key1Count = df.select("key1").distinct().count()
LOGGER.info("--------df.select(\"key1\").distinct().count()-----------" +key1Count)
LOGGER.info("--------df.groupBy(\"key1\").count().sort(\"key1\").show()-----------")
df.groupBy("key1").count().sort("key1").show()
LOGGER.info("--------df.groupBy(\"key1\").count().sort($\"key1\".desc).show()-----------")
df.groupBy("key1").count().sort($"key1".desc).show()
LOGGER.info("--------df.groupBy(\"key1\").count.withColumnRenamed(\"count\", \"cnt\").sort($\"cnt\".desc).show-----------")
df.groupBy("key1").count
.withColumnRenamed("count", "cnt").sort($"cnt".desc).show()
LOGGER.info("--------df.groupBy(\"key1\").agg(count(\"key1\").as(\"cnt\")).show-----------")
df.groupBy("key1").agg(count("key1").as("cnt")).show()
// 使用agg聚合函數
df.groupBy("key1").agg(count("key1"), max("key2"), avg("key3")).show
df.groupBy("key1").agg("key1"->"count", "key2"->"max", "key3"->"avg").show()
df.groupBy("key1").agg(Map(("key1","count"), ("key2","max"), ("key3","avg"))).show()
df.groupBy("key1").agg(("key1","count"), ("key2","max"), ("key3","avg")).show
df.groupBy("key1")
.agg(count("key1").as("cnt"), max("key2").as("max_key2"), avg("key3").as("avg_key3"))
.sort($"cnt",$"max_key2".desc).show
可以直接在IntelliJ Idea執行或者打包後在遠端伺服器執行。