天天看點

Spark Sql 連接配接mysql

版權聲明:本文由董可倫首發于https://dongkelun.com,非商業轉載請注明作者及原創出處。商業轉載請聯系作者本人。 https://blog.csdn.net/dkl12/article/details/80246170

我的原創位址:

https://dongkelun.com/2018/03/21/sparkMysql/

1、基本概念和用法(摘自spark官方文檔中文版)

Spark SQL 還有一個能夠使用 JDBC 從其他資料庫讀取資料的資料源。當使用 JDBC 通路其它資料庫時,應該首選 JdbcRDD。這是因為結果是以資料框(DataFrame)傳回的,且這樣 Spark SQL操作輕松或便于連接配接其它資料源。因為這種 JDBC 資料源不需要使用者提供 ClassTag,是以它也更适合使用 Java 或 Python 操作。(注意,這與允許其它應用使用 Spark SQL 執行查詢操作的 Spark SQL JDBC 伺服器是不同的)。

使用 JDBC 通路特定資料庫時,需要在 spark classpath 上添加對應的 JDBC 驅動配置。例如,為了從 Spark Shell 連接配接 postgres,你需要運作如下指令 :

bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar           

通過調用資料源API,遠端資料庫的表可以被加載為DataFrame 或Spark SQL臨時表。支援的參數有 :

屬性名 含義
url 要連接配接的 JDBC URL。
dbtable 要讀取的 JDBC 表。 注意,一個 SQL 查詢的 From 分語句中的任何有效表都能被使用。例如,既可以是完整表名,也可以是括号括起來的子查詢語句。
driver 用于連接配接 URL 的 JDBC 驅動的類名。
partitionColumn, lowerBound, upperBound, numPartitions 這幾個選項,若有一個被配置,則必須全部配置。它們描述了當從多個 worker 中并行的讀取表時,如何對它分區。partitionColumn 必須時所查詢表的一個數值字段。注意,lowerBound 和 upperBound 都隻是用于決定分區跨度的,而不是過濾表中的行。是以,表中的所有行将被分區并傳回。
fetchSize JDBC fetch size,決定每次讀取多少行資料。 預設将它設為較小值(如,Oracle上設為 10)有助于 JDBC 驅動上的性能優化。

2、scala代碼實作連接配接mysql

2.1 添加mysql 依賴

在sbt 配置檔案裡添加:

"mysql" % "mysql-connector-java" % "6.0.6"           

然後執行:

sbt eclipse           

2.2 建表并初始化資料

DROP TABLE IF EXISTS `USER_T`;  
CREATE TABLE `USER_T` (  
  `ID` INT(11) NOT NULL,  
  `USER_NAME` VARCHAR(40) NOT NULL,  
  PRIMARY KEY (`ID`)  
) ENGINE=INNODB  DEFAULT CHARSET=UTF8;             
INSERT  INTO `USER_T`(`ID`,`USER_NAME`) VALUES (1,'測試1');
INSERT  INTO `USER_T`(`ID`,`USER_NAME`) VALUES (2,'測試2');           

2.3 代碼

2.3.1 查詢

package com.dkl.leanring.spark.sql

import org.apache.spark.sql.SparkSession

/**
 * spark查詢mysql測試
 */
object MysqlQueryDemo {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("MysqlQueryDemo").master("local").getOrCreate()
    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://192.168.44.128:3306/hive?useUnicode=true&characterEncoding=utf-8")
      .option("dbtable", "USER_T")
      .option("user", "root")
      .option("password", "Root-123456")
      .load()
    jdbcDF.show()
  }
}           

2.3.2 插入資料

建立USER_T.csv,造幾條資料如圖:

(需将csv的編碼格式轉為utf-8,否則spark讀取中文亂碼,轉碼方法見:

https://jingyan.baidu.com/article/fea4511a092e53f7bb912528.html

package com.dkl.leanring.spark.sql

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SaveMode
import java.util.Properties

/**
 * 從USER_T.csv讀取資料并插入的mysql表中
 */
object MysqlInsertDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("MysqlInsertDemo").master("local").getOrCreate()
    val df = spark.read.option("header", "true").csv("src/main/resources/scala/USER_T.csv")
    df.show()
    val url = "jdbc:mysql://192.168.44.128:3306/hive?useUnicode=true&characterEncoding=utf-8"
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "Root-123456")
    df.write.mode(SaveMode.Append).jdbc(url, "USER_T", prop)
  }
}
           

再查詢一次,就會發現表裡多了幾條資料

3、注意

上面的代碼在本地eclipse運作是沒有問題的,如果放在伺服器上用spark-submit送出的話,可能會報異常

java.sql.SQLException:No suitable driver           

解決方法是在代碼裡添加

mysql:

.option("driver", "com.mysql.jdbc.Driver")           

oracle:

.option("driver", "oracle.jdbc.driver.OracleDriver")           

具體可參考我的另一篇部落格:

spark-submit報錯:Exception in thread “main” java.sql.SQLException:No suitable driver