sparkSQL加載資料
1.read加載資料
scala> spark.read.
csv format jdbc json load option options orc parquet schema table text textFile
注意:加載資料的相關參數需寫到上述方法中,如:textFile需傳入加載資料的路徑,jdbc需傳入JDBC相關參數。
例如:直接加載Json資料
scala> spark.read.json("/opt/module/spark-local/people.json").show
+---+--------+
|age| name|
+---+--------+
| 18|qiaofeng|
| 19| duanyu|
| 20| xuzhu|
2.format指定加載資料類型
scala> spark.read.format("…")[.option("…")].load("…")
用法詳解:
format("…"):指定加載的資料類型,包括"csv"、“jdbc”、“json”、“orc”、“parquet"和"textFile”
load("…"):在"csv"、“jdbc”、“json”、“orc”、“parquet"和"textFile"格式下需要傳入加載資料的路徑
option(”…"):在"jdbc"格式下需要傳入JDBC相應參數,url、user、password和dbtable
例如:使用format指定加載Json類型資料
scala> spark.read.format("json").load ("/opt/module/spark-local/people.json").show
+---+--------+
|age| name|
+---+--------+
| 18|qiaofeng|
| 19| duanyu|
| 20| xuzhu|
3.在檔案上直接運作SQL
我們前面都是使用read API 先把檔案加載到 DataFrame然後再查詢,其實,我們也可以直接在檔案上進行查詢
scala> spark.sql("select * from json.`/opt/module/spark-local/people.json`").show
+---+--------+
|age| name|
+---+--------+
| 18|qiaofeng|
| 19| duanyu|
| 20| xuzhu|
+---+--------+|
說明:
json表示檔案的格式. 後面的檔案具體路徑需要用反引号括起來.
sparkSQL儲存資料
1.write直接儲存資料
scala> df.write.
csv jdbc json orc parquet textFile… …
注意:儲存資料的相關參數需寫到上述方法中。如:textFile需傳入加載資料的路徑,jdbc需傳入JDBC相關參數。
例如:直接将df中資料儲存到指定目錄
//預設儲存格式為parquet
scala> df.write.save("/opt/module/spark-local/output")
//可以指定為儲存格式,直接儲存,不需要再調用save了
scala> df.write.json("/opt/module/spark-local/output")
2.format指定儲存資料類型
scala> df.write.format("…")[.option("…")].save("…")
用法詳解:
format("…"):指定儲存的資料類型,包括"csv"、“jdbc”、“json”、“orc”、“parquet"和"textFile”。
save ("…"):在"csv"、“orc”、“parquet"和"textFile"格式下需要傳入儲存資料的路徑。
option(”…"):在"jdbc"格式下需要傳入JDBC相應參數,url、user、password和dbtable
3.檔案儲存選項
儲存操作可以使用 SaveMode, 用來指明如何處理資料,使用mode()方法來設定。
例如:使用指定format指定儲存類型進行儲存
df.write.mode("append").json("/opt/module/spark-local/output")
例一、讀取Json檔案
Spark SQL 能夠自動推測 JSON資料集的結構,并将它加載為一個Dataset[Row]. 可以通過SparkSession.read.json()去加載一個 一個JSON 檔案。
1.從JDBC讀取資料
Spark SQL 能夠自動推測 JSON資料集的結構,并将它加載為一個Dataset[Row]. 可以通過SparkSession.read.json()去加載一個 一個JSON 檔案。
注意:這個JSON檔案不是一個傳統的JSON檔案,每一行都得是一個JSON串。格式如下:
{"name":"Michael"}
{"name":"Andy","age":30}
{"name":"Justin","age":19}
- 導入隐式轉換
import spark.implicits._
- 加載JSON檔案
val path = "/opt/module/spark-local/people.json"
val peopleDF = spark.read.json(path)
- 建立臨時表
peopleDF.createOrReplaceTempView("people")
- 資料查詢
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
+------+
| name|
+------+
|Justin|
+------+
Mysql
1.從JDBC讀取資料
object SparkSQL02_Datasource {
def main(args: Array[String]): Unit = {
//建立上下文環境配置對象
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_Demo")
//建立SparkSession對象
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
//方式1:通用的load方法讀取
spark.read.format("jdbc")
.option("url", "jdbc:mysql://hadoop202:3306/test")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root")
.option("password", "123456")
.option("dbtable", "user")
.load().show
//方式2:通用的load方法讀取 參數另一種形式
spark.read.format("jdbc")
.options(Map("url"->"jdbc:mysql://hadoop202:3306/test?user=root&password=123456",
"dbtable"->"user","driver"->"com.mysql.jdbc.Driver")).load().show
//方式3:使用jdbc方法讀取
val props: Properties = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "123456")
val df: DataFrame = spark.read.jdbc("jdbc:mysql://hadoop202:3306/test", "user", props)
df.show
//釋放資源
spark.stop()
}
}
2.向JDBC寫資料
object SparkSQL03_Datasource {
def main(args: Array[String]): Unit = {
//建立上下文環境配置對象
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_Demo")
//建立SparkSession對象
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
val rdd: RDD[User2] = spark.sparkContext.makeRDD(List(User2("lisi", 20), User2("zs", 30)))
val ds: Dataset[User2] = rdd.toDS
//方式1:通用的方式 format指定寫出類型
ds.write
.format("jdbc")
.option("url", "jdbc:mysql://hadoop202:3306/test")
.option("user", "root")
.option("password", "123456")
.option("dbtable", "user")
.mode(SaveMode.Append)
.save()
//方式2:通過jdbc方法
val props: Properties = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "123456")
ds.write.mode(SaveMode.Append).jdbc("jdbc:mysql://hadoop202:3306/test", "user", props)
//釋放資源
spark.stop()
}
}
case class User2(name: String, age: Long)
Hive
外部Hive應用
如果Spark要接管Hive外部已經部署好的Hive,需要通過以下幾個步驟。
1.确定原有Hive是正常工作的
2.需要把hive-site.xml拷貝到spark的conf/目錄下
3.如果以前hive-site.xml檔案中,配置過Tez相關資訊,注釋掉
4.把Mysql的驅動copy到Spark的jars/目錄下
5.需要提前啟動hive服務,hive/bin/hiveservices.sh start
6.如果通路不到hdfs,則需把core-site.xml和hdfs-site.xml拷貝到conf/目錄
啟動 spark-shell
scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default| emp| false|
+--------+---------+-----------+
scala> spark.sql("select * from emp").show
19/02/09 19:40:28 WARN LazyStruct: Extra bytes detected at the end of the row! Ignoring similar problems.
+-----+-------+---------+----+----------+------+------+------+
|empno| ename| job| mgr| hiredate| sal| comm|deptno|
+-----+-------+---------+----+----------+------+------+------+
| 7369| SMITH| CLERK|7902|1980-12-17| 800.0| null| 20|
| 7499| ALLEN| SALESMAN|7698| 1981-2-20|1600.0| 300.0| 30|
| 7521| WARD| SALESMAN|7698| 1981-2-22|1250.0| 500.0| 30|
| 7566| JONES| MANAGER|7839| 1981-4-2|2975.0| null| 20|
| 7654| MARTIN| SALESMAN|7698| 1981-9-28|1250.0|1400.0| 30|
| 7698| BLAKE| MANAGER|7839| 1981-5-1|2850.0| null| 30|
| 7782| CLARK| MANAGER|7839| 1981-6-9|2450.0| null| 10|
| 7788| SCOTT| ANALYST|7566| 1987-4-19|3000.0| null| 20|
| 7839| KING|PRESIDENT|null|1981-11-17|5000.0| null| 10|
| 7844| TURNER| SALESMAN|7698| 1981-9-8|1500.0| 0.0| 30|
| 7876| ADAMS| CLERK|7788| 1987-5-23|1100.0| null| 20|
| 7900| JAMES| CLERK|7698| 1981-12-3| 950.0| null| 30|
| 7902| FORD| ANALYST|7566| 1981-12-3|3000.0| null| 20|
| 7934| MILLER| CLERK|7782| 1982-1-23|1300.0| null| 10|
| 7944|zhiling| CLERK|7782| 1982-1-23|1300.0| null| 50|
+-----+-------+---------+----+----------+------+------+------+
代碼中操作Hive
- 添加依賴
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.1</version>
</dependency>
- 拷貝hive-site.xml到resources目錄
- 代碼實作
object SparkSQL08_Hive{
def main(args: Array[String]): Unit = {
//建立上下文環境配置對象
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_Demo")
val spark: SparkSession = SparkSession
.builder()
.enableHiveSupport()
.master("local[*]")
.appName("SQLTest")
.getOrCreate()
spark.sql("show tables").show()
//釋放資源
spark.stop()
}
}