天天看點

如何在 Apache Flink 1.10 中使用 Python UDF?

作者:孫金城(金竹)

在 Apache Flink 1.9 版中,我們引入了 PyFlink 子產品,支援了 Python Table API。Python 使用者可以完成資料轉換和資料分析的作業。但是,您可能會發現在 PyFlink 1.9 中還不支援定義 Python UDFs,對于想要擴充系統内置功能的 Python 使用者來說,這可能有諸多不便。

在剛剛釋出的 ApacheFlink 1.10 中,PyFlink 添加了對 Python UDFs 的支援。這意味着您可以從現在開始用 Python 編寫 UDF 并擴充系統的功能。此外,本版本還支援 Python UDF 環境和依賴管理,是以您可以在 UDF 中使用第三方庫,進而利用 Python 生态豐富的第三方庫資源。

PyFlink 支援 Python UDFs 的架構

在深入了解如何定義和使用 Python UDFs 之前,我們将解釋 UDFs 在 PyFlink 中工作的架構和背景,并提供一些有關我們底層實作的細節介紹。

Beam on Flink

Apache Beam 是一個統一程式設計模型架構,實作了可使用任何語言開發可以運作在任何執行引擎上的批處理和流處理作業,這得益于 Beam 的 Portability Framework,如下圖所示:

如何在 Apache Flink 1.10 中使用 Python UDF?

Portability Framework

上圖是 Beam 的 Portability Framework 的體系結構。它描述了 Beam 如何支援多種語言和多種引擎的方式。關于 Flink Runner 部分,我們可以說是 Beam on Flink。那麼,這與 PyFlink 支援 Python UDF 有什麼關系呢?這将接下來“Flink on Beam”中介紹。

Flink on Beam

Apache Flink 是一個開源項目,是以,它的社群也更多地使用開源。例如,PyFlink 中對 Python UDF 的支援選擇了基于 Apache Beam 這輛豪華跑車之上進行建構。:)

如何在 Apache Flink 1.10 中使用 Python UDF?

PyFlink 對 Python UDFs 的支援上,Python 的運作環境管理以及 Python 運作環境 Python VM 和 Java 運作環境 JVM 的通訊至關重要。幸運的是,Apache Beam 的 Portability Framework 完美解決了這個問題。是以才有了如下 PyFlink on Beam Portability Framework 的架構如下:

如何在 Apache Flink 1.10 中使用 Python UDF?

PyFlink on Beam Portability Framework

Beam Portability Framework 是一個成熟的多語言支援架構,架構高度抽象了語言之間的通信協定(gRPC),定義了資料的傳輸格式(Protobuf),并且根據通用流計算架構所需要的元件,抽象個各種服務,比如, DataService,StateService,MetricsService 等。

在這樣一個成熟的架構下,PyFlink 可以快速的建構自己的 Python 算子,同時重用 Apache Beam Portability Framework 中現有 SDK harness 元件,可以支援多種 Python 運作模式,如:Process,Docker,etc.,這使得 PyFlink 對 Python UDF 的支援變得非常容易,在 Apache Flink 1.10 中的功能也非常的穩定和完整。那麼為啥說是 Apache Flink 和 Apache Beam 共同打造呢,是因為我發現目前 Apache Beam Portability Framework 的架構也存在很多優化的空間,是以我在 Beam 社群進行了優化讨論,并且在 Beam 社群也貢獻了 30+ 的優化更新檔。

JVM 和 Python VM 的通訊

由于 Python UDF 無法直接在 JVM 中運作,是以需要由 Apache Flink 算子在初始化時啟動的 Python 程序來準備 Python 執行環境。Python ENV 服務負責啟動,管理和終止 Python 程序。如下圖 4 所示,Apache Flink 算子和 Python 執行環境之間的通信和涉及多個元件:

如何在 Apache Flink 1.10 中使用 Python UDF?

Communication between JVM and Python VM

  • 環境管理服務: 負責啟動和終止 Python 執行環境。
  • 資料服務: 負責在 Apache Flink 算子和 Python 執行環境之間傳輸輸入資料和接收使用者 UDF 的執行結果。
  • 日志服務: 是記錄對使用者 UDF 日志輸出支援的機制。它可以将使用者 UDF 産生的日志傳輸到 Apache Flink 算子,并與 Apache Flink 的日志系統內建。

說明: 其中 metrics 服務計劃在 Apache Flink 1.11 進行支援。

下圖描述了從 Java 算子到 Python 程序之間初始化和執行 UDF 的概要流程。

如何在 Apache Flink 1.10 中使用 Python UDF?

High-level flow between Python VM and JVM

整體流程可以概括為如下兩部分:

  • 初始化 Python 執行環境。
    • Python UDF Runner 啟動所需的 gRPC 服務,如資料服務、日志服務等。
    • Python UDF Runner 另起程序并啟動 Python 執行環境。
    • Python worker 向 PythonUserDefinedFunctionRunner 進行注冊。
    • Python UDF Runner 向 Python worker 發送需要在 Python 程序中執行的使用者定義函數。
    • Python worker 将使用者定義的函數轉換為 Beam 執行算子(注意:目前,PyFlink 利用 Beam 的可移植性架構[1]來執行 Python UDF)。
    • Python worker 和 Flink Operator 之間建立 gRPC 連接配接,如資料連接配接、日志連接配接等。
  • 處理輸入元素。
    • Python UDF Runner 通過 gRPC 資料服務将輸入元素發送給 Python worker 執行。
    • Python 使用者定義函數還可以在執行期間通過 gRPC 日志服務和 metrics 服務将日志和 metrics 收集到 Python UDF Runner。
    • 執行結果可以通過 gRPC 資料服務發送到 Python UDF Runner。

如何在 Apache Flink 1.10 的 PyFlink 中使用 UDFs

本節将介紹使用者如何定義 UDF,并完整展示了如何安裝 PyFlink,如何在 PyFlink 中定義/注冊/調用 UDF,以及如何執行作業。

安裝 PyFlink

我們需要先安裝 PyFlink,可以通過 PyPI 獲得,并且可以使用 pip install 進行便捷安裝。

注意: 安裝和運作 PyFlink 需要 Python 3.5 或更高版本。

$ python -m pip install apache-Apache Flink           

定義一個 UDF

除了擴充基類 ScalarFunction 之外,定義 Python UDF 的方法有很多。下面的示例顯示了定義 Python UDF 的不同方法,該函數以 BIGINT 類型的兩列作為輸入參數,并傳回它們的和作為結果。

  • Option 1: extending the base class ScalarFunction
class Add(ScalarFunction):
  def eval(self, i, j):
    return i + j

add = udf(Add(), [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())           
  • Option 2: Python function
@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def add(i, j):
  return i + j           
  • Option 3: lambda function
add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())           
  • Option 4: callable function
class CallableAdd(object):
  def __call__(self, i, j):
    return i + j

add = udf(CallableAdd(), [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())           
  • Option 5: partial function
return i + j + k

add = udf(functools.partial(partial_add, k=1), [DataTypes.BIGINT(), DataTypes.BIGINT()],
          DataTypes.BIGINT())           

注冊一個UDF

  • register the Python function
table_env.register_function("add", add)           
  • Invoke a Python UDF
my_table.select(```js
"add(a, b)")           
  • Example Code

下面是一個使用 Python UDF 的完整示例。

from PyFlink.table import StreamTableEnvironment, DataTypes
from PyFlink.table.descriptors import Schema, OldCsv, FileSystem
from PyFlink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)

t_env.register_function("add", udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT()))

t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field('sum', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('sum', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

t_env.from_path('mySource')\
    .select("add(a, b)") \
    .insert_into('mySink')

t_env.execute("tutorial_job")           
  • 送出作業

首先,您需要在“ / tmp / input”檔案中準備輸入資料。例如,

$ echo "1,2" > /tmp/input           

接下來,您可以在指令行上運作此示例:

$ python python_udf_sum.py           

通過該指令可在本地小叢集中建構并運作 Python Table API 程式。您還可以使用不同的指令行将 Python Table API 程式送出到遠端叢集。

最後,您可以在指令行上檢視執行結果:

$ cat /tmp/output
3           

Python UDF 的依賴管理

在許多情況下,您可能希望在 Python UDF 中導入第三方依賴。下面的示例将指導您如何管理依賴項。

假設您想使用 mpmath 來執行上述示例中兩數的和。Python UDF 邏輯可能如下:

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def add(i, j):
    from mpmath import fadd # add third-party dependency
    return int(fadd(1, 2))           

要使其在不包含依賴項的工作節點上運作,可以使用以下 API 指定依賴項:

# echo mpmath==1.1.0 > requirements.txt
# pip download -d cached_dir -r requirements.txt --no-binary :all:
t_env.set_python_requirements("/path/of/requirements.txt", "/path/of/cached_dir")           

使用者需要提供一個 requirements.txt 檔案,并且在裡面申明使用的第三方依賴。如果無法在群集中安裝依賴項(網絡問題),則可以使用參數“requirements_cached_dir”,指定包含這些依賴項的安裝包的目錄,如上面的示例所示。依賴項将上傳到群集并脫機安裝。

下面是一個使用依賴管理的完整示例:

from PyFlink.datastream import StreamExecutionEnvironment
from PyFlink.table import StreamTableEnvironment, DataTypes
from PyFlink.table.descriptors import Schema, OldCsv, FileSystem
from PyFlink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def add(i, j):
    from mpmath import fadd
    return int(fadd(1, 2))

t_env.set_python_requirements("/tmp/requirements.txt", "/tmp/cached_dir")
t_env.register_function("add", add)

t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field('sum', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('sum', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

t_env.from_path('mySource')\
    .select("add(a, b)") \
    .insert_into('mySink')

t_env.execute("tutorial_job")           

首先,您需要在“/ tmp / input”檔案中準備輸入資料。例如,

echo "1,2" > /tmp/input
1
2           

其次,您可以準備依賴項需求檔案和緩存目錄:

$ echo "mpmath==1.1.0" > /tmp/requirements.txt
$ pip download -d /tmp/cached_dir -r /tmp/requirements.txt --no-binary :all:           
$ python python_udf_sum.py           
$ cat /tmp/output
3           

快速上手

PyFlink 為大家提供了一種非常友善的開發體驗方式 - PyFlink Shell。當成功執行 python -m pip install apache-flink 之後,你可以直接以 pyflink-shell.sh local 來啟動一個 PyFlink Shell 進行開發體驗,如下所示:

如何在 Apache Flink 1.10 中使用 Python UDF?

更多場景

不僅僅是簡單的 ETL 場景支援,PyFlink 可以完成很多複雜場的業務場景需求,比如我們最熟悉的雙 11 大屏的場景,如下:

如何在 Apache Flink 1.10 中使用 Python UDF?

關于上面示例的更多詳細請查閱:

https://enjoyment.cool/2019/12/05/Apache-Flink-

說道系列-如何在PyFlink-1-10中自定義Python-UDF/

總結和未來規劃

在本部落格中,我們介紹了 PyFlink 中 Python UDF 的架構,并給出了如何定義、注冊、調用和運作 UDF 的示例。随着 1.10 的釋出,它将為 Python 使用者提供更多的可能來編寫 Python 作業邏輯。同時,我們一直積極與社群合作,不斷改進 PyFlink 的功能和性能。今後,我們計劃在标量和聚合函數中引入對 Pandas 的支援;通過 SQL 用戶端增加對 Python UDF 使用的支援,以擴充 Python UDF 的使用範圍;并做更多的性能改進。近期,郵件清單上有一個關于新功能支援的讨論,您可以檢視并找到更多詳細資訊。

在社群貢獻者的不斷努力之下,PyFlink 的功能可以如上圖一樣可以迅速從幼苗變成大樹:

如何在 Apache Flink 1.10 中使用 Python UDF?

PyFlink 需要你的加入

PyFlink 是一個新元件,仍然需要做很多工作。是以,熱誠歡迎每個人加入對 PyFlink 的貢獻,包括提出問題,送出錯誤報告,提出新功能,加入讨論,貢獻代碼或文檔。期望在 PyFlink 見到你!