天天看点

Using JdbcSink in Flink to sink data into a MySQL database

The official Flink documentation provides a JdbcSink, used like this:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
        .fromElements(...)
        .addSink(JdbcSink.sink(
                "insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
                (ps, t) -> {
                    ps.setInt(1, t.id);
                    ps.setString(2, t.title);
                    ps.setString(3, t.author);
                    ps.setDouble(4, t.price);
                    ps.setInt(5, t.qty);
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(getDbMetadata().getUrl())
                        .withDriverName(getDbMetadata().getDriverClass())
                        .build()));
env.execute();

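But this humble writer, being of modest learning, was completely baffled by the ... in the middle of it, inside fromElements(...).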

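Searching online, the only approach I could find was to write a custom RichSinkFunction, but I specifically wanted to use the method from the official documentation. So I searched Gitee for the dependency, searched the code for the keyword JdbcSink, and finally found it: the source code of the entire Apache Flink project. At first I didn't realize that; I just wondered why this "demo project" was so huge and why several of its dependencies wouldn't download.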
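For readers who do want the RichSinkFunction route that most online answers describe, here is a minimal sketch. It is not the approach this post ends up using. It assumes a Flink 1.x release whose SinkFunction.Context takes no type parameter (older releases may declare it differently), and the Book case class, the BooksMysqlSink name and its constructor parameters are hypothetical. Every record is written with its own INSERT, with no batching or retries, which is one reason to prefer JdbcSink.

```scala
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

// Hypothetical record type for this sketch; its fields mirror the books table columns.
case class Book(id: Int, title: String, author: String, price: Double, qty: Int)

// Hand-written sink: one JDBC connection per parallel sink instance and
// one INSERT per incoming record (no batching, no retries).
class BooksMysqlSink(url: String, user: String, password: String)
    extends RichSinkFunction[Book] {

  // Connections and statements are not serializable, so they are created in open(),
  // which runs on the task manager, instead of in the constructor.
  @transient private var conn: Connection = _
  @transient private var stmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection(url, user, password)
    stmt = conn.prepareStatement(BooksMysqlSink.InsertSql)
  }

  // Called once per record: bind the five placeholders (1-based) and execute.
  override def invoke(value: Book, context: SinkFunction.Context): Unit = {
    stmt.setInt(1, value.id)
    stmt.setString(2, value.title)
    stmt.setString(3, value.author)
    stmt.setDouble(4, value.price)
    stmt.setInt(5, value.qty)
    stmt.executeUpdate()
  }

  override def close(): Unit = {
    if (stmt != null) stmt.close()
    if (conn != null) conn.close()
  }
}

object BooksMysqlSink {
  val InsertSql = "insert into books (id, title, author, price, qty) values (?,?,?,?,?)"
}
```

It would be attached with something like stream.addSink(new BooksMysqlSink(url, "root", "123456")), where stream is a DataStream[Book].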

In Flink's test classes I found the Java test code for JdbcSink.

Then, after persistent effort on my part, I finally wrote a Scala test class, shown below:

import org.apache.flink.connector.jdbc.JdbcConnectionOptions.JdbcConnectionOptionsBuilder
import org.apache.flink.connector.jdbc.{JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}

import java.sql.PreparedStatement

// One row of the books table. price is a Scala Double, which can never be null,
// so no setNull branch is needed (use Option[Double] if prices can be missing).
case class TestEntry(id: Int, title: String, author: String, price: Double, qty: Int)

object JDBCSinkTestJob {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    env
      .fromElements(
        TestEntry(1001, "Java public for dummies", "Tan Ah Teck", 11.11, 11),
        TestEntry(1002, "More Java for dummies", "Tan Ah Teck", 22.22, 22),
        TestEntry(1003, "More Java for more dummies", "Mohammad Ali", 33.33, 33),
        TestEntry(1004, "A Cup of Java", "Kumar", 44.44, 44),
        TestEntry(1005, "A Teaspoon of Java", "Kevin Jones", 55.55, 55),
        TestEntry(1006, "A Teaspoon of Java 1.4", "Kevin Jones", 66.66, 66),
        TestEntry(1007, "A Teaspoon of Java 1.5", "Kevin Jones", 77.77, 77),
        TestEntry(1008, "A Teaspoon of Java 1.6", "Kevin Jones", 88.88, 88),
        TestEntry(1009, "A Teaspoon of Java 1.7", "Kevin Jones", 99.99, 99),
        TestEntry(1010, "A Teaspoon of Java 1.8", "Kevin Jones", 99, 1010))
      .addSink(
        JdbcSink.sink(
          // The table name is filled in here; the five '?' are bound by the statement builder below
          String.format("insert into %s (id, title, author, price, qty) values (?,?,?,?,?)", "books"),
          // Maps each TestEntry onto the placeholders, in column order (JDBC indices are 1-based)
          new JdbcStatementBuilder[TestEntry] {
            override def accept(ps: PreparedStatement, t: TestEntry): Unit = {
              ps.setInt(1, t.id)
              ps.setString(2, t.title)
              ps.setString(3, t.author)
              ps.setDouble(4, t.price)
              ps.setInt(5, t.qty)
            }
          },
          new JdbcConnectionOptionsBuilder()
            .withUrl("jdbc:mysql://hadoop10:3306/test?useUnicode=true&characterEncoding=UTF-8")
            .withUsername("root")
            .withPassword("123456")
            // Driver class of MySQL Connector/J 5.x; Connector/J 8.x uses com.mysql.cj.jdbc.Driver
            .withDriverName("com.mysql.jdbc.Driver")
            .build()
        )
      )
    env.execute()
  }

}


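This test class pulls out the variables that Flink's Java test code defines elsewhere (the table name, the test data and the connection settings) and writes them inline.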
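The test class uses the three-argument JdbcSink.sink. The connector also has a four-argument overload that takes a JdbcExecutionOptions, which controls how many rows are buffered per batch, after how many milliseconds a batch is flushed anyway, and how many times a failed batch is retried; as far as I can tell from the connector source, the three-argument form simply uses JdbcExecutionOptions.defaults(). Below is a minimal sketch, assuming the same MySQL settings as the test class. The BookRow case class, the JdbcSinkBatchingJob name and the two sample rows are hypothetical, and the batch numbers are illustrative, not recommendations.

```scala
import java.sql.PreparedStatement

import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.scala._

// Same shape as TestEntry; redefined under a hypothetical name so this file compiles on its own.
case class BookRow(id: Int, title: String, author: String, price: Double, qty: Int)

object JdbcSinkBatchingJob {

  val InsertSql = "insert into books (id, title, author, price, qty) values (?,?,?,?,?)"

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Binds one record to the five '?' placeholders, in column order.
    val statementBuilder = new JdbcStatementBuilder[BookRow] {
      override def accept(ps: PreparedStatement, t: BookRow): Unit = {
        ps.setInt(1, t.id)
        ps.setString(2, t.title)
        ps.setString(3, t.author)
        ps.setDouble(4, t.price)
        ps.setInt(5, t.qty)
      }
    }

    // Illustrative values: flush after 100 buffered rows or 200 ms, whichever comes first,
    // and retry a failed batch up to 3 times.
    val executionOptions = JdbcExecutionOptions.builder()
      .withBatchSize(100)
      .withBatchIntervalMs(200)
      .withMaxRetries(3)
      .build()

    val connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
      .withUrl("jdbc:mysql://hadoop10:3306/test?useUnicode=true&characterEncoding=UTF-8")
      .withDriverName("com.mysql.jdbc.Driver")
      .withUsername("root")
      .withPassword("123456")
      .build()

    env
      .fromElements(
        BookRow(2001, "Example title A", "Example author", 10.5, 1),
        BookRow(2002, "Example title B", "Example author", 20.0, 2))
      .addSink(JdbcSink.sink(InsertSql, statementBuilder, executionOptions, connectionOptions))

    env.execute("jdbc-sink-batching")
  }
}
```

With a bounded source like fromElements, the remaining buffered rows are written when the job finishes; the interval matters mostly for long-running streams that trickle in fewer rows than the batch size.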

You also need to add the MySQL JDBC driver dependency to the project; look it up yourself.
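As one possibility, assuming an sbt build with Flink 1.13.6, Scala 2.12 and MySQL Connector/J 5.1.49 (all of these versions are my assumptions, not the author's), the dependencies might be declared as below. Connector/J 5.1.x matches the com.mysql.jdbc.Driver class used in the test class. Maven users would declare the same coordinates in pom.xml, writing out the _2.12 suffix that %% adds.

```scala
// build.sbt (sketch): version numbers are assumptions; match them to your Flink cluster.
scalaVersion := "2.12.15"

val flinkVersion = "1.13.6"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  // Needed to run the job locally from the IDE (StreamExecutionEnvironment.execute)
  "org.apache.flink" %% "flink-clients"         % flinkVersion,
  // Provides JdbcSink, JdbcStatementBuilder and JdbcConnectionOptions
  "org.apache.flink" %% "flink-connector-jdbc"  % flinkVersion,
  // MySQL Connector/J 5.1.x, which ships com.mysql.jdbc.Driver
  "mysql"            %  "mysql-connector-java"  % "5.1.49"
)
```

When the job is submitted to a cluster instead of run from the IDE, the Flink core artifacts are usually marked Provided, while the JDBC connector and the MySQL driver are bundled with the job.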

Test results


Perfect! I was delighted and decided to reward myself with a couple of rounds of video games.

References:

The Flink project source code on Gitee
