天天看点

FlinkSQL使用Catalog访问外部数据源

FlinkSQL使用Catalog访问外部数据源

Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。

数据处理最关键的方面之一是管理元数据。 元数据可以是临时的,例如临时表、或者通过 TableEnvironment 注册的 UDF。 元数据也可以是持久化的,例如 Hive Metastore 中的元数据。Catalog 提供了一个统一的API,用于管理元数据,并使其可以从 Table API 和 SQL 查询语句中来访问。

下面以JdbcCatalog为例,介绍集成过程。

JdbcCatalog 使得用户可以将 Flink 通过 JDBC 协议连接到关系数据库。

参考 JdbcCatalog 文档 获取关于配置 JDBC catalog 的详细信息。

https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/table/jdbc/

我使用的flink版本为:flink-1.17.1

  1. 添加依赖

演示连接mysql,需要添加JDBC SQL 连接器和mysql驱动的jar包。

去官网下载对应版本的flink-connector-jdbc和mysql驱动jar包。

Maven dependency
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc</artifactId>
  <version>3.1.0-1.17</version>
</dependency>           

在连接到具体数据库时,也需要对应的驱动依赖,目前支持的驱动如下:

Driver Group Id Artifact Id
MySQL mysql mysql-connector-java

下载好jar后,直接把上面2个jar包 直接复制到 Flink 发行版的 /lib 文件夹内。

FlinkSQL使用Catalog访问外部数据源
  1. 重启flink集群

flink安装目录/bin下执行stop-cluster.sh停止集群,然后执行start-cluster.sh启动本地集群。

  1. SQL客户端执行SQL

flink安装目录/bin下执行./sql-client.sh启动SQL客户端命令行界面,用来执行SQL。

创建并使用 MySQL Catalog:

JDBC catalog 支持以下参数:

  • name:必填,catalog 的名称。
  • default-database:必填,默认要连接的数据库。
  • username:必填,Postgres/MySQL 账户的用户名。
  • password:必填,账户的密码。
  • base-url:必填,(不应该包含数据库名)对于 Postgres Catalog base-url 应为 "jdbc:postgresql://<ip>:<port>" 的格式。对于 MySQL Catalog base-url 应为 "jdbc:mysql://<ip>:<port>" 的格式。

Flink SQL> CREATE CATALOG mysql_30_catalog WITH(

> 'type' = 'jdbc',

> 'default-database' = '库',

> 'username' = '用户名',

> 'password' = '*****',

> 'base-url' = 'jdbc:mysql://ip:3306'

> );

USE CATALOG my_catalog;           

查询数据:

Flink 中的表的完整路径应该是 "<catalog>.<db>.`<schema.table>`"。如果指定了 schema,请注意需要转义 <schema.table>。

Flink SQL> select * from mysql_30_catalog.库名.`表名` limit 2;
FlinkSQL使用Catalog访问外部数据源

大功告成,这样就可以查的配置的catalog对应数据源的数据了。

继续阅读