

Spark Dataset Operations (4): Other Single-Table Operations

There are still a few miscellaneous small operations that haven't been covered yet, such as adding columns, dropping columns, and handling NA values, so let's go through them briefly here.

The dataset is the same one as before:

scala> val df = spark.createDataset(Seq(
  ("aaa",1,2),("bbb",3,4),("ccc",3,5),("bbb",4,6))).toDF("key1","key2","key3")
df: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 1 more field]

scala> df.printSchema
root
 |-- key1: string (nullable = true)
 |-- key2: integer (nullable = false)
 |-- key3: integer (nullable = false)

scala> df.show
+----+----+----+
|key1|key2|key3|
+----+----+----+
| aaa|   1|   2|
| bbb|   3|   4|
| ccc|   3|   5|
| bbb|   4|   6|
+----+----+----+
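All of the snippets in this post are run in spark-shell, where the spark session and its implicits are already in scope. If you want to reproduce them in a standalone application, a minimal sketch of the required setup looks roughly like this (the app name and master are placeholders, not part of the original post):

import org.apache.spark.sql.SparkSession

// Minimal sketch of the setup that spark-shell provides automatically;
// "dataset-ops-demo" and local[*] are placeholders.
val spark = SparkSession.builder()
  .appName("dataset-ops-demo")
  .master("local[*]")
  .getOrCreate()

// needed for toDF(), createDataset() encoders and the $"colName" syntax
import spark.implicits._
// needed for lit(), expr(), col(), etc.
import org.apache.spark.sql.functions._

val df = spark.createDataset(Seq(
  ("aaa",1,2),("bbb",3,4),("ccc",3,5),("bbb",4,6))).toDF("key1","key2","key3")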
           

Now let's add a column. The new column can be a string or an integer, and it can be a constant or a transformation of an existing column:

/* 
Add a new string column key4, initialized to "new_str_col" for every row. Note the lit() function here.
Some readers have asked about lit(), so a quick note: it is a built-in Spark function that lives in org.apache.spark.sql.functions, so it needs to be imported.
def lit(literal: Any): Column
Creates a Column of literal value. The passed in object is returned directly if it is already a Column. If the object is a Scala Symbol, it is converted into a Column also. Otherwise, a new Column is created to represent the literal value.
Since 1.3.0
*/
scala> val df_1 = df.withColumn("key4", lit("new_str_col"))
df_1: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 2 more fields]

scala> df_1.printSchema
root
 |-- key1: string (nullable = true)
 |-- key2: integer (nullable = false)
 |-- key3: integer (nullable = false)
 |-- key4: string (nullable = false)

scala> df_1.show
+----+----+----+-----------+
|key1|key2|key3|       key4|
+----+----+----+-----------+
| aaa|   1|   2|new_str_col|
| bbb|   3|   4|new_str_col|
| ccc|   3|   5|new_str_col|
| bbb|   4|   6|new_str_col|
+----+----+----+-----------+

/* 
Similarly, add an Int column key5, initialized to 1024 for every row 
*/
scala> val df_2 = df_1.withColumn("key5", lit(1024))
df_2: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 3 more fields]

scala> df_2.printSchema
root
 |-- key1: string (nullable = true)
 |-- key2: integer (nullable = false)
 |-- key3: integer (nullable = false)
 |-- key4: string (nullable = false)
 |-- key5: integer (nullable = false)

scala> df_2.show
+----+----+----+-----------+----+
|key1|key2|key3|       key4|key5|
+----+----+----+-----------+----+
| aaa|   1|   2|new_str_col|1024|
| bbb|   3|   4|new_str_col|1024|
| ccc|   3|   5|new_str_col|1024|
| bbb|   4|   6|new_str_col|1024|
+----+----+----+-----------+----+

/*
Now a new column that is not a constant: key6 = key5 * 2
*/
scala> val df_3 = df_2.withColumn("key6", $"key5"*2)
df_3: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 4 more fields]

scala> df_3.show
+----+----+----+-----------+----+----+
|key1|key2|key3|       key4|key5|key6|
+----+----+----+-----------+----+----+
| aaa|   1|   2|new_str_col|1024|2048|
| bbb|   3|   4|new_str_col|1024|2048|
| ccc|   3|   5|new_str_col|1024|2048|
| bbb|   4|   6|new_str_col|1024|2048|
+----+----+----+-----------+----+----+
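The $"key5" syntax above relies on import spark.implicits._ (already available in spark-shell). An equivalent way to write the same computed column uses col() from org.apache.spark.sql.functions; a small sketch (df_3b is just an illustrative name):

// Equivalent to $"key5" * 2, but using col() instead of the $ interpolator
import org.apache.spark.sql.functions.col

val df_3b = df_2.withColumn("key6", col("key5") * 2)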

/*
This time using the expr() function
*/
scala> val df_4 = df_2.withColumn("key6", expr("key5 * 4"))
df_4: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 4 more fields]

scala> df_4.show
+----+----+----+-----------+----+----+
|key1|key2|key3|       key4|key5|key6|
+----+----+----+-----------+----+----+
| aaa|   1|   2|new_str_col|1024|4096|
| bbb|   3|   4|new_str_col|1024|4096|
| ccc|   3|   5|new_str_col|1024|4096|
| bbb|   4|   6|new_str_col|1024|4096|
+----+----+----+-----------+----+----+
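For reference, the same result can also be produced with selectExpr, which takes SQL expression strings directly; a small sketch (df_4b is just an illustrative name):

// Keep all existing columns and add key6 = key5 * 4, written as a SQL expression
val df_4b = df_2.selectExpr("*", "key5 * 4 as key6")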
           

Dropping columns is simpler: just pass the column name.

/*
Drop column key5
*/
scala> val df_5 = df_4.drop("key5")
df_5: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 3 more fields]

scala> df_4.printSchema
root
 |-- key1: string (nullable = true)
 |-- key2: integer (nullable = false)
 |-- key3: integer (nullable = false)
 |-- key4: string (nullable = false)
 |-- key5: integer (nullable = false)
 |-- key6: integer (nullable = false)

scala> df_5.printSchema
root
 |-- key1: string (nullable = true)
 |-- key2: integer (nullable = false)
 |-- key3: integer (nullable = false)
 |-- key4: string (nullable = false)
 |-- key6: integer (nullable = false)

scala> df_5.show
+----+----+----+-----------+----+
|key1|key2|key3|       key4|key6|
+----+----+----+-----------+----+
| aaa|   1|   2|new_str_col|4096|
| bbb|   3|   4|new_str_col|4096|
| ccc|   3|   5|new_str_col|4096|
| bbb|   4|   6|new_str_col|4096|
+----+----+----+-----------+----+

/*
Multiple columns can be dropped at once, here key4 and key6
*/
scala> val df_6 = df_5.drop("key4", "key6")
df_6: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 1 more field]

/* The columns method returns all column names as an array */
scala> df_6.columns
res23: Array[String] = Array(key1, key2, key3)

scala> df_6.show
+----+----+----+
|key1|key2|key3|
+----+----+----+
| aaa|   1|   2|
| bbb|   3|   4|
| ccc|   3|   5|
| bbb|   4|   6|
+----+----+----+
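Since columns gives you the names as an array, it also comes in handy when dropping many columns at once via the varargs overload of drop(); a minimal sketch (the filter condition, toDrop and df_slim are just illustrative):

// Compute the list of column names to remove, then drop them all in one call
val toDrop = df_5.columns.filter(name => name == "key4" || name == "key6")
val df_slim = df_5.drop(toDrop: _*)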
           

Now a few ways to handle null values and other invalid data.

This time we need a different dataset; a table containing null values is imported from a CSV file, as follows:

/*
The csv file contains the following:
key1,key2,key3,key4,key5
aaa,1,2,t1,4
bbb,5,3,t2,8
ccc,2,2,,7
,7,3,t1,
bbb,1,5,t3,0
,4,,t1,8 
*/
scala> val df = spark.read.option("header","true").csv("natest.csv")
df: org.apache.spark.sql.DataFrame = [key1: string, key2: string ... 3 more fields]

scala> df.show
+----+----+----+----+----+
|key1|key2|key3|key4|key5|
+----+----+----+----+----+
| aaa|   1|   2|  t1|   4|
| bbb|   5|   3|  t2|   8|
| ccc|   2|   2|null|   7|
|null|   7|   3|  t1|null|
| bbb|   1|   5|  t3|   0|
|null|   4|null|  t1|   8|
+----+----+----+----+----+
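Note that reading the CSV with only the header option makes every column a string (as the df definition above shows). If you want typed columns right away, you can ask Spark to infer them; a sketch, assuming the same natest.csv file (dfTyped is an illustrative name):

// Let Spark sample the file and infer column types instead of defaulting to string
val dfTyped = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("natest.csv")
// key2, key3 and key5 would then come back as numeric columns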

/*
Replace all null values in column key1 with 'xxx' 
*/
scala> val df_2 = df.na.fill("xxx",Seq("key1"))
df_2: org.apache.spark.sql.DataFrame = [key1: string, key2: string ... 3 more fields]

scala> df_2.show
+----+----+----+----+----+
|key1|key2|key3|key4|key5|
+----+----+----+----+----+
| aaa|   1|   2|  t1|   4|
| bbb|   5|   3|  t2|   8|
| ccc|   2|   2|null|   7|
| xxx|   7|   3|  t1|null|
| bbb|   1|   5|  t3|   0|
| xxx|   4|null|  t1|   8|
+----+----+----+----+----+

/*
Example of filling multiple columns of the same type at once.
Here all null values in key3 and key5 are replaced with "1024".
CSV import gives string columns by default; for integer columns the call looks the same, since fill() has overloads for each type.
*/
scala> val df_3 = df.na.fill("1024",Seq("key3","key5"))
df_3: org.apache.spark.sql.DataFrame = [key1: string, key2: string ... 3 more fields]

scala> df_3.show
+----+----+----+----+----+
|key1|key2|key3|key4|key5|
+----+----+----+----+----+
| aaa|   1|   2|  t1|   4|
| bbb|   5|   3|  t2|   8|
| ccc|   2|   2|null|   7|
|null|   7|   3|  t1|1024|
| bbb|   1|   5|  t3|   0|
|null|   4|1024|  t1|   8|
+----+----+----+----+----+
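As the comment above says, fill() also has numeric overloads. If key3 and key5 had been read as numeric columns (for example via the inferSchema sketch earlier), the call would look like this; a sketch only, not run against the data above (dfTyped and dfFilled are illustrative names):

// fill(value: Double, cols: Seq[String]): the value is cast to each numeric column's type
val dfFilled = dfTyped.na.fill(1024.0, Seq("key3", "key5"))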

/*
Example of filling multiple columns of different types at once, using a Map from column name to replacement value.
CSV import gives string columns by default; for integer columns the call looks the same, since the Map values can be of any supported type.
*/
scala> val df_3 = df.na.fill(Map("key1" -> "yyy", "key3" -> "1024", "key4" -> "t88", "key5" -> "4096"))
df_3: org.apache.spark.sql.DataFrame = [key1: string, key2: string ... 3 more fields]

scala> df_3.show
+----+----+----+----+----+
|key1|key2|key3|key4|key5|
+----+----+----+----+----+
| aaa|   1|   2|  t1|   4|
| bbb|   5|   3|  t2|   8|
| ccc|   2|   2| t88|   7|
| yyy|   7|   3|  t1|4096|
| bbb|   1|   5|  t3|   0|
| yyy|   4|1024|  t1|   8|
+----+----+----+----+----+

/*
Instead of modifying values, just filter out rows containing nulls.
Here we drop rows where key3 or key5 is null.
*/
scala> val df_4 = df.na.drop(Seq("key3","key5"))
df_4: org.apache.spark.sql.DataFrame = [key1: string, key2: string ... 3 more fields]

scala> df_4.show
+----+----+----+----+----+
|key1|key2|key3|key4|key5|
+----+----+----+----+----+
| aaa|   1|   2|  t1|   4|
| bbb|   5|   3|  t2|   8|
| ccc|   2|   2|null|   7|
| bbb|   1|   5|  t3|   0|
+----+----+----+----+----+

/*
Drop rows that have fewer than n non-null values among the specified columns.
Here we drop rows where fewer than 2 of the 3 columns key1, key2, key3 are non-null. In the last row, 2 of these 3 columns are null, so it is dropped.
*/
scala> val df_5 = df.na.drop(2,Seq("key1","key2","key3"))
df_5: org.apache.spark.sql.DataFrame = [key1: string, key2: string ... 3 more fields]

scala> df.show
+----+----+----+----+----+
|key1|key2|key3|key4|key5|
+----+----+----+----+----+
| aaa|   1|   2|  t1|   4|
| bbb|   5|   3|  t2|   8|
| ccc|   2|   2|null|   7|
|null|   7|   3|  t1|null|
| bbb|   1|   5|  t3|   0|
|null|   4|null|  t1|   8|
+----+----+----+----+----+

scala> df_5.show
+----+----+----+----+----+
|key1|key2|key3|key4|key5|
+----+----+----+----+----+
| aaa|   1|   2|  t1|   4|
| bbb|   5|   3|  t2|   8|
| ccc|   2|   2|null|   7|
|null|   7|   3|  t1|null|
| bbb|   1|   5|  t3|   0|
+----+----+----+----+----+

/*
Same as above; if no column list is specified, it defaults to all columns
*/
scala> val df_6 = df.na.drop(4)
df_6: org.apache.spark.sql.DataFrame = [key1: string, key2: string ... 3 more fields]

scala> df_6.show
+----+----+----+----+----+
|key1|key2|key3|key4|key5|
+----+----+----+----+----+
| aaa|   1|   2|  t1|   4|
| bbb|   5|   3|  t2|   8|
| ccc|   2|   2|null|   7|
| bbb|   1|   5|  t3|   0|
+----+----+----+----+----+
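Besides the minimum-count form, na.drop also accepts a mode string: "any" drops a row if it contains any null, "all" only if every value considered is null. A small sketch (dfAny and dfAll are illustrative names):

// Drop rows that contain at least one null, in any column
val dfAny = df.na.drop("any")
// Drop rows only when both key3 and key5 are null
val dfAll = df.na.drop("all", Seq("key3", "key5"))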
           

OK, that's it for this post. Next time we'll move on to multi-table operations~~

If you like this content, feel free to follow the official WeChat account~
