本文主要是写关于Scala如何操作spark的DataFrame,本文先介绍分组聚合的用法,详细请看下面的步骤,以下所有的代码都是在IntelliJ Idea里面编写并且远程调试的。
先创建sparksession对象,代码如下:
val conf = new SparkConf().setAppName("LzSparkDatasetExamples").setMaster("local")
val sparkSession = SparkSession.builder().enableHiveSupport().config(conf).getOrCreate()
创建一个DataFrame对象,代码如下:
// 这里一定要加这一行导入,否则toDF会报错
import sparkSession.implicits._
val df = sparkSession.createDataset(Seq(
("aaa", 1, 2),
("bbb", 3, 4),
("ccc", 3, 5),
("bbb", 4, 6)
)).toDF("key1", "key2", "key3")
分组聚合相关操作代码如下:
// 常见聚合函数
import org.apache.spark.sql.functions._
LOGGER.info("--------df.groupBy(\"key1\").count().show()-----------")
df.groupBy("key1").count().show()
LOGGER.info("--------df.select(\"key1\").distinct().show()-----------")
df.select("key1").distinct().show()
val key1Count = df.select("key1").distinct().count()
LOGGER.info("--------df.select(\"key1\").distinct().count()-----------" +key1Count)
LOGGER.info("--------df.groupBy(\"key1\").count().sort(\"key1\").show()-----------")
df.groupBy("key1").count().sort("key1").show()
LOGGER.info("--------df.groupBy(\"key1\").count().sort($\"key1\".desc).show()-----------")
df.groupBy("key1").count().sort($"key1".desc).show()
LOGGER.info("--------df.groupBy(\"key1\").count.withColumnRenamed(\"count\", \"cnt\").sort($\"cnt\".desc).show-----------")
df.groupBy("key1").count
.withColumnRenamed("count", "cnt").sort($"cnt".desc).show()
LOGGER.info("--------df.groupBy(\"key1\").agg(count(\"key1\").as(\"cnt\")).show-----------")
df.groupBy("key1").agg(count("key1").as("cnt")).show()
// 使用agg聚合函数
df.groupBy("key1").agg(count("key1"), max("key2"), avg("key3")).show
df.groupBy("key1").agg("key1"->"count", "key2"->"max", "key3"->"avg").show()
df.groupBy("key1").agg(Map(("key1","count"), ("key2","max"), ("key3","avg"))).show()
df.groupBy("key1").agg(("key1","count"), ("key2","max"), ("key3","avg")).show
df.groupBy("key1")
.agg(count("key1").as("cnt"), max("key2").as("max_key2"), avg("key3").as("avg_key3"))
.sort($"cnt",$"max_key2".desc).show
可以直接在IntelliJ Idea执行或者打包后在远程服务器执行。