常用Spark-SQL資料處理總結
導語: 本文總結了我在實習工作中處理資料時遇到的Spark-SQL相關操作, 主要包含ArrayType、JSON等複雜資料類型的處理, 以及UDF的各種實作, 希望通過更加簡單的處理方式提高程式的可讀性、簡潔性和優雅性。
本文的範例都是使用ScalaTest架構寫的測試用例, 並且將SparkSession進行如下封裝。下文的所有測試用例都遵循此規則。
/**
  * Mixin that exposes a lazily created [[SparkSession]] to test suites.
  *
  * The session is configured with master `local[*]`, application name
  * "spark test example" and `spark.driver.bindAddress` set to 127.0.0.1,
  * and is obtained through `getOrCreate()` on first access.
  */
trait SparkSessionTestWrapper {
  lazy val spark: SparkSession = {
    val localBuilder = SparkSession
      .builder()
      .master("local[*]")
      .appName("spark test example")
      .config("spark.driver.bindAddress", "127.0.0.1")
    localBuilder.getOrCreate()
  }
}
UDF常用方法
-
無參數輸入, 輸出一列
下面是新添加一個空的Array列的示例
運作結果如下:
/** Test suite for the UDF examples; the Spark session comes from SparkSessionTestWrapper. */
class UdfTest extends FunSuite with SparkSessionTestWrapper {

  // Zero-argument UDF: appends a "value" column that holds an empty Array[String].
  test("testUdf") {
    import spark.implicits._

    val names = Seq("測試").toDF("name")
    val emptyStrings = udf(() => Array.empty[String])
    val withValue = names.withColumn("value", emptyStrings())

    withValue.printSchema()
    withValue.show()
  }
}
- 輸入一列, 輸出一列
運作結果如下:
// One column in, one column out: maps the "code" column to a sex label,
// first through the DataFrame API and then through a registered SQL function.
test("testUdf2") {
  import spark.implicits._

  // UDF definition: "0" -> "F", "1" -> "M", anything else (including "2") -> "N".
  val sexMapFunction: UserDefinedFunction = udf((sex: String) =>
    sex match {
      case "0" => "F"
      case "1" => "M"
      case _   => "N"
    })

  val sourceDf = Seq(
    ("張三", "0"),
    ("李四", "1"),
    ("王五", "2"),
    ("劉六", "3")
  ).toDF("name", "code")

  val actualDf = sourceDf.withColumn("sex", sexMapFunction(col("code")))
  actualDf.printSchema()
  actualDf.show()

  println("**************** following is sql test ****************")
  spark.udf.register("sexMap", sexMapFunction)
  // createOrReplaceTempView instead of createTempView: other tests register the
  // same view name on the shared session, and createTempView fails when the
  // name is already taken.
  sourceDf.createOrReplaceTempView("table_tmp")
  val sql = "SELECT name, code, sexMap(code) as sex FROM table_tmp"
  val df = spark.sql(sql)
  df.printSchema()
  df.show()
}
- 輸入一列, 輸出多列
// One column in, several columns out: the UDF returns a struct that is then
// expanded into top-level columns with "idname.*".
test("testUdf3") {
  import spark.implicits._

  case class Test(id: Int, name: String)

  // Schema of the struct produced by the UDF.
  val idNameSchema = StructType(List(
    StructField("id", IntegerType),
    StructField("name", StringType)))

  // UDF definition: splits "id,name" on the comma and packs both parts.
  val splitIdName: UserDefinedFunction = udf((raw: String) => {
    val parts = raw.split(",")
    Test(parts(0).toInt, parts(1))
  }, idNameSchema)

  val sourceDf = Seq("1,張三", "2,李四", "3,王五").toDF("case")

  val actualDf = sourceDf
    .withColumn("idname", splitIdName(col("case")))
    .select("case", "idname.*")

  actualDf.printSchema()
  actualDf.show()
}
- 輸入多列, 輸出一列
運作結果
// Several columns in, one column out: the columns are packed into a struct so
// the UDF receives them as a single Row and averages them.
test("testUdf4") {
  import spark.implicits._

  // Converts every field of the incoming Row to Double according to its declared
  // type and returns the arithmetic mean. Only Integer, String, Float, Double
  // and Long fields are matched.
  val averageOfRow: UserDefinedFunction = udf((scores: Row) => {
    val fields = scores.schema.fields
    val asDoubles = fields.map { f =>
      f.dataType match {
        case IntegerType => scores.getAs[Int](f.name).toDouble
        case StringType  => scores.getAs[String](f.name).toDouble
        case FloatType   => scores.getAs[Float](f.name).toDouble
        case DoubleType  => scores.getAs[Double](f.name)
        case LongType    => scores.getAs[Long](f.name).toDouble
      }
    }
    asDoubles.sum / fields.length
  })

  val sourceDf = Seq(
    ("張三", "34", 28, 19F, 29.0, 99L),
    ("李四", "90.5", 110, 80.8F, 88.88, 60L)
  ).toDF("姓名", "國文", "數學", "英語", "實體", "體育")
  sourceDf.printSchema()
  sourceDf.show()

  // Average over three named subjects.
  val threeSubjectDf =
    sourceDf.withColumn("語數外平均成績", averageOfRow(struct("國文", "數學", "英語")))
  threeSubjectDf.printSchema()
  threeSubjectDf.show()

  // Average over every column except the first one.
  val scoreColumns = sourceDf.columns.drop(1).map(col)
  val allSubjectDf =
    sourceDf.withColumn("全部課程平均成績", averageOfRow(struct(scoreColumns: _*)))
  allSubjectDf.show()
}
JSON資料類型處理
- 提取單個(多個)字段值
// Extracts one or several fields from a JSON string column, first with the
// DataFrame API and then with the equivalent SQL.
test("testUdf5") {
  import spark.implicits._

  val sourceDf = Seq(
    (1, "{\"key_a\": \"value_a\", \"key_b\": 1}")
  ).toDF("id", "json_str")

  val apiDf = sourceDf
    // get_json_object extracts a single JSON field per call.
    .withColumn("key_a", get_json_object(col("json_str"), "$.key_a"))
    .withColumn("key_b", get_json_object(col("json_str"), "$.key_b"))
    // json_tuple extracts several fields in one call.
    .select(col("id"), col("json_str"), col("key_a"), col("key_b"),
      json_tuple(col("json_str"), "key_a", "key_b").as(Seq("key_a_copy", "key_b_copy")))
  apiDf.printSchema()
  apiDf.show()

  // createOrReplaceTempView instead of createTempView: other tests register the
  // same view name on the shared session, and createTempView fails when the
  // name is already taken.
  sourceDf.createOrReplaceTempView("table_tmp")
  val sql =
    """
      | SELECT id,
      |        json_str,
      |        get_json_object(t.json_str, "$.key_a") as key_a,
      |        get_json_object(t.json_str, "$.key_b") as key_b,
      |        j.key_a_copy,
      |        j.key_b_copy
      | FROM table_tmp t
      | LATERAL VIEW json_tuple(t.json_str, 'key_a', 'key_b') j as key_a_copy, key_b_copy
      |""".stripMargin
  val sqlDf = spark.sql(sql)
  sqlDf.printSchema()
  sqlDf.show()
}
- 提取JSON數組
// Extracts fields from a JSON array stored in a string column, using from_json
// with an explicit schema and, alternatively, get_json_object plus string cleanup.
test("testUdf6") {
  import spark.implicits._

  // All whitespace is stripped, so the layout of the literal below is irrelevant.
  val jsonArray =
    """
      |[
      |  {
      |    "name": "網友",
      |    "weight": 0.1973038
      |  },
      |  {
      |    "name": "中國銀聯",
      |    "weight": 0.1973038
      |  }
      |]
      |""".stripMargin.replaceAll("\\s", "")

  val sourceDf = Seq((1, jsonArray)).toDF("id", "json_str")

  val apiDf = sourceDf
    // Single field through from_json.
    .withColumn("names_1",
      from_json(col("json_str"), ArrayType(StructType(List(StructField("name", StringType))))))
    // Several fields through from_json.
    .withColumn("names_weight_struct",
      from_json(col("json_str"), ArrayType(StructType(List(
        StructField("name", StringType),
        StructField("weight", StringType))))))
    // Single field through get_json_object: strip quotes and brackets, then split.
    .withColumn("names_2",
      split(regexp_replace(get_json_object(col("json_str"), "$[*].name"), "\"|\\[|\\]", ""), ","))
  apiDf.printSchema()
  apiDf.show()

  // Same single-field extraction through SQL.
  // createOrReplaceTempView instead of createTempView: other tests register the
  // same view name on the shared session, and createTempView fails when the
  // name is already taken.
  sourceDf.createOrReplaceTempView("table_tmp")
  val sql =
    """
      | SELECT id,
      |        json_str,
      |        split(regexp_replace(get_json_object(json_str, "$[*].name"), "\\[|\\]|\"", ""), ",") as name
      | FROM table_tmp t
      |""".stripMargin
  val sqlDf = spark.sql(sql)
  sqlDf.printSchema()
  sqlDf.show()
}
其他Spark-SQL操作
- 将某列的一行array展開。
// Expands an array column into one row per element with explode.
test("testUdf7") {
  import spark.implicits._

  val sourceDf = Seq(("name", Array("張三", "李四", "王五")))
    .toDF("col_name", "col_value")
  sourceDf.printSchema()
  sourceDf.show()

  println("\nfollowing is explode result:\n")

  val explodedValue = explode(col("col_value"))
  val actualDf = sourceDf.select(col("col_name"), explodedValue)
  actualDf.printSchema()
  actualDf.show()
}
- Flatten 嵌套的 struct 列
// Builds a DataFrame with a two-level nested struct and flattens it back into
// plain top-level columns.
test("testUdf8") {
  import spark.implicits._

  val flatDf = Seq(("A", "B", "C", "D", "E")).toDF("a", "b", "c", "d", "e")
  val sourceDf = flatDf
    .withColumn("struct_de", struct("d", "e"))
    .withColumn("struct_bc_de", struct("b", "c", "struct_de"))
    .select("a", "struct_bc_de")
  println("following is sourceDf:\n")
  sourceDf.printSchema()
  sourceDf.show()

  // Walks the schema recursively and returns one Column per leaf field,
  // addressed by its dotted path (prefix.field). A null prefix means top level.
  def flattenStructSchema(schema: StructType, prefix: String = null): Array[Column] =
    schema.fields.flatMap { field =>
      val path = Option(prefix).fold(field.name)(parent => parent + "." + field.name)
      field.dataType match {
        case nested: StructType => flattenStructSchema(nested, path)
        // An alias is optional here, e.g. .as(path.replace(".", "_")).
        case _ => Array(col(path))
      }
    }

  val leafColumns = flattenStructSchema(sourceDf.schema)
  val actualDf = sourceDf.select(leafColumns: _*)
  println("following is explode result:\n")
  actualDf.printSchema()
  actualDf.show()
}
- 添加聚合列, 但是保留其他所有列
// Adds aggregate columns (sum / average per group) while keeping every other
// column of the original rows: non-grouping columns are packed into a struct,
// collected per group, exploded back into rows and finally flattened.
test("testUdf9") {
  import spark.implicits._

  // Local copy of the flattening helper: the one used by "testUdf8" is declared
  // inside that test body and is therefore not visible here.
  // Returns one Column per leaf field, addressed by its dotted path.
  def flattenStructSchema(schema: StructType, prefix: Option[String] = None): Array[Column] =
    schema.fields.flatMap { field =>
      val path = prefix.fold(field.name)(parent => s"$parent.${field.name}")
      field.dataType match {
        case nested: StructType => flattenStructSchema(nested, Some(path))
        case _                  => Array(col(path))
      }
    }

  val sourceDf = Seq(
    ("a", "b", 1, "c", "d"),
    ("a", "b", 2, "m", "n"),
    ("a", "b", 3, "p", "q"),
    ("a", "B", 2, "u", "v"),
    ("a", "B", 1, "x", "y"),
    ("a", "B", 3, "r", "w")
  ).toDF("col_1", "col_2", "number", "col_other_1", "col_other_2")
  sourceDf.printSchema()
  sourceDf.show()

  val groupCols = Array("col_1", "col_2")
  val structCol = "struct"

  // Pack every non-grouping column into a single struct column.
  val packedDf =
    sourceDf.withColumn(structCol, struct(sourceDf.columns.diff(groupCols).map(col): _*))

  println("following is result:\n")
  val aggregatedDf = packedDf
    .groupBy(groupCols.map(col): _*)
    .agg(
      sum("number").as("sum"),
      avg("number").as("average"),
      collect_list(structCol).as(structCol))

  // Keep every column except the collected array and add its exploded rows.
  // filterNot is used on purpose: `columns.diff(structCol)` would diff the column
  // names against the *characters* of the String and remove nothing.
  val keptColumns = aggregatedDf.columns.filterNot(_ == structCol).map(col)
  val explodedDf = aggregatedDf.select(keptColumns :+ explode(col(structCol)): _*)

  val actualDf = explodedDf.select(flattenStructSchema(explodedDf.schema): _*)
  actualDf.printSchema()
  actualDf.show()
}
- 一個DataFrame JOIN 另一個DataFrame兩次
上面程式是souceDf連續join userDf兩次, 于是報了錯。原因是檢查到了笛卡爾積
// Deliberately failing example: joins sourceDf with the same userDf twice.
// According to the article text and the error output that accompany this
// snippet, the second join is rejected with
// "Detected implicit cartesian product for LEFT OUTER join".
test("testUdf10") {
  import spark.implicits._
  var sourceDf = Seq(
    (1, 2),
    (3, 4)
  ).toDF("id_1", "id_2")
  var userDf = Seq(
    (1, "張三"),
    (2, "李四"),
    (3, "王五"),
    (4, "劉六")
  ).toDF("id", "name")
  sourceDf.show()
  userDf.show()
  // First join: look up the name that belongs to id_1.
  var actualDf = sourceDf.join(userDf, sourceDf.col("id_1") === userDf.col("id"), "left")
    .withColumn("name_1", userDf.col("name"))
  // Second join against the very same userDf, this time on id_2.
  // NOTE(review): userDf.col("id") presumably still refers to the id column that
  // the first join already brought into actualDf, so the condition compares two
  // left-side columns and Spark treats the join condition as missing — confirm.
  actualDf = actualDf.join(userDf, actualDf.col("id_2") === userDf.col("id"), "left")
    .withColumn("name_2", userDf.col("name"))
  println("following is join result:\n")
  actualDf.printSchema()
  actualDf.show()
}
解決辦法：給要Join的DataFrame加上別名, 修正後的程式見下方。上述程式的報錯資訊如下：
Detected implicit cartesian product for LEFT OUTER join between logical plans
Project [id_1#5, id_2#6, id#14, name#15, name#15 AS name_1#44]
+- Join LeftOuter, (id_1#5 = id#14)
   :- LocalRelation [id_1#5, id_2#6]
   +- LocalRelation [id#14, name#15]
and
LocalRelation [id#50, name#51]
Join condition is missing or trivial.
Either: use the CROSS JOIN syntax to allow cartesian products between these relations, or: enable implicit cartesian products by setting the configuration variable spark.sql.crossJoin.enabled=true;
org.apache.spark.sql.AnalysisException: Detected implicit cartesian product for LEFT OUTER join between logical plans
Project [id_1#5, id_2#6, id#14, name#15, name#15 AS name_1#44]
+- Join LeftOuter, (id_1#5 = id#14)
   :- LocalRelation [id_1#5, id_2#6]
   +- LocalRelation [id#14, name#15]
and
LocalRelation [id#50, name#51]
Join condition is missing or trivial.
Either: use the CROSS JOIN syntax to allow cartesian products between these relations, or: enable implicit cartesian products by setting the configuration variable spark.sql.crossJoin.enabled=true;
	at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$22.applyOrElse(Optimizer.scala:1295)
// Joins sourceDf with the same userDf twice (once per id column). Every
// DataFrame carries an alias and all columns are referenced through that alias,
// so each reference is resolved by qualifier rather than by the identity of the
// underlying attribute.
test("testUdf10") {
  import spark.implicits._

  val sourceDf = Seq(
    (1, 2),
    (3, 4)
  ).toDF("id_1", "id_2").as("sourceDf")
  val userDf = Seq(
    (1, "張三"),
    (2, "李四"),
    (3, "王五"),
    (4, "劉六")
  ).toDF("id", "name").as("userDf")
  sourceDf.show()
  userDf.show()

  // First lookup: name for id_1. The result is re-aliased so that, in the second
  // join, the qualifier "userDf" only matches the freshly joined right side.
  val leftJoinedDf = sourceDf
    .join(userDf, col("sourceDf.id_1") === col("userDf.id"), "left")
    .withColumn("name_1", col("userDf.name"))
    .alias("leftJoinedDf")

  // Second lookup: name for id_2.
  // col("userDf.name") is used instead of userDf.col("name"): the latter is bound
  // to userDf's original attribute, which is also present on the left side after
  // the first join, so it can silently pick up the name matched for id_1.
  val actualDf = leftJoinedDf
    .join(userDf, col("leftJoinedDf.id_2") === col("userDf.id"), "left")
    .withColumn("name_2", col("userDf.name"))
    .select("id_1", "name_1", "id_2", "name_2")

  println("following is join result:\n")
  actualDf.printSchema()
  actualDf.show()
}
- DataFrame轉RDD使用Map方法的另一種寫法
// Template for row-wise processing without converting the DataFrame to an RDD:
// a UDF that receives the whole record as one Row.
def mapFunction: UserDefinedFunction = udf((row: Row) => {
  // Write the value to return here. It may be a single value or a struct of
  // several values; see the UDF examples in the first part of the article.
})

// The UDF declares exactly one Row parameter, so all columns are packed into a
// single struct column before being passed in (passing them as separate
// arguments does not match the UDF's arity).
df.withColumn("map_return", mapFunction(struct(df.columns.map(col): _*)))
-
/**
  * Builds the replacement map used to fill nulls in `df`.
  *
  * Integer, Long, Byte and Short columns map to 0, String columns to "" and
  * Double columns to 0.0; columns of any other type are left out of the map.
  *
  * @param df DataFrame whose schema is inspected
  * @return column name -> default value, suitable for `df.na.fill`
  */
def naMap(df: DataFrame): Map[String, Any] =
  df.schema.flatMap { field =>
    field.dataType match {
      case IntegerType | LongType | ByteType | ShortType => Some(field.name -> 0)
      case StringType                                    => Some(field.name -> "")
      case DoubleType                                    => Some(field.name -> 0.0)
      case _                                             => None
    }
  }.toMap

df = df.na.fill(naMap(df))
-
/**
  * Projects `df` onto the fields of `schema`, in schema order, creating any
  * field that `df` does not have yet.
  *
  * A missing field is added with a default value chosen by its data type
  * (0 for integral types, 0.0 for Float/Double, "" for String, null for every
  * other type) and cast to the requested type, so the added column has the type
  * declared in `schema` rather than the type of the literal.
  *
  * @param df     input DataFrame
  * @param schema fields to select; an empty schema yields a DataFrame without columns
  * @return `df` restricted to (and completed with) the fields of `schema`
  */
def selectColumn(df: DataFrame, schema: StructType): DataFrame = {
  val original = df.columns.toSet
  // Defaults keyed by the data type's string form, e.g. "StringType".
  val defaultValueMap = Map(
    "ByteType" -> 0,
    "ShortType" -> 0,
    "IntegerType" -> 0,
    "LongType" -> 0,
    "StringType" -> "",
    "FloatType" -> 0.0,
    "DoubleType" -> 0.0)

  val completed = schema.fields.foldLeft(df) { (acc, field) =>
    if (original.contains(field.name)) acc
    else {
      // Types without an entry fall back to null instead of failing the lookup.
      val default = defaultValueMap.getOrElse(field.dataType.toString, null)
      acc.withColumn(field.name, lit(default).cast(field.dataType))
    }
  }

  schema.fields.map(_.name) match {
    case Array(first, rest @ _*) => completed.select(first, rest: _*)
    case _                       => completed.select()
  }
}
-
// Writes every partition of df through one JDBC connection using batched
// INSERT IGNORE statements. DBUtils, url, name, password, dataTypeMap and
// batchSize are defined outside this snippet; dataTypeMap is read as a sequence
// of pairs whose second element is the column's Spark data type, in the same
// order as the fields of each row.
df.rdd.foreachPartition(iterator => {
  val db = DBUtils(url, name, password)
  db.prop.setProperty("rewriteBatchedStatements", "true")
  val conn = db.getConnection
  try {
    val sql = "INSERT IGNORE INTO table_name (`a`, `b`) values (?, ?) "
    val ps = conn.prepareStatement(sql)
    try {
      var pending = 0
      iterator.foreach { row =>
        for ((entry, index) <- dataTypeMap.zipWithIndex) {
          val position = index + 1 // JDBC parameters are 1-based

          // Binds SQL NULL when the row value is null (the primitive getters of
          // Row throw on null); otherwise runs the typed setter.
          def bind(sqlType: Int)(setValue: => Unit): Unit =
            if (row.isNullAt(index)) ps.setNull(position, sqlType) else setValue

          entry._2 match {
            case StringType =>
              bind(java.sql.Types.VARCHAR)(ps.setString(position, row.getString(index)))
            case IntegerType =>
              bind(java.sql.Types.INTEGER)(ps.setInt(position, row.getInt(index)))
            case LongType =>
              bind(java.sql.Types.BIGINT)(ps.setLong(position, row.getLong(index)))
            case DoubleType =>
              bind(java.sql.Types.DOUBLE)(ps.setDouble(position, row.getDouble(index)))
            case ByteType =>
              bind(java.sql.Types.TINYINT)(ps.setByte(position, row.getByte(index)))
            case ShortType =>
              bind(java.sql.Types.SMALLINT)(ps.setShort(position, row.getShort(index)))
            case other =>
              throw new IllegalArgumentException(
                s"Unsupported column type $other at index $index")
          }
        }
        pending += 1
        ps.addBatch()
        // Flush whenever a full batch has been collected.
        if (pending == batchSize) {
          ps.executeBatch()
          ps.clearBatch()
          pending = 0
        }
      }
      // Flush the remaining rows of the last, partially filled batch.
      ps.executeBatch()
    } finally {
      ps.close()
    }
  } finally {
    // Always release the connection, even when a batch fails.
    conn.close()
  }
})