Catalog 提供了中繼資料資訊,例如資料庫、表、分區、視圖以及資料庫或其他外部系統中存儲的函數和資訊。
資料處理最關鍵的方面之一是管理中繼資料。 中繼資料可以是臨時的,例如臨時表、或者通過 TableEnvironment 注冊的 UDF。 中繼資料也可以是持久化的,例如 Hive Metastore 中的中繼資料。Catalog 提供了一個統一的API,用于管理中繼資料,并使其可以從 Table API 和 SQL 查詢語句中來通路。
下面以JdbcCatalog為例,介紹內建過程。
JdbcCatalog 使得使用者可以将 Flink 通過 JDBC 協定連接配接到關系資料庫。
參考 JdbcCatalog 文檔 擷取關于配置 JDBC catalog 的詳細資訊。
https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/table/jdbc/
我使用的flink版本為:flink-1.17.1
- 添加依賴
示範連接配接mysql,需要添加JDBC SQL 連接配接器和mysql驅動的jar包。
去官網下載下傳對應版本的flink-connector-jdbc和mysql驅動jar包。
Maven dependency |
|
在連接配接到具體資料庫時,也需要對應的驅動依賴,目前支援的驅動如下:
Driver | Group Id | Artifact Id |
MySQL | mysql | mysql-connector-java |
下載下傳好jar後,直接把上面2個jar包 直接複制到 Flink 發行版的 /lib 檔案夾内。
- 重新開機flink叢集
flink安裝目錄/bin下執行stop-cluster.sh停止叢集,然後執行start-cluster.sh啟動本地叢集。
- SQL用戶端執行SQL
flink安裝目錄/bin下執行./sql-client.sh啟動SQL用戶端指令行界面,用來執行SQL。
建立并使用 MySQL Catalog:
JDBC catalog 支援以下參數:
- name:必填,catalog 的名稱。
- default-database:必填,預設要連接配接的資料庫。
- username:必填,Postgres/MySQL 賬戶的使用者名。
- password:必填,賬戶的密碼。
- base-url:必填,(不應該包含資料庫名)對于 Postgres Catalog base-url 應為 "jdbc:postgresql://<ip>:<port>" 的格式。對于 MySQL Catalog base-url 應為 "jdbc:mysql://<ip>:<port>" 的格式。
Flink SQL> CREATE CATALOG mysql_30_catalog WITH(
> 'type' = 'jdbc',
> 'default-database' = '庫',
> 'username' = '使用者名',
> 'password' = '*****',
> 'base-url' = 'jdbc:mysql://ip:3306'
> );
USE CATALOG my_catalog;
查詢資料:
Flink 中的表的完整路徑應該是 "<catalog>.<db>.`<schema.table>`"。如果指定了 schema,請注意需要轉義 <schema.table>。
Flink SQL> select * from mysql_30_catalog.庫名.`表名` limit 2;
大功告成,這樣就可以查的配置的catalog對應資料源的資料了。