
Two ways to read a MySQL table with Spark SQL

1. The MySQL table and its data

mysql> use test;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> show tables;
+----------------+
| Tables_in_test |
+----------------+
| stu            |
| walk_info      |
+----------------+
2 rows in set (0.00 sec)

mysql> select * from walk_info
    -> ;
+-----------+---------------------+-----------------+------------+
| user      | counttime           | walkplace       | newwalknum |
+-----------+---------------------+-----------------+------------+
| zhangSan  | 2020-07-02 13:44:43 | 操场西南门      |          1 |
| liSi      | 2020-07-02 13:44:45 | 操场西南门      |          3 |
| wangWu    | 2020-07-02 13:44:47 | 操场东北门      |          3 |
| xiaoQiang | 2020-07-02 13:44:49 | 操场西北门      |          2 |
| zhangSan  | 2020-07-02 13:44:51 | 操场西门        |          3 |
| liSi      | 2020-07-02 13:44:53 | 操场东南门      |          1 |
| wangWu    | 2020-07-02 13:44:55 | 操场东南门      |          3 |
| xiaoQiang | 2020-07-02 13:44:57 | 操场西门        |          1 |
| zhangSan  | 2020-07-02 13:44:59 | 操场北门        |          1 |
+-----------+---------------------+-----------------+------------+
9 rows in set (0.01 sec)
           

2. Using spark.read.jdbc()

package com.cn.sparkSql

import java.util.Properties

import org.apache.spark.sql.SparkSession

object Sparksql_mysql {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("toMysql")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "123456")
    val url = "jdbc:mysql://192.168.230.21:3306/test?characterEncoding=utf8&useSSL=true"
    spark.read.jdbc(url,"walk_info",properties)
      .select("user","counttime","walkplace","newwalknum")
      .where("newwalknum>1")
      .show()
    spark.stop()
  }
}
           

Output:

.........
20/07/16 13:41:01 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.230.1, 56387, None)
20/07/16 13:41:01 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.230.1, 56387, None)
+---------+-------------------+---------+----------+
|     user|          counttime|walkplace|newwalknum|
+---------+-------------------+---------+----------+
|     liSi|2020-07-02 13:44:45|    操场西南门|         3|
|   wangWu|2020-07-02 13:44:47|    操场东北门|         3|
|xiaoQiang|2020-07-02 13:44:49|    操场西北门|         2|
| zhangSan|2020-07-02 13:44:51|     操场西门|         3|
|   wangWu|2020-07-02 13:44:55|    操场东南门|         3|
+---------+-------------------+---------+----------+


Process finished with exit code 0
           

3. Using spark.read.format().option().load()

package com.cn.sparkSql

import org.apache.spark.sql.{DataFrame, SparkSession}

object sparkSqlToMysql {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[1]")
      .appName("toMysql")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    val df: DataFrame = spark.read.format("jdbc")
      //.option("driver", "com.mysql.jdbc.Driver") // MySQL Connector/J 5.x and earlier
      .option("driver", "com.mysql.cj.jdbc.Driver") // MySQL Connector/J 6.0 and later (including 8.x)
      .option("url", "jdbc:mysql://192.168.230.21:3306/test?characterEncoding=utf8&useSSL=true")
      .option("user", "root")
      .option("password", "123456")
      .option("dbtable", "walk_info")
      .load()
    df.createOrReplaceTempView("walkinfo")
    val sql = "select * from walkinfo where newwalknum > 1"
    spark.sql(sql).show()
    spark.stop()
  }
}
           

Output:

20/07/16 13:42:02 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.230.1, 56493, None)
20/07/16 13:42:02 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.230.1, 56493, None)
+---------+-------------------+---------+----------+
|     user|          counttime|walkplace|newwalknum|
+---------+-------------------+---------+----------+
|     liSi|2020-07-02 13:44:45|    操场西南门|         3|
|   wangWu|2020-07-02 13:44:47|    操场东北门|         3|
|xiaoQiang|2020-07-02 13:44:49|    操场西北门|         2|
| zhangSan|2020-07-02 13:44:51|     操场西门|         3|
|   wangWu|2020-07-02 13:44:55|    操场东南门|         3|
+---------+-------------------+---------+----------+


Process finished with exit code 0
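
Both read styles above take the same connection settings, only packaged differently: spark.read.jdbc() wants a URL plus a java.util.Properties object, while spark.read.format("jdbc") wants key/value options. The following is a minimal sketch of keeping those settings in one place. MysqlSource and its members are hypothetical names introduced here for illustration; they are not part of the original post or of Spark.

```scala
import java.util.Properties

// Hypothetical helper (an assumption, not from the original post):
// holds the connection settings that both read styles need.
final case class MysqlSource(
    host: String,
    port: Int,
    database: String,
    user: String,
    password: String) {

  // Same URL shape as in the two examples above.
  def url: String =
    s"jdbc:mysql://$host:$port/$database?characterEncoding=utf8&useSSL=true"

  // Credentials in the form expected by spark.read.jdbc(url, table, properties).
  def properties: Properties = {
    val props = new Properties()
    props.setProperty("user", user)
    props.setProperty("password", password)
    props
  }

  // The same settings as key/value pairs for
  // spark.read.format("jdbc").options(...).load().
  def options(table: String): Map[String, String] = Map(
    "url" -> url,
    "user" -> user,
    "password" -> password,
    "dbtable" -> table
  )
}
```

Given val src = MysqlSource("192.168.230.21", 3306, "test", "root", "123456"), the first style would become spark.read.jdbc(src.url, "walk_info", src.properties) and the second spark.read.format("jdbc").options(src.options("walk_info")).load(). Both should produce the same DataFrame, so the choice between them is mostly a matter of which call reads better in the surrounding code.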