Method 1: custom Source and custom Sink
Drawback: every row pulled from MySQL is inserted into Hive with its own INSERT statement, and each insert launches a MapReduce job, so this approach is very slow (a batching mitigation is sketched after the code).
package com.blog;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
public class StreamMysqlToHiveCommon {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
System.setProperty("HADOOP_USER_NAME", "root");
DataStream<List<String>> streamSource = env.addSource(new MyMysqlSource("select * from test.mysql_hive"));
streamSource.print();
streamSource.addSink(new MyHiveSink("insert into test.mysql_hive(id,name,age,money,todate,ts) values(?,?,?,?,?,?)"));
env.execute();
}
/**
* Custom MySQL Source. MySQL table DDL:
* <p>
* CREATE TABLE `mysql_hive` (
* `id` int(11) NULL DEFAULT NULL,
* `name` varchar(255) ,
* `age` int(11) NULL DEFAULT NULL,
* `money` double NULL DEFAULT NULL,
* `todate` date NULL DEFAULT NULL,
* `ts` timestamp NULL DEFAULT NULL
* ) ;
*/
public static class MyMysqlSource extends RichSourceFunction<List<String>> {
private String sql;
private Connection conn = null;
private PreparedStatement pstm = null;
private ResultSet rs = null;
public MyMysqlSource(String sql) {
this.sql = sql;
}
@Override
public void open(Configuration parameters) throws Exception {
String driver = "com.mysql.cj.jdbc.Driver";
String url = "jdbc:mysql://192.168.88.2:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=Asia/Shanghai";
String username = "root";
String password = "A";
Class.forName(driver);
conn = DriverManager.getConnection(url, username, password);
pstm = conn.prepareStatement(sql);
}
@Override
public void run(SourceContext<List<String>> sourceContext) throws Exception {
rs = pstm.executeQuery();
int count = rs.getMetaData().getColumnCount();
while (rs.next()) {
// build a fresh list per row; reusing and clearing one shared list across collect() calls is unsafe
List<String> bean = new ArrayList<>(count);
for (int i = 1; i <= count; i++) {
bean.add(rs.getString(i));
}
sourceContext.collect(bean);
}
}
@Override
public void cancel() {
}
@Override
public void close() throws Exception {
if (rs != null) {
try {
rs.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if (pstm != null) {
try {
pstm.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
/**
* Custom Hive Sink. Hive table DDL:
* <p>
* CREATE TABLE `mysql_hive` (
* `id` int,
* `name` string ,
* `age` int,
* `money` double,
* `todate` date,
* `ts` timestamp
* ) ;
*/
public static class MyHiveSink extends RichSinkFunction<List<String>> {
private PreparedStatement pstm;
private Connection conn;
private String sql;
public MyHiveSink(String sql) {
this.sql = sql;
}
@Override
public void open(Configuration parameters) throws Exception {
conn = getConnection();
pstm = conn.prepareStatement(sql);
}
@Override
public void invoke(List<String> value, Context context) throws Exception {
for (int i = 1; i <= value.size(); i++) {
pstm.setString(i, value.get(i - 1));
}
pstm.executeUpdate();
}
@Override
public void close() {
if (pstm != null) {
try {
pstm.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
private static Connection getConnection() {
Connection conn = null;
try {
String jdbc = "org.apache.hive.jdbc.HiveDriver";
String url = "jdbc:hive2://192.168.88.108:10000/test";
String user = "root";
String password = "";
Class.forName(jdbc);
conn = DriverManager.getConnection(url, user, password);
} catch (Exception e) {
System.out.println(e.getMessage());
}
return conn;
}
}
}
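One way to soften Method 1's per-row cost is to buffer rows in the sink and flush them as a single multi-row INSERT ... VALUES statement, so one MapReduce job covers a whole batch. Below is a minimal sketch, not from the original code: it assumes Hive 0.14+ (which accepts multi-row VALUES), reuses the connection settings above, picks an illustrative batch size of 500, and uses naive quoting that a real implementation would replace with proper escaping and NULL handling. It would sit inside StreamMysqlToHiveCommon next to MyHiveSink:

public static class MyBatchingHiveSink extends RichSinkFunction<List<String>> {
    private static final int BATCH_SIZE = 500; // illustrative threshold, tune to taste
    private final List<List<String>> buffer = new ArrayList<>();
    private Connection conn;

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        conn = DriverManager.getConnection("jdbc:hive2://192.168.88.108:10000/test", "root", "");
    }

    @Override
    public void invoke(List<String> value, Context context) throws Exception {
        buffer.add(new ArrayList<>(value)); // defensive copy
        if (buffer.size() >= BATCH_SIZE) {
            flush();
        }
    }

    // build one "insert into ... values (...),(...)" statement for the whole batch
    private void flush() throws SQLException {
        if (buffer.isEmpty()) {
            return;
        }
        StringBuilder sb = new StringBuilder("insert into test.mysql_hive values ");
        for (int r = 0; r < buffer.size(); r++) {
            List<String> row = buffer.get(r);
            sb.append(r == 0 ? "(" : ",(");
            for (int c = 0; c < row.size(); c++) {
                if (c > 0) {
                    sb.append(',');
                }
                // naive quoting for illustration only; real code must escape values
                sb.append('\'').append(row.get(c)).append('\'');
            }
            sb.append(')');
        }
        try (Statement stmt = conn.createStatement()) {
            stmt.execute(sb.toString());
        }
        buffer.clear();
    }

    @Override
    public void close() throws Exception {
        flush(); // write out any remaining partial batch
        if (conn != null) {
            conn.close();
        }
    }
}

Wiring it in is then a one-line change in main(): streamSource.addSink(new MyBatchingHiveSink());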
Method 2: Flink SQL with the JDBC connector and a HiveCatalog
Drawback: the output lands in a single file, and the MySQL table must have at least as many columns as the Hive table.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
public class MysqlToHiveSql {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
System.setProperty("HADOOP_USER_NAME", "root");
String columns = "id INT, name STRING, age INT, money DOUBLE, todate DATE, ts TIMESTAMP";
String mysql_source_table = "mysql_hive";
String flink_source_table = "flink_mysql_hive";
String base_sql = "CREATE TABLE %s (%s) " +
"WITH (" +
"'connector.type' = 'jdbc'," +
"'connector.url' = 'jdbc:mysql://192.168.88.2:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=Asia/Shanghai'," +
"'connector.driver' = 'com.mysql.cj.jdbc.Driver'," +
"'connector.table' = '%s'," +
" 'connector.username' = 'root'," +
" 'connector.password' = 'A'" +
" )";
String source_ddl = String.format(base_sql, flink_source_table, columns, mysql_source_table);
tableEnv.executeSql(source_ddl);
Table dataTable = tableEnv.sqlQuery("select * from " + flink_source_table);
// hive catalog
String name = "hive-test";
// default database
String defaultDatabase = "test";
// path to the directory containing Hive's hive-site.xml
String hiveConfDir = "FlinkSQL/src/main/resources";
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog(name, hive);
tableEnv.useCatalog(name);
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.useDatabase("test");
StatementSet statementSet = tableEnv.createStatementSet();
statementSet.addInsert("mysql_hive", dataTable);
statementSet.execute();
}
}
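Note that the WITH clause above uses the legacy 'connector.type' option style. On Flink 1.11 and later, the JDBC connector is declared with the newer option keys instead; a sketch of the equivalent source DDL string (same hypothetical host and credentials as above), which would replace base_sql in main():

// Newer-style JDBC source DDL (Flink 1.11+ option keys from flink-connector-jdbc)
String base_sql = "CREATE TABLE %s (%s) " +
        "WITH (" +
        "'connector' = 'jdbc'," +
        "'url' = 'jdbc:mysql://192.168.88.2:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=Asia/Shanghai'," +
        "'driver' = 'com.mysql.cj.jdbc.Driver'," +
        "'table-name' = '%s'," +
        "'username' = 'root'," +
        "'password' = 'A'" +
        ")";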
Method 3: Flink StreamingFileSink writing directly to HDFS
Drawback: it requires a custom entity class, and until the rolling thresholds are reached and a checkpoint completes, data sits in in-progress/pending part files that Hive cannot read directly (see the partition-registration sketch after the code).
import com.utils.JdbcUtils;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.*;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class MysqlToHDFS {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// StreamingFileSink can only finalize part files on successful checkpoints;
// without checkpointing enabled, output stays in in-progress/pending files forever
env.enableCheckpointing(TimeUnit.MINUTES.toMillis(1), CheckpointingMode.EXACTLY_ONCE);
DataStream<Person> streamSource = env.addSource(new MyMysqlSource("select * from test.mysql_hive"));
System.setProperty("HADOOP_USER_NAME", "root");
// Hive表的HDFS路径
String outputBasePath = "hdfs://docker:9820/user/hive/warehouse/mysql_hive";
StreamingFileSink<Person> sink = StreamingFileSink.forRowFormat(new Path(outputBasePath), new SimpleStringEncoder<Person>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(30))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.withMaxPartSize(1024 * 1024 * 128)
.build())
.withBucketAssigner(new CustomBucketAssigner("yyyyMMdd", ZoneId.of("Asia/Shanghai"), "dt="))
.withBucketCheckInterval(1)
.withOutputFileConfig(
OutputFileConfig.builder()
.withPartPrefix("part")
.withPartSuffix(".ext")
.build())
.build();
streamSource.addSink(sink);
env.execute();
}
/**
* CREATE TABLE `mysql_hive` (
* `id` int(11) NULL DEFAULT NULL,
* `name` varchar(255) ,
* `age` int(11) NULL DEFAULT NULL,
* `money` double NULL DEFAULT NULL,
* `todate` date NULL DEFAULT NULL,
* `ts` timestamp NULL DEFAULT NULL
* ) ;
*/
public static class MyMysqlSource extends RichSourceFunction<Person> {
private String sql;
private Connection conn = null;
public MyMysqlSource(String sql) {
this.sql = sql;
}
@Override
public void open(Configuration parameters) throws Exception {
String driver = "com.mysql.cj.jdbc.Driver";
String url = "jdbc:mysql://192.168.88.2:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=Asia/Shanghai";
String username = "root";
String password = "A";
Class.forName(driver);
conn = DriverManager.getConnection(url, username, password);
}
@Override
public void run(SourceContext<Person> sourceContext) throws Exception {
List<Person> personList = JdbcUtils.queryList(conn, sql, Person.class);
for (Person person : personList) {
sourceContext.collect(person);
}
}
@Override
public void cancel() {
}
@Override
public void close() throws Exception {
Thread.sleep(3000);
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
public static class CustomBucketAssigner implements BucketAssigner<Person, String> {
private static final long serialVersionUID = 1L;
private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd";
private final String formatString;
private final ZoneId zoneId;
private final String column;
private transient DateTimeFormatter dateTimeFormatter;
public CustomBucketAssigner() {
this(DEFAULT_FORMAT_STRING);
}
public CustomBucketAssigner(String formatString) {
this(formatString, ZoneId.systemDefault(), "");
}
public CustomBucketAssigner(ZoneId zoneId) {
this(DEFAULT_FORMAT_STRING, zoneId, "");
}
public CustomBucketAssigner(String formatString, String column) {
this(formatString, ZoneId.systemDefault(), column);
}
public CustomBucketAssigner(String formatString, ZoneId zoneId) {
this(formatString, zoneId, "");
}
public CustomBucketAssigner(String formatString, ZoneId zoneId, String column) {
this.formatString = Preconditions.checkNotNull(formatString);
this.zoneId = Preconditions.checkNotNull(zoneId);
this.column = Preconditions.checkNotNull(column);
}
@Override
public String getBucketId(Person element, Context context) {
if (dateTimeFormatter == null) {
dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
}
// bucket by processing time, prefixed with the partition column, e.g. "dt=20240101"
return column + dateTimeFormatter.format(Instant.ofEpochMilli(context.currentProcessingTime()));
}
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
@Override
public String toString() {
return "CustomBucketAssigner{"
+ "formatString='"
+ formatString
+ '\''
+ ", zoneId="
+ zoneId
+ '}';
}
}
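// An alternative rolling-policy skeleton; note it is not wired into the sink above,
// and returning false from every method would mean part files never roll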
public static class CustomRollingPolicy implements RollingPolicy<Person, String> {
@Override
public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) throws IOException {
return false;
}
@Override
public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, Person element) throws IOException {
return false;
}
@Override
public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState, long currentTime) throws IOException {
return false;
}
}
}
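Because the sink writes dt=... directories straight into the warehouse path, Hive will not see them until the partitions are registered. A hedged sketch, assuming the Hive table was created as PARTITIONED BY (dt string); the partition value is illustrative, and this could run as a small scheduled job over the same Hive JDBC connection style used in Method 1:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class RegisterHivePartition {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:hive2://192.168.88.108:10000/test", "root", "");
             Statement stmt = conn.createStatement()) {
            // register one partition written by the sink ("20240101" is illustrative)
            stmt.execute("ALTER TABLE mysql_hive ADD IF NOT EXISTS PARTITION (dt='20240101')");
            // or sweep up every unregistered partition directory at once:
            // stmt.execute("MSCK REPAIR TABLE mysql_hive");
        }
    }
}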
The entity class:
import java.sql.Date;
import java.sql.Timestamp;
public class Person {
private int id;
private String name;
private int age;
private double money;
private Date todate;
private Timestamp ts;
public Person(int id, String name, int age, double money, Date todate, Timestamp ts) {
this.id = id;
this.name = name;
this.age = age;
this.money = money;
this.todate = todate;
this.ts = ts;
}
public Person() {
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public double getMoney() {
return money;
}
public void setMoney(double money) {
this.money = money;
}
public Date getTodate() {
return todate;
}
public void setTodate(Date todate) {
this.todate = todate;
}
public Timestamp getTs() {
return ts;
}
public void setTs(Timestamp ts) {
this.ts = ts;
}
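// StreamingFileSink's SimpleStringEncoder writes each record's toString() as one line;
// the tab delimiter assumes the Hive table declares ROW FORMAT DELIMITED FIELDS
// TERMINATED BY '\t' (Hive's default field delimiter is '\001')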
@Override
public String toString() {
return id + "\t" +
name + "\t" +
age + "\t" +
money + "\t" +
todate + "\t" +
ts;
}
}
The utility class:
import org.apache.commons.beanutils.BeanUtils;
import java.lang.reflect.InvocationTargetException;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
public class JdbcUtils {
public static <T> List<T> queryList(Connection connection, String querySql, Class<T> clz) throws SQLException, IllegalAccessException, InstantiationException, InvocationTargetException {
// collection to hold the query results
ArrayList<T> resultList = new ArrayList<>();
// precompile the SQL
PreparedStatement preparedStatement = connection.prepareStatement(querySql);
// execute the query
ResultSet resultSet = preparedStatement.executeQuery();
// read the result-set metadata for the column count
ResultSetMetaData metaData = resultSet.getMetaData();
int columnCount = metaData.getColumnCount();
while (resultSet.next()) {
// instantiate the target type reflectively
T t = clz.newInstance();
// populate each property from the matching column
for (int i = 1; i <= columnCount; i++) {
// column name
String columnName = metaData.getColumnName(i);
// column value
Object value = resultSet.getObject(i);
BeanUtils.setProperty(t, columnName, value);
}
// add the populated object to the result list
resultList.add(t);
}
// close the result set before the statement
resultSet.close();
preparedStatement.close();
// return the collected rows
return resultList;
}
}
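A minimal usage sketch, assuming the Person POJO above. Note that BeanUtils.setProperty matches column names to property names exactly, so bridging snake_case columns to camelCase fields normally means aliasing them in the SQL (e.g. select user_name as userName):

Connection conn = DriverManager.getConnection(
        "jdbc:mysql://192.168.88.2:3306/test?useSSL=true&serverTimezone=Asia/Shanghai",
        "root", "A");
List<Person> people = JdbcUtils.queryList(conn, "select * from test.mysql_hive", Person.class);
people.forEach(System.out::println);
conn.close();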