1.資料的加載
使用read.load(path)
預設加載的是
parquet
格式的檔案,如果需要加載其他類型的檔案,需要通過
format(類型)
指定。當然,spark對一些主要格式的檔案的加載都提供了更加簡潔的API操作方式
加載 json
格式檔案----要求檔案的格式統一
spark.read.format("csv").load("file:///C:\\Users\\mycat\\Desktop/test.json")
//使用json函數
spark.read.json("file:///C:\\Users\\mycat\\Desktop/test.json")
加載普通文本檔案
spark.read.format("text").load("file:///D://wordin/w1.txt")
//或者
spark.read.textFile("file:///D://wordin/w1.txt")
加載CSV格式檔案
spark.read.format("csv").load("file:///D:\\soft\\databases\\powerdesign\\testdata/address.csv")
// 或者
spark.read.csv("file:///D:\\soft\\databases\\powerdesign\\testdata/address.csv")//.toDF("addressid","address") // 可使用帶參數toDF為對應的列重命名
加載ORC檔案
spark.read.format("orc").load(“file:///D://test.orc”)
// 或者
spark.read.orc(“file:///D://test.orc”)
加載MySQL中的資料
val url="jdbc:mysql://localhost:3306/mktest"
val table="book"
val properties=new Properties()
properties.put("user","root")
properties.put("password","xxx")
val df = spark.read.jdbc(url,table,properties)
2.資料的落地------對DataFrame或者Dataset的操作
寫 json
格式檔案
val spark=SparkSession.builder().appName("dfdemo")
.master("local[*]")
.getOrCreate()
val jdf = spark.read.json("file:///D://test/p1.json")
//寫操作
jdf.write.format("json").save("ile:///D://test/p2.json")
// jdf.write.json("ile:///D://test/p2.json")
關于其他格式的檔案,例如
orc
,
parquet
,
csv
等格式的格式資料的落地,與上雷同。
對于MySQL等關系型資料庫的話,先來看看寫入模式
SaveMode
:
ErrorIfExists://預設,目錄存在則報錯
Append:追加模式,原有的基礎上追加寫入
Override:覆寫模式,覆寫原來的重寫一份
Ignore:忽略本次寫入模式
1)預設如果寫入的表存在則報錯
val url="jdbc:mysql://localhost:3306/mktest"
val table="book"
val properties=new Properties()
properties.put("user","root")
properties.put("password","miku")
mdf.write.jdbc(url,table,properties)
// 等同于 mdf.write.mode(SaveMode.ErrorIfExists).jdbc(url,table,properties)
2)追加寫入到MySQL
val url="jdbc:mysql://localhost:3306/mktest"
val table="book"
val properties=new Properties()
properties.put("user","root")
properties.put("password","miku")
mdf.write.mode(SaveMode.Append).jdbc(url,table,properties)
3)覆寫寫入到MySQL—覆寫原表中的資料
val url="jdbc:mysql://localhost:3306/mktest"
val table="book"
val properties=new Properties()
properties.put("user","root")
properties.put("password","miku")
mdf.write.mode(SaveMode.Override).jdbc(url,table,properties)
4)如果目标輸出存在則忽略本次寫入
val url="jdbc:mysql://localhost:3306/mktest"
val table="book"
val properties=new Properties()
properties.put("user","root")
properties.put("password","miku")
mdf.write.mode(SaveMode.Ignore).jdbc(url,table,properties)