天天看點

Spark SparkSQL的資料加載和落地

1.資料的加載

使用read.load(path)

預設加載的是

parquet

格式的檔案,如果需要加載其他類型的檔案,需要通過

format(類型)

指定。當然,spark對一些主要格式的檔案的加載都提供了更加簡潔的API操作方式

加載

json

格式檔案----要求檔案的格式統一
spark.read.format("csv").load("file:///C:\\Users\\mycat\\Desktop/test.json")
//使用json函數
spark.read.json("file:///C:\\Users\\mycat\\Desktop/test.json")
           
加載普通文本檔案
spark.read.format("text").load("file:///D://wordin/w1.txt")
//或者
spark.read.textFile("file:///D://wordin/w1.txt")
           
加載CSV格式檔案
spark.read.format("csv").load("file:///D:\\soft\\databases\\powerdesign\\testdata/address.csv")
// 或者
spark.read.csv("file:///D:\\soft\\databases\\powerdesign\\testdata/address.csv")//.toDF("addressid","address")   // 可使用帶參數toDF為對應的列重命名
           
加載ORC檔案
spark.read.format("orc").load(“file:///D://test.orc”)
// 或者
spark.read.orc(“file:///D://test.orc”)
           
加載MySQL中的資料
val url="jdbc:mysql://localhost:3306/mktest"
val table="book"
val properties=new Properties()
properties.put("user","root")
properties.put("password","xxx")
val df = spark.read.jdbc(url,table,properties)
           

2.資料的落地------對DataFrame或者Dataset的操作

json

格式檔案
val spark=SparkSession.builder().appName("dfdemo")
  .master("local[*]")
  .getOrCreate()
val jdf = spark.read.json("file:///D://test/p1.json")

//寫操作
jdf.write.format("json").save("ile:///D://test/p2.json")
// jdf.write.json("ile:///D://test/p2.json")
           

關于其他格式的檔案,例如

orc

,

parquet

,

csv

等格式的格式資料的落地,與上雷同。

對于MySQL等關系型資料庫的話,先來看看寫入模式

SaveMode

ErrorIfExists://預設,目錄存在則報錯
Append:追加模式,原有的基礎上追加寫入
Override:覆寫模式,覆寫原來的重寫一份
Ignore:忽略本次寫入模式
           
1)預設如果寫入的表存在則報錯
val url="jdbc:mysql://localhost:3306/mktest"
val table="book"
val properties=new Properties()
properties.put("user","root")
properties.put("password","miku")

mdf.write.jdbc(url,table,properties)
// 等同于 mdf.write.mode(SaveMode.ErrorIfExists).jdbc(url,table,properties)
           
2)追加寫入到MySQL
val url="jdbc:mysql://localhost:3306/mktest"
val table="book"
val properties=new Properties()
properties.put("user","root")
properties.put("password","miku")

mdf.write.mode(SaveMode.Append).jdbc(url,table,properties)
           
3)覆寫寫入到MySQL—覆寫原表中的資料
val url="jdbc:mysql://localhost:3306/mktest"
val table="book"
val properties=new Properties()
properties.put("user","root")
properties.put("password","miku")

mdf.write.mode(SaveMode.Override).jdbc(url,table,properties)
           
4)如果目标輸出存在則忽略本次寫入
val url="jdbc:mysql://localhost:3306/mktest"
val table="book"
val properties=new Properties()
properties.put("user","root")
properties.put("password","miku")

mdf.write.mode(SaveMode.Ignore).jdbc(url,table,properties)
           

繼續閱讀