Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。
数据处理最关键的方面之一是管理元数据。 元数据可以是临时的,例如临时表、或者通过 TableEnvironment 注册的 UDF。 元数据也可以是持久化的,例如 Hive Metastore 中的元数据。Catalog 提供了一个统一的API,用于管理元数据,并使其可以从 Table API 和 SQL 查询语句中来访问。
下面以JdbcCatalog为例,介绍集成过程。
JdbcCatalog 使得用户可以将 Flink 通过 JDBC 协议连接到关系数据库。
参考 JdbcCatalog 文档 获取关于配置 JDBC catalog 的详细信息。
https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/table/jdbc/
我使用的flink版本为:flink-1.17.1
- 添加依赖
演示连接mysql,需要添加JDBC SQL 连接器和mysql驱动的jar包。
去官网下载对应版本的flink-connector-jdbc和mysql驱动jar包。
Maven dependency |
|
在连接到具体数据库时,也需要对应的驱动依赖,目前支持的驱动如下:
Driver | Group Id | Artifact Id |
MySQL | mysql | mysql-connector-java |
下载好jar后,直接把上面2个jar包 直接复制到 Flink 发行版的 /lib 文件夹内。
- 重启flink集群
flink安装目录/bin下执行stop-cluster.sh停止集群,然后执行start-cluster.sh启动本地集群。
- SQL客户端执行SQL
flink安装目录/bin下执行./sql-client.sh启动SQL客户端命令行界面,用来执行SQL。
创建并使用 MySQL Catalog:
JDBC catalog 支持以下参数:
- name:必填,catalog 的名称。
- default-database:必填,默认要连接的数据库。
- username:必填,Postgres/MySQL 账户的用户名。
- password:必填,账户的密码。
- base-url:必填,(不应该包含数据库名)对于 Postgres Catalog base-url 应为 "jdbc:postgresql://<ip>:<port>" 的格式。对于 MySQL Catalog base-url 应为 "jdbc:mysql://<ip>:<port>" 的格式。
Flink SQL> CREATE CATALOG mysql_30_catalog WITH(
> 'type' = 'jdbc',
> 'default-database' = '库',
> 'username' = '用户名',
> 'password' = '*****',
> 'base-url' = 'jdbc:mysql://ip:3306'
> );
USE CATALOG my_catalog;
查询数据:
Flink 中的表的完整路径应该是 "<catalog>.<db>.`<schema.table>`"。如果指定了 schema,请注意需要转义 <schema.table>。
Flink SQL> select * from mysql_30_catalog.库名.`表名` limit 2;
大功告成,这样就可以查的配置的catalog对应数据源的数据了。