適用範圍:Flink 1.11.0 及之後版本,包名為 flink-connector-jdbc
- 編輯 pom.xml,在 <dependencies /> 小節中添加以下依賴。
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<!-- flink-connector-jdbc requires Flink 1.11.0 or later -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- ClickHouse JDBC driver -->
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.4</version>
</dependency>
<!-- MySQL JDBC driver -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>
- Java實作demo
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; import org.apache.flink.connector.jdbc.JdbcSink; import org.apache.flink.connector.jdbc.JdbcStatementBuilder; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.util.Collector; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.Arrays; public class JdbcSink_Test { public static void main(String[] args) throws Exception { //構造執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //source,數組形式擷取 DataStream<String> input = env.fromCollection(Arrays.asList("10001 tom", "10002 jane", "10003 jack")); //transform DataStream<Tuple2<Integer, String>> result = input.process(new ProcessFunction<String, Tuple2<Integer, String>>() { @Override public void processElement(String s, Context context, Collector<Tuple2<Integer, String>> collector) throws Exception { String[] user = s.split(" "); collector.collect(new Tuple2<>(Integer.parseInt(user[0]), user[1])); } }); //sink,采用JdbcSink工具類 (flink 1.11 版本提供) /** * sink方法需要依次傳入四個參數:1.預編譯的SQL字元串 2.實作JdbcStatementBuilder的對象,用于對預編譯的SQL進行傳值 * 3.JDBC執行器,可以設定批量寫入等參數,負責具體執行寫入操作 4.JDBC連接配接設定 包含連接配接驅動、URL、使用者名、密碼 * 說明:不同的資料庫在jdbc連接配接設定部分傳入不同的驅動、url等參數即可 */ String sql = "insert into user(user_id,name) values(?,?)"; SinkFunction<Tuple2<Integer, String>> sink = JdbcSink.sink(sql, new MysqlBuilder(), JdbcExecutionOptions.builder().withBatchSize(50).build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withDriverName("com.mysql.jdbc.Driver") // clickhouse使用.withDriverName("ru.yandex.clickhouse.ClickHouseDriver") 
.withUrl("jdbc:mysql://localhost/test?characterEncoding=utf8&useSSL=false") //clickhouse使用 .withUrl("jdbc:clickhouse://localhost/test") .withUsername("root") .withPassword("root").build()); result.addSink(sink); env.execute(); } //自定義StatementBuilder 實作accept方法實作對預編譯的SQL進行傳值 public static class MysqlBuilder implements JdbcStatementBuilder<Tuple2<Integer,String>>{ @Override public void accept(PreparedStatement preparedStatement, Tuple2<Integer, String> user) throws SQLException { preparedStatement.setInt(1,user.f0); preparedStatement.setString(2,user.f1); } } }