
Three Ways to Sync MySQL Data to Hive with Flink (Without Enabling Binlog)

Author: 极目馆主

Method 1: Custom Source and Custom Sink

Drawback: every row fetched from MySQL is inserted into Hive with its own statement, and each insert launches a MapReduce job, so this approach is very slow (a batched variant is sketched at the end of the code below).

package com.blog;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.*;
import java.util.ArrayList;
import java.util.List;

public class StreamMysqlToHiveCommon {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        System.setProperty("HADOOP_USER_NAME", "root");
        DataStream<List<String>> streamSource = env.addSource(new MyMysqlSource("select * from test.mysql_hive"));
        streamSource.print();
        streamSource.addSink(new MyHiveSink("insert into test.mysql_hive(id,name,age,money,todate,ts) values(?,?,?,?,?,?)"));
        env.execute();


    }

    /**
     * Custom MySQL source. MySQL table DDL:
     * <p>
     * CREATE TABLE `mysql_hive`  (
     * `id` int(11) NULL DEFAULT NULL,
     * `name` varchar(255) ,
     * `age` int(11) NULL DEFAULT NULL,
     * `money` double NULL DEFAULT NULL,
     * `todate` date NULL DEFAULT NULL,
     * `ts` timestamp NULL DEFAULT NULL
     * ) ;
     */
    public static class MyMysqlSource extends RichSourceFunction<List<String>> {
        private String sql;
        private Connection conn = null;
        private PreparedStatement pstm = null;
        private ResultSet rs = null;

        public MyMysqlSource(String sql) {
            this.sql = sql;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            String driver = "com.mysql.cj.jdbc.Driver";
            String url = "jdbc:mysql://192.168.88.2:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=Asia/Shanghai";
            String username = "root";
            String password = "A";
            Class.forName(driver);
            conn = DriverManager.getConnection(url, username, password);
            pstm = conn.prepareStatement(sql);
        }

        @Override
        public void run(SourceContext<List<String>> sourceContext) throws Exception {
            rs = pstm.executeQuery();
            int count = rs.getMetaData().getColumnCount();
            while (rs.next()) {
                // Build a fresh list per row; reusing and clearing a single list
                // would mutate records that were already emitted downstream.
                List<String> bean = new ArrayList<>(count);
                for (int i = 1; i <= count; i++) {
                    bean.add(rs.getString(i));
                }
                sourceContext.collect(bean);
            }
        }

        @Override
        public void cancel() {

        }

        @Override
        public void close() throws Exception {
            if (rs != null) {
                try {
                    rs.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if (pstm != null) {
                try {
                    pstm.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }


    /**
     * Custom Hive sink. Hive table DDL:
     * <p>
     * CREATE TABLE `mysql_hive`  (
     * `id` int,
     * `name` string ,
     * `age` int,
     * `money` double,
     * `todate` date,
     * `ts` timestamp
     * ) ;
     */
    public static class MyHiveSink extends RichSinkFunction<List<String>> {
        private PreparedStatement pstm;
        private Connection conn;
        private String sql;

        public MyHiveSink(String sql) {
            this.sql = sql;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            conn = getConnection();
            pstm = conn.prepareStatement(sql);

        }

        @Override
        public void invoke(List<String> value, Context context) throws Exception {
            for (int i = 1; i <= value.size(); i++) {
                pstm.setString(i, value.get(i - 1));
            }
            pstm.executeUpdate();

        }

        @Override
        public void close() {
            if (pstm != null) {
                try {
                    pstm.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }

        private static Connection getConnection() {
            Connection conn = null;
            try {
                String jdbc = "org.apache.hive.jdbc.HiveDriver";
                String url = "jdbc:hive2://192.168.88.108:10000/test";
                String user = "root";
                String password = "";
                Class.forName(jdbc);
                conn = DriverManager.getConnection(url, user, password);
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }
            return conn;
        }
    }
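    /**
     * Sketch of a batched variant of MyHiveSink (not part of the original job): rows are
     * buffered and written with one multi-row "INSERT ... VALUES (...),(...)" statement per
     * batch, so many rows share a single Hive job instead of one job per row. Assumptions:
     * the connection details mirror getConnection() above, the batch size is arbitrary,
     * values are rendered as quoted string literals (as the setString-based sink above
     * effectively does), and multi-row VALUES requires Hive 0.14 or later. Usage:
     * new MyBatchedHiveSink("insert into test.mysql_hive values ").
     */
    public static class MyBatchedHiveSink extends RichSinkFunction<List<String>> {
        private static final int BATCH_SIZE = 500; // arbitrary choice
        private final String insertPrefix;
        private final List<List<String>> buffer = new ArrayList<>();
        private Connection conn;
        private Statement stmt;

        public MyBatchedHiveSink(String insertPrefix) {
            this.insertPrefix = insertPrefix;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            Class.forName("org.apache.hive.jdbc.HiveDriver");
            conn = DriverManager.getConnection("jdbc:hive2://192.168.88.108:10000/test", "root", "");
            stmt = conn.createStatement();
        }

        @Override
        public void invoke(List<String> value, Context context) throws Exception {
            buffer.add(new ArrayList<>(value));
            if (buffer.size() >= BATCH_SIZE) {
                flush();
            }
        }

        // Render the buffered rows as one multi-row VALUES statement and execute it.
        private void flush() throws SQLException {
            if (buffer.isEmpty()) {
                return;
            }
            StringBuilder sql = new StringBuilder(insertPrefix);
            for (int r = 0; r < buffer.size(); r++) {
                if (r > 0) {
                    sql.append(",");
                }
                sql.append("(");
                List<String> row = buffer.get(r);
                for (int c = 0; c < row.size(); c++) {
                    if (c > 0) {
                        sql.append(",");
                    }
                    String v = row.get(c);
                    // Quote as a string literal, escaping embedded single quotes.
                    sql.append(v == null ? "null" : "'" + v.replace("'", "''") + "'");
                }
                sql.append(")");
            }
            stmt.execute(sql.toString());
            buffer.clear();
        }

        @Override
        public void close() throws Exception {
            if (stmt != null) {
                flush(); // write any rows still in the buffer
                stmt.close();
            }
            if (conn != null) {
                conn.close();
            }
        }
    }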
}

           

Method 2: Flink SQL with the JDBC Connector and HiveCatalog

Drawback: the job writes a single output file, and the MySQL table must have at least as many columns as the Hive table (a projection workaround is sketched after the code).

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class MysqlToHiveSql {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        System.setProperty("HADOOP_USER_NAME", "root");
        String columns = "id INT ,name STRING, age INT,money DOUBLE,todate DATE,ts TIMESTAMP";
        String mysql_source_table = "mysql_hive";
        String flink_source_table = "flink_mysql_hive";
        String base_sql = "CREATE TABLE %s (%s) " +
                "WITH (" +
                "'connector.type' = 'jdbc'," +
                "'connector.url' = 'jdbc:mysql://192.168.88.2:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=Asia/Shanghai'," +
                "'connector.driver' = 'com.mysql.cj.jdbc.Driver'," +
                "'connector.table' = '%s'," +
                " 'connector.username' = 'root'," +
                " 'connector.password' = 'A'" +
                " )";
        String source_ddl = String.format(base_sql, flink_source_table, columns, mysql_source_table);
        tableEnv.executeSql(source_ddl);
        Table dataTable = tableEnv.sqlQuery("select * from " + flink_source_table);
        // Hive catalog name
        String name = "hive-test";
        // Default database
        String defaultDatabase = "test";
        // Directory that contains Hive's hive-site.xml
        String hiveConfDir = "FlinkSQL/src/main/resources";
        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        tableEnv.registerCatalog(name, hive);
        tableEnv.useCatalog(name);
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.useDatabase("test");

        StatementSet statementSet = tableEnv.createStatementSet();
        statementSet.addInsert("mysql_hive", dataTable);
        statementSet.execute();
    }
}
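If the MySQL table carries extra columns that the Hive table does not have, the mismatch can be handled by projecting only the Hive table's columns before the insert. A minimal sketch against the example schema above; replace the sqlQuery call in main accordingly:

        // Keep only the columns that exist in the Hive table, dropping any extra
        // MySQL columns before statementSet.addInsert("mysql_hive", dataTable).
        Table dataTable = tableEnv.sqlQuery(
                "select id, name, age, money, todate, ts from " + flink_source_table);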
           

Method 3: Flink StreamingFileSink Writing Directly to HDFS

Drawback: requires a custom entity class, and until the rolling thresholds are reached the sink only produces in-progress (temporary) files that Hive cannot read directly (registering the partition directories with Hive is sketched after the code).

import com.utils.JdbcUtils;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.*;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class MysqlToHDFS {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // StreamingFileSink finalizes part files only on successful checkpoints, so
        // checkpointing must be enabled or the output stays in-progress.
        env.enableCheckpointing(10_000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));
        DataStream<Person> streamSource = env.addSource(new MyMysqlSource("select * from test.mysql_hive"));
        System.setProperty("HADOOP_USER_NAME", "root");
        // HDFS path of the Hive table
        String outputBasePath = "hdfs://docker:9820/user/hive/warehouse/mysql_hive";


        StreamingFileSink<Person> sink = StreamingFileSink.forRowFormat(new Path(outputBasePath), new SimpleStringEncoder<Person>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(30))
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                .withMaxPartSize(1024 * 1024 * 128)
                                .build())
                .withBucketAssigner(new CustomBucketAssigner("yyyyMMdd", ZoneId.of("Asia/Shanghai"), "dt="))
                .withBucketCheckInterval(1)
                .withOutputFileConfig(
                        OutputFileConfig.builder()
                                .withPartPrefix("part")
                                .withPartSuffix(".ext")
                                .build())
                .build();
        streamSource.addSink(sink);
        env.execute();
    }

    /**
     * CREATE TABLE `mysql_hive`  (
     * `id` int(11) NULL DEFAULT NULL,
     * `name` varchar(255) ,
     * `age` int(11) NULL DEFAULT NULL,
     * `money` double NULL DEFAULT NULL,
     * `todate` date NULL DEFAULT NULL,
     * `ts` timestamp NULL DEFAULT NULL
     * ) ;
     */
    public static class MyMysqlSource extends RichSourceFunction<Person> {
        private String sql;
        private Connection conn = null;
        public MyMysqlSource(String sql) {
            this.sql = sql;
        }
        @Override
        public void open(Configuration parameters) throws Exception {
            String driver = "com.mysql.cj.jdbc.Driver";
            String url = "jdbc:mysql://192.168.88.2:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=Asia/Shanghai";
            String username = "root";
            String password = "A";
            Class.forName(driver);
            conn = DriverManager.getConnection(url, username, password);
        }

        @Override
        public void run(SourceContext<Person> sourceContext) throws Exception {
            List<Person> personList = JdbcUtils.queryList(conn, sql, Person.class);
            for (Person person : personList) {
                sourceContext.collect(person);
            }
        }

        @Override
        public void cancel() {
        }

        @Override
        public void close() throws Exception {
            Thread.sleep(3000);
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }


    public static class CustomBucketAssigner implements BucketAssigner<Person, String> {
        private static final long serialVersionUID = 1L;
        private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd";
        private final String formatString;
        private final ZoneId zoneId;
        private final String column;
        private transient DateTimeFormatter dateTimeFormatter;
        public CustomBucketAssigner() {
            this(DEFAULT_FORMAT_STRING);
        }
        public CustomBucketAssigner(String formatString) {
            this(formatString, ZoneId.systemDefault(), "");
        }
        public CustomBucketAssigner(ZoneId zoneId) {
            this(DEFAULT_FORMAT_STRING, zoneId, "");
        }
        public CustomBucketAssigner(String formatString, String column) {
            this(formatString, ZoneId.systemDefault(), column);
        }
        public CustomBucketAssigner(String formatString, ZoneId zoneId) {
            this(formatString, zoneId, "");
        }
        public CustomBucketAssigner(String formatString, ZoneId zoneId, String column) {
            this.formatString = Preconditions.checkNotNull(formatString);
            this.zoneId = Preconditions.checkNotNull(zoneId);
            this.column = Preconditions.checkNotNull(column);
        }

        @Override
        public String getBucketId(Person element, Context context) {
            if (dateTimeFormatter == null) {
                dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
            }
            // Bucket directories look like "dt=20240101": the prefix comes from `column`,
            // the date part from the current processing time.
            return column + dateTimeFormatter.format(Instant.ofEpochMilli(context.currentProcessingTime()));
        }
        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
        @Override
        public String toString() {
            return "DateTimeBucketAssigner{"
                    + "formatString='"
                    + formatString
                    + '\''
                    + ", zoneId="
                    + zoneId
                    + '}';
        }
    }

    // Example of a custom rolling policy (not wired into the sink above); returning
    // false from every method means this policy itself never triggers a roll.
    public static class CustomRollingPolicy implements RollingPolicy<Person, String> {
        @Override
        public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) throws IOException {
            return false;
        }

        @Override
        public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, Person element) throws IOException {
            return false;
        }

        @Override
        public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState, long currentTime) throws IOException {
            return false;
        }
    }
}
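Because the sink buckets records into dt=yyyyMMdd subdirectories, Hive only sees the finished files once the target table is partitioned and each new directory has been registered as a partition. The sketch below does that registration over Hive JDBC; it assumes the Hive table was created with PARTITIONED BY (dt string), reuses the connection details from method 1, and the class name and partition value are illustrative only.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class AddPartitionExample {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection conn = DriverManager.getConnection("jdbc:hive2://192.168.88.108:10000/test", "root", "");
             Statement stmt = conn.createStatement()) {
            // Register one bucket directory (e.g. .../mysql_hive/dt=20240101) as a partition.
            stmt.execute("ALTER TABLE mysql_hive ADD IF NOT EXISTS PARTITION (dt='20240101')");
        }
    }
}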

           

Entity class

import java.sql.Date;
import java.sql.Timestamp;
public class Person  {
    private int id;
    private String name;
    private int age;
    private double money;
    private Date todate;
    private Timestamp ts;
    public Person(int id, String name, int age, double money, Date todate, Timestamp ts) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.money = money;
        this.todate = todate;
        this.ts = ts;
    }
    public Person() {
    }
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }
    public double getMoney() {
        return money;
    }
    public void setMoney(double money) {
        this.money = money;
    }
    public Date getTodate() {
        return todate;
    }
    public void setTodate(Date todate) {
        this.todate = todate;
    }
    public Timestamp getTs() {
        return ts;
    }
    public void setTs(Timestamp ts) {
        this.ts = ts;
    }
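    /**
     * Rows are emitted as tab-separated fields (the StreamingFileSink in method 3 writes
     * this string per record); the Hive table is assumed to be declared with
     * ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' so that these lines parse correctly.
     */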
    @Override
    public String toString() {
        return id + "\t" +
                name + "\t" +
                age + "\t" +
                money + "\t" +
                todate + "\t" +
                ts;
    }
}
           

Utility class

import org.apache.commons.beanutils.BeanUtils;
import java.lang.reflect.InvocationTargetException;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;

public class JdbcUtils {
    public static <T> List<T> queryList(Connection connection, String querySql, Class<T> clz) throws SQLException, IllegalAccessException, InstantiationException, InvocationTargetException {
        // Collection holding the query results
        ArrayList<T> resultList = new ArrayList<>();
        // Prepare the SQL statement
        PreparedStatement preparedStatement = connection.prepareStatement(querySql);
        // Execute the query
        ResultSet resultSet = preparedStatement.executeQuery();
        // Inspect the result set metadata
        ResultSetMetaData metaData = resultSet.getMetaData();
        int columnCount = metaData.getColumnCount();
        while (resultSet.next()) {
            // Create an instance of the target type
            T t = clz.newInstance();
            // Copy each column value onto the matching bean property
            for (int i = 1; i <= columnCount; i++) {
                // Column name (must match the bean property name, e.g. "todate" -> setTodate)
                String columnName = metaData.getColumnName(i);
                // Column value
                Object value = resultSet.getObject(i);
                BeanUtils.setProperty(t, columnName, value);
            }
            // Add the populated object to the result list
            resultList.add(t);
        }
        // Close the result set before the statement
        resultSet.close();
        preparedStatement.close();
        // Return the collected results
        return resultList;
    }

}

           
