相关的资源文件地址
链接:https://pan.baidu.com/s/1QGQIrVwg56g9eF16ERSLwQ
提取码:7v8n
写数据库表的参数
truncate:仅适用于write数据。当SaveMode.Overwrite启用时,此选项会truncate在MySQL中的表,而不是删除,再重建其现有的表。
这可以更有效,并且防止表元数据(例如,索引)被去除。但是,在某些情况下,例如当新数据具有不同的模式时,它将无法工作。它默认为false。
createTableColumnTypes:一般用于配置特殊的分区或者数据库配置,比如 CREATE TABLE t (name string) ENGINE=InnoDB
createTableOptions:此选项允许在创建表(例如CREATE TABLE t (name string) ENGINE=InnoDB.)时设置特定的数据库表和分区选项
batchsize:决定插入时的并发大小,默认1000
isolationLevel:事务隔离的等级
读数据库表的参数,可提高效率
Map<String, String> readOpts = new HashMap<>();
readOpts.put("numPartitions", "2");
readOpts.put("partitionColumn", "id");
readOpts.put("lowerBound", "3");
readOpts.put("upperBound", "5");
readOpts.put("fetchsize", "100");
通过指定 id 这个数字型的column作为分片键,并设置最大最小值和指定的分区数,可以对数据库的数据进行并发读取。
是不是numPartitions传入多少,分区数就一定是多少呢?其实不然,通过对源码的分析可知:
if upperBound-lowerBound >= numPartitions:
jdbcDF.rdd.partitions.size = numPartitions
else
jdbcDF.rdd.partitions.size = upperBound-lowerBound
拉取数据时,spark会按numPartitions均分最大最小ID,然后进行并发查询,并最终转换成RDD
fetchsize:每一个fetch,拉取多少数据量
JDBC读写mysql代码示例
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class test16 {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.config("spark.driver.host", "localhost")
.appName("JdbcDatasourceTest")
.master("local")
.getOrCreate();
spark.sparkContext().setLogLevel("ERROR");
//1: 读取csv文件数据
//inferSchema:让框架推断csv文件的数据类型,慢,推荐用自定义Schema来进行代替
Dataset<Row> df = spark.read()
.option("header", "true")
.option("inferSchema", "true")
.csv(Utils.BASE_PATH + "/jdbc_demo_data.csv");
df.show();
String mysqlUrl = "jdbc:mysql://localhost:3306/zhazha";
Properties properties = new Properties();
properties.put("user", "root");
properties.put("password", "123456");
//写数据库表
df.write().mode(SaveMode.Overwrite).jdbc(mysqlUrl, "person", properties);
//读数据库表
Dataset<Row> jdbcDFWithoutOpt = spark.read().jdbc(mysqlUrl, "person", properties);
jdbcDFWithoutOpt.show();
//数据集 jdbcDFWithoutOpt
/*
+---+-----+---+
| id| name|age|
+---+-----+---+
| 1| dog1| 23|
| 2| dog2| 22|
| 3| dog3| 33|
| 4| dog4| 45|
| 5| dog5| 34|
| 6| dog6| 99|
| 7| dog7| 44|
| 8| dog8| 5|
| 9| dog9| 3|
| 10|dog10| 4|
| 11|dog11| 5|
| 12|dog12| 26|
| 13|dog13| 23|
+---+-----+---+
*/
//写数据库表的参数
Map<String, String> writeOpts = new HashMap<>();
writeOpts.put("truncate", "false");
writeOpts.put("createTableColumnTypes", "age long");
writeOpts.put("createTableOptions", "ENGINE=InnoDB DEFAULT CHARSET=utf8");
//writeOpts.put("createTableColumnTypes", "age varchar(20)");
//writeOpts.put("createTableOptions", "ENGINE=MyISAM DEFAULT CHARSET=gb2312");
writeOpts.put("batchsize", "100");
writeOpts.put("isolationLevel", "READ_UNCOMMITTED");
df.write()
.mode(SaveMode.Overwrite)
.options(writeOpts)
.jdbc(mysqlUrl, "person_2", properties);
//读数据库表
Map<String, String> readOpts = new HashMap<>();
readOpts.put("numPartitions", "2");
readOpts.put("partitionColumn", "id");
readOpts.put("lowerBound", "3");
readOpts.put("upperBound", "5");
readOpts.put("fetchsize", "100");
Dataset<Row> jdbcDF = spark.read()
.options(readOpts).jdbc(mysqlUrl, "person_2", properties);
jdbcDF.show();
//同 数据集 jdbcDFWithoutOpt 结果一样
//并行度为2,对应于numPartitions,可提高效率
System.out.println(jdbcDF.toJavaRDD().getNumPartitions());
//通过指定 id 这个数字型的column作为分片键,并设置最大最小值和指定的分区数,可以对数据库的数据进行并发读取。
spark.read()
.jdbc(mysqlUrl, "person",
"id", 3, 5, 2,
properties).show();
//同 数据集 jdbcDFWithoutOpt 结果一样
//并行度为2,对应于numPartitions,可提高效率
System.out.println(jdbcDF.toJavaRDD().getNumPartitions());
//读取时进行筛选
spark.read()
.jdbc(mysqlUrl, "person",
new String[]{"id > 2 and id < 5", "id >= 5 and id < 8"},
properties).show();
/*
+---+----+---+
| id|name|age|
+---+----+---+
| 3|dog3| 33|
| 4|dog4| 45|
| 5|dog5| 34|
| 6|dog6| 99|
| 7|dog7| 44|
+---+----+---+
*/
spark.stop();
}
}