天天看點

FlinkSQL使用Catalog通路外部資料源

FlinkSQL使用Catalog通路外部資料源

Catalog 提供了中繼資料資訊,例如資料庫、表、分區、視圖以及資料庫或其他外部系統中存儲的函數和資訊。

資料處理最關鍵的方面之一是管理中繼資料。 中繼資料可以是臨時的,例如臨時表、或者通過 TableEnvironment 注冊的 UDF。 中繼資料也可以是持久化的,例如 Hive Metastore 中的中繼資料。Catalog 提供了一個統一的API,用于管理中繼資料,并使其可以從 Table API 和 SQL 查詢語句中來通路。

下面以JdbcCatalog為例,介紹內建過程。

JdbcCatalog 使得使用者可以将 Flink 通過 JDBC 協定連接配接到關系資料庫。

參考 JdbcCatalog 文檔 擷取關于配置 JDBC catalog 的詳細資訊。

https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/table/jdbc/

我使用的flink版本為:flink-1.17.1

  1. 添加依賴

示範連接配接mysql,需要添加JDBC SQL 連接配接器和mysql驅動的jar包。

去官網下載下傳對應版本的flink-connector-jdbc和mysql驅動jar包。

Maven dependency
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc</artifactId>
  <version>3.1.0-1.17</version>
</dependency>           

在連接配接到具體資料庫時,也需要對應的驅動依賴,目前支援的驅動如下:

Driver Group Id Artifact Id
MySQL mysql mysql-connector-java

下載下傳好jar後,直接把上面2個jar包 直接複制到 Flink 發行版的 /lib 檔案夾内。

FlinkSQL使用Catalog通路外部資料源
  1. 重新開機flink叢集

flink安裝目錄/bin下執行stop-cluster.sh停止叢集,然後執行start-cluster.sh啟動本地叢集。

  1. SQL用戶端執行SQL

flink安裝目錄/bin下執行./sql-client.sh啟動SQL用戶端指令行界面,用來執行SQL。

建立并使用 MySQL Catalog:

JDBC catalog 支援以下參數:

  • name:必填,catalog 的名稱。
  • default-database:必填,預設要連接配接的資料庫。
  • username:必填,Postgres/MySQL 賬戶的使用者名。
  • password:必填,賬戶的密碼。
  • base-url:必填,(不應該包含資料庫名)對于 Postgres Catalog base-url 應為 "jdbc:postgresql://<ip>:<port>" 的格式。對于 MySQL Catalog base-url 應為 "jdbc:mysql://<ip>:<port>" 的格式。

Flink SQL> CREATE CATALOG mysql_30_catalog WITH(

> 'type' = 'jdbc',

> 'default-database' = '庫',

> 'username' = '使用者名',

> 'password' = '*****',

> 'base-url' = 'jdbc:mysql://ip:3306'

> );

USE CATALOG my_catalog;           

查詢資料:

Flink 中的表的完整路徑應該是 "<catalog>.<db>.`<schema.table>`"。如果指定了 schema,請注意需要轉義 <schema.table>。

Flink SQL> select * from mysql_30_catalog.庫名.`表名` limit 2;
FlinkSQL使用Catalog通路外部資料源

大功告成,這樣就可以查的配置的catalog對應資料源的資料了。

繼續閱讀