天天看點

使用flink-connector-jdbc實作flink寫入clickhouse、mysql等

适用範圍:flink1.11.0及之後版本,包名為flink-connector-jdbc

  1. 編輯pom.xml中的

    <dependencies />

    小節添加依賴。
    <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
    
    			<!-->flink-connector-jdbc flink版本需在1.11.0之後<!-->
          <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
              <version>${flink.version}</version>
          </dependency>
    
     				 <!-->clickhouse jdbc連接配接<!-->
            <dependency>
                <groupId>ru.yandex.clickhouse</groupId>
                <artifactId>clickhouse-jdbc</artifactId>
                <version>0.2.4</version>
            </dependency>
    
            <!-->mysql jdbc連接配接<!-->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.44</version>
            </dependency>
               
    1. Java實作demo
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
    import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
    import org.apache.flink.connector.jdbc.JdbcSink;
    import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    import org.apache.flink.util.Collector;
    
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.Arrays;
    
    public class JdbcSink_Test {
        public static void main(String[] args) throws Exception {
            //構造執行環境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            //source,數組形式擷取
            DataStream<String> input = env.fromCollection(Arrays.asList("10001 tom", "10002 jane", "10003 jack"));
    
            //transform
            DataStream<Tuple2<Integer, String>> result = input.process(new ProcessFunction<String, Tuple2<Integer, String>>() {
                @Override
                public void processElement(String s, Context context, Collector<Tuple2<Integer, String>> collector) throws Exception {
                    String[] user = s.split(" ");
                    collector.collect(new Tuple2<>(Integer.parseInt(user[0]), user[1]));
                }
            });
    
            //sink,采用JdbcSink工具類 (flink 1.11 版本提供)
            /**
             * sink方法需要依次傳入四個參數:1.預編譯的SQL字元串 2.實作JdbcStatementBuilder的對象,用于對預編譯的SQL進行傳值
             *                        3.JDBC執行器,可以設定批量寫入等參數,負責具體執行寫入操作 4.JDBC連接配接設定 包含連接配接驅動、URL、使用者名、密碼
             * 說明:不同的資料庫在jdbc連接配接設定部分傳入不同的驅動、url等參數即可
             */
            String sql = "insert into user(user_id,name) values(?,?)";
            SinkFunction<Tuple2<Integer, String>> sink = JdbcSink.sink(sql, new MysqlBuilder(),
                    JdbcExecutionOptions.builder().withBatchSize(50).build(),
                    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                            .withDriverName("com.mysql.jdbc.Driver") // clickhouse使用.withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                            .withUrl("jdbc:mysql://localhost/test?characterEncoding=utf8&useSSL=false") //clickhouse使用 .withUrl("jdbc:clickhouse://localhost/test")
                            .withUsername("root")
                            .withPassword("root").build());
    
            result.addSink(sink);
            env.execute();
        }
    
        //自定義StatementBuilder 實作accept方法實作對預編譯的SQL進行傳值
        public static class MysqlBuilder implements JdbcStatementBuilder<Tuple2<Integer,String>>{
            @Override
            public void accept(PreparedStatement preparedStatement, Tuple2<Integer, String> user) throws SQLException {
                preparedStatement.setInt(1,user.f0);
                preparedStatement.setString(2,user.f1);
            }
        }
    }