天天看点

SPARK-SQL 通过JDBC读写Mysql数据库

相关的资源文件地址

链接:https://pan.baidu.com/s/1QGQIrVwg56g9eF16ERSLwQ 
提取码:7v8n
           

写数据库表的参数

truncate:仅适用于write数据。当SaveMode.Overwrite启用时,此选项会truncate在MySQL中的表,而不是删除,再重建其现有的表。

这可以更有效,并且防止表元数据(例如,索引)被去除。但是,在某些情况下,例如当新数据具有不同的模式时,它将无法工作。它默认为false。

createTableColumnTypes:一般用于配置特殊的分区或者数据库配置,比如 CREATE TABLE t (name string) ENGINE=InnoDB

createTableOptions:此选项允许在创建表(例如CREATE TABLE t (name string) ENGINE=InnoDB.)时设置特定的数据库表和分区选项

batchsize:决定插入时的并发大小,默认1000

isolationLevel:事务隔离的等级

读数据库表的参数,可提高效率

Map<String, String> readOpts = new HashMap<>();
readOpts.put("numPartitions", "2");
readOpts.put("partitionColumn", "id");
readOpts.put("lowerBound", "3");
readOpts.put("upperBound", "5");
readOpts.put("fetchsize", "100");
           

通过指定 id 这个数字型的column作为分片键,并设置最大最小值和指定的分区数,可以对数据库的数据进行并发读取。

是不是numPartitions传入多少,分区数就一定是多少呢?其实不然,通过对源码的分析可知:

if upperBound-lowerBound >= numPartitions:
     jdbcDF.rdd.partitions.size = numPartitions
else
     jdbcDF.rdd.partitions.size = upperBound-lowerBound
           

拉取数据时,spark会按numPartitions均分最大最小ID,然后进行并发查询,并最终转换成RDD

fetchsize:每一个fetch,拉取多少数据量

JDBC读写mysql代码示例

import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class test16 {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .config("spark.driver.host", "localhost")
                .appName("JdbcDatasourceTest")
                .master("local")
                .getOrCreate();

        spark.sparkContext().setLogLevel("ERROR");

        //1: 读取csv文件数据
        //inferSchema:让框架推断csv文件的数据类型,慢,推荐用自定义Schema来进行代替
        Dataset<Row> df = spark.read()
                .option("header", "true")
                .option("inferSchema", "true")
                .csv(Utils.BASE_PATH + "/jdbc_demo_data.csv");
        df.show();

        String mysqlUrl = "jdbc:mysql://localhost:3306/zhazha";

        Properties properties = new Properties();
        properties.put("user", "root");
        properties.put("password", "123456");
        //写数据库表
        df.write().mode(SaveMode.Overwrite).jdbc(mysqlUrl, "person", properties);
        //读数据库表
        Dataset<Row> jdbcDFWithoutOpt = spark.read().jdbc(mysqlUrl, "person", properties);
        jdbcDFWithoutOpt.show();
        //数据集 jdbcDFWithoutOpt
        /*
        +---+-----+---+
        | id| name|age|
        +---+-----+---+
        |  1| dog1| 23|
        |  2| dog2| 22|
        |  3| dog3| 33|
        |  4| dog4| 45|
        |  5| dog5| 34|
        |  6| dog6| 99|
        |  7| dog7| 44|
        |  8| dog8|  5|
        |  9| dog9|  3|
        | 10|dog10|  4|
        | 11|dog11|  5|
        | 12|dog12| 26|
        | 13|dog13| 23|
        +---+-----+---+
         */

        //写数据库表的参数
        Map<String, String> writeOpts = new HashMap<>();
        writeOpts.put("truncate", "false");
        writeOpts.put("createTableColumnTypes", "age long");
        writeOpts.put("createTableOptions", "ENGINE=InnoDB DEFAULT CHARSET=utf8");
        //writeOpts.put("createTableColumnTypes", "age varchar(20)");
        //writeOpts.put("createTableOptions", "ENGINE=MyISAM DEFAULT CHARSET=gb2312");
        writeOpts.put("batchsize", "100");
        writeOpts.put("isolationLevel", "READ_UNCOMMITTED");

        df.write()
                .mode(SaveMode.Overwrite)
                .options(writeOpts)
                .jdbc(mysqlUrl, "person_2", properties);

        //读数据库表
        Map<String, String> readOpts = new HashMap<>();
        readOpts.put("numPartitions", "2");
        readOpts.put("partitionColumn", "id");
        readOpts.put("lowerBound", "3");
        readOpts.put("upperBound", "5");
        readOpts.put("fetchsize", "100");
        Dataset<Row> jdbcDF = spark.read()
                .options(readOpts).jdbc(mysqlUrl, "person_2", properties);
        jdbcDF.show();
        //同 数据集 jdbcDFWithoutOpt 结果一样

        //并行度为2,对应于numPartitions,可提高效率
        System.out.println(jdbcDF.toJavaRDD().getNumPartitions());

        //通过指定 id 这个数字型的column作为分片键,并设置最大最小值和指定的分区数,可以对数据库的数据进行并发读取。
        spark.read()
                .jdbc(mysqlUrl, "person",
                        "id", 3, 5, 2,
                        properties).show();
        //同 数据集 jdbcDFWithoutOpt 结果一样

        //并行度为2,对应于numPartitions,可提高效率
        System.out.println(jdbcDF.toJavaRDD().getNumPartitions());

        //读取时进行筛选
        spark.read()
                .jdbc(mysqlUrl, "person",
                        new String[]{"id > 2 and id < 5", "id >= 5 and id < 8"},
                        properties).show();
        /*
        +---+----+---+
        | id|name|age|
        +---+----+---+
        |  3|dog3| 33|
        |  4|dog4| 45|
        |  5|dog5| 34|
        |  6|dog6| 99|
        |  7|dog7| 44|
        +---+----+---+
         */

        spark.stop();
    }
}
           

继续阅读