天天看点

Spark DataFrame学习笔记

Spark DataFrame学习笔记

对于结构性数据,Spark的DataFame是一大利器,Spark的DataFrame相比于RDD来讲做了很多底层的优化,在数据处理上面非常有效。Spark使用了可扩展优化器Catalyst,因为知道每一列数据的具体类型,算子可以单独的在某个列上运作,优化器优化了Spark SQL的很多查询规则,速度对比可以看下网友的测试结果。

Spark DataFrame学习笔记

DataFame的访问大体上有两种方式,一种是创建一个临时视图表或者全局视图表,一种是直接对DataFrame进行操作。前者直接使用spark.sql函数,写上标准的sql语句就可以,后者使用 org.apache.spark.sql.functions里面的函数,灵活模拟sql语句的功能,也是非常强悍。

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType

/**
  * Created by balabala on 2018/4/28.
  */
object Test {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Test")
      .master("local")
      .getOrCreate()
    import spark.implicits._ // 需要导入spark的隐式转换
    val df = spark.createDataset(Seq(
      ("2019-07-21 12:00:23", "猪可爱", 1, "女", 11, 82, 70, "湖北"),
      ("2019-07-22 13:00:23", "猪聪明", 2, "男", 51, 84, 73, "湖北"),
      ("2019-07-22 11:00:23", "猪能干", 3, "男", 81, 85, 56, "四川"),
      ("2019-07-23 10:00:23", "猪小米", 4, "女", 42, 86, 34, "四川"),
      ("2019-07-23 09:00:23", "猪大枣", 5, "男", 77, 82, 93, "湖北"),
      ("2019-07-23 09:00:23", "猪脆桃", 6, "男", 54, 84, 92, null),
      ("2019-07-24 11:00:23", "猪靑李", 7, "男", 89, 85, 88, "湖北"),
      ("2019-07-24 12:00:23", "猪红提", 8, "女", 40, 86, 78, "广东"),
      ("2019-07-25 12:00:23", "猪 瓜", 9, "女", 40, 91, 68, "广东"))
    ).toDF("create_time", "name", "no", "sex", "math", "eng", "tech", "address")

    df.show()

    //DataFrame第一种访问方式:直接sql方式

    // 创建临时视图和全局视图,临时视图生命周期仅在本会话里面,全局视图在多个会话里面
    // 本地临时视图不绑定任何数据库,全局视图需要global_temp.XXtable来访问
    df.createTempView("tmp_pig")
    // 标准的sql语法
    spark.sql("SELECT concat_ws('_',no, name, math) as tmp FROM tmp_pig").show()
    spark.sql("SELECT AVG(math) AS avg,MIN(math) AS min FROM tmp_pig").show()


    // 下面全是DataFrame的第二种访问方式

    // 语法糖"$"  两种等价写法
    // df("name") <=> $"name"

    // 查找非空值
    df.filter(df("address").isNotNull).show()
    df.filter("address <> ''").show()

    // 过滤
    df.filter($"math" > 60).show() // Column参数方式
    df.filter("math > 60").show() // St ring参数方式
    df.filter($"sex" === "女").show()
    df.filter($"sex" =!= "女").show()
    df.filter($"address" === "广东" || $"address" === "四川").show()
    df.filter($"math" > 70 && $"eng" > 80).show() // 多条件
    df.filter("address like '%湖%'").show() // like 字符串参数查询
    df.filter($"address".like("湖%")).show() // like Column参数查询
    df.filter("address not like '%湖%'").show() // like 字符串参数“非”查询
    df.filter(!$"address".like("湖%")).show() // like Column参数“非”查询
    
    // 处理空值
    df.na.drop().show() // 丢弃处理
    df.na.fill("default").show()
    df.na.fill(Map("address" -> "default", "sex" -> "未知", "no" -> -1)).show()
    df.na.fill("default").na.replace(Array("address"), Map("广东" -> "四川")).show()
    df.na.replace("address", Map("广东" -> "四川")).show()
    df.na.replace("*", Map("UNKNOWN" -> "null")) // 将所有列的unknown替换为null

    // 列变动
    df.drop("address").show()
    df.withColumn("key", concat_ws("_", $"no", $"sex")).show()
    df.withColumnRenamed("address", "province")
    df.withColumn("province", regexp_replace($"address", "广东", "湖北")).show()

    // 列聚合 结果中就只有groupBy的key和agg的字段
    // 看分布 按性别聚合,看address列的在每个性别上的分布
    df.groupBy("sex")
      .agg(count($"no"), collect_set($"address"), countDistinct("address"))
      .show()

    // 求值
    df.agg(Map("eng" -> "max", "math" -> "avg", "tech" -> "min"))
      .show()

    // 排序
    df.select("no", "math", "eng")
      .orderBy("math") // 升序
      .show()
    df.select("no", "math", "eng")
      .orderBy(desc("math")) // 降序
      .show()
    df.select("no", "math", "eng")
      .orderBy(df("math").desc) // 降序
      .show()

    // 字符串分割
    df.select(split($"name", " ").as("names"))
      .filter(size($"names") > 1) // split后是一个数组,过滤长度大于1的
      .show()

    // 时间处理
    //dayofmonth dayofyear dayofweek weekofyear hour minute second
    df.select(dayofmonth($"create_time").as("day"), $"name").show()

    // 截取trunc 除截取外,其他部分都是01
    // 'year', 'yyyy', 'yy' for truncate by year, or 'month', 'mon', 'mm' for truncate by month
    df.select(trunc($"create_time", "month").as("month"), $"name").show()

    // 将 date/timestamp/string 转成指定的日期字符串格式
    df.select(date_format($"create_time", "Month").as("Month"), $"name").show()

    // 将Unix时间戳转为指定的日期字符串格式 先构造一列时间戳类型的数据
    df.withColumn("timestamp", regexp_replace($"create_time", "2.*", "1564390041").cast(LongType))
      .select(from_unixtime($"timestamp", "yyyy-MM-dd").as("date"), $"create_time")
      .show()
    // 将字符串转为Unix时间戳
    df.select(unix_timestamp($"create_time").as("timestamp")).show()

    // add_months next_day last_day date_add datediff
    df.select(add_months($"create_time", 1).as("newMonth"), $"name").show()

    // 变成Map形式
    val map = df.head().getValuesMap(Seq("name", "no", "sex", "math"))
    println(map.toString())

    // 每条记录变成json格式  详细显示每一行不做截断处理
    df.select(to_json(struct("no", "name", "math")).as("json")).show(truncate = false)
  }

}




           

参考

1 比较一下spark2的DataFrame和RDD

2 Spark SQL, DataFrames and Datasets Guide

继续阅读