作者:付典
背景
Python 自定義函數是 PyFlink Table API 中最重要的功能之一,其允許使用者在 PyFlink Table API 中使用 Python 語言開發的自定義函數,極大地拓寬了 Python Table API 的使用範圍。
目前 Python 自定義函數的功能已經非常完善,支援多種類型的自定義函數,比如 UDF(scalar function)、UDTF(table function)、UDAF(aggregate function),UDTAF(table aggregate function,1.13 支援)、Panda UDF、Pandas UDAF 等。接下來,我們詳細介紹一下如何在 PyFlink Table API 作業中使用 Python 自定義函數。
Python 自定義函數基礎
根據輸入 / 輸出資料的行數,Flink Table API & SQL 中,自定義函數可以分為以下幾類:
自定義函數 | Single Row Input | Multiple Row Input |
---|---|---|
Single Row Output | ScalarFunction | AggregateFunction |
Multiple Row Output | TableFunction | TableAggregateFunction |
PyFlink 針對以上四種類型的自定義函數都提供了支援,接下來,我們分别看一下每種類型的自定義函數如何使用。
Python UDF
Python UDF,即 Python ScalarFunction,針對每一條輸入資料,僅産生一條輸出資料。比如以下示例,展示了通過多種方式,來定義名字為 "sub_string" 的 Python UDF:
from pyflink.table.udf import udf, FunctionContext, ScalarFunction
from pyflink.table import DataTypes
方式一:
@udf(result_type=DataTypes.STRING())
def sub_string(s: str, begin: int, end: int):
return s[begin:end]
方式二:
sub_string = udf(lambda s, begin, end: s[begin:end], result_type=DataTypes.STRING())
方式三:
class SubString(object):
def __call__(self, s: str, begin: int, end: int):
return s[begin:end]
sub_string = udf(SubString(), result_type=DataTypes.STRING())
方式四:
def sub_string(s: str, begin: int, end: int):
return s[begin:end]
sub_string_begin_1 = udf(functools.partial(sub_string, begin=1), result_type=DataTypes.STRING())
方式五:
class SubString(ScalarFunction):
def open(self, function_context: FunctionContext):
pass
def eval(self, s: str, begin: int, end: int):
return s[begin:end]
sub_string = udf(SubString(), result_type=DataTypes.STRING())
說明:
- 需要通過名字為 “ udf ” 的裝飾器,聲明這是一個 scalar function;
- 需要通過裝飾器中的 result_type 參數,聲明 scalar function 的結果類型;
- 上述方式五,通過繼承 ScalarFunction 的方式來定義 Python UDF 有以下用處:
- ScalarFunction 的基類 UserDefinedFunction 中定義了一個 open 方法,該方法隻在作業初始化時執行一次,是以可以利用該方法,做一些初始化工作,比如加載機器學習模型、連接配接外部服務等。
- 此外,還可以通過 open 方法中的 function_context 參數,注冊及使用 metrics。
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
table = t_env.from_elements([("hello", 1), ("world", 2), ("flink", 3)], ['a', 'b'])
table.select(sub_string(table.a, 1, 3))
Python UDTF
Python UDTF,即 Python TableFunction,針對每一條輸入資料,Python UDTF 可以産生 0 條、1 條或者多條輸出資料,此外,一條輸出資料可以包含多個列。比如以下示例,定義了一個名字為 split 的Python UDF,以指定字元串為分隔符,将輸入字元串切分成兩個字元串:
from pyflink.table.udf import udtf
from pyflink.table import DataTypes
@udtf(result_types=[DataTypes.STRING(), DataTypes.STRING()])
def split(s: str, sep: str):
splits = s.split(sep)
yield splits[0], splits[1]
- 需要通過名字為 “ udtf ” 的裝飾器,聲明這是一個 table function;
- 需要通過裝飾器中的 result_types 參數,聲明 table function 的結果類型。由于 table function 每條輸出可以包含多個列,result_types 需要指定所有輸出列的類型;
- Python UDTF 的定義,也支援 Python UDF 章節中所列出的多種定義方式,這裡隻展示了其中一種。
定義完 Python UDTF 之後,可以直接在 Python Table API 中使用:
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
table = t_env.from_elements([("hello|word", 1), ("abc|def", 2)], ['a', 'b'])
table.join_lateral(split(table.a, '|').alias("c1, c2"))
table.left_outer_join_lateral(split(table.a, '|').alias("c1, c2"))
Python UDAF
Python UDAF,即 Python AggregateFunction。Python UDAF 用來針對一組資料進行聚合運算,比如同一個 window 下的多條資料、或者同一個 key 下的多條資料等。針對同一組輸入資料,Python AggregateFunction 産生一條輸出資料。比如以下示例,定義了一個名字為 weighted_avg 的 Python UDAF:
from pyflink.common import Row
from pyflink.table import AggregateFunction, DataTypes
from pyflink.table.udf import udaf
class WeightedAvg(AggregateFunction):
def create_accumulator(self):
# Row(sum, count)
return Row(0, 0)
def get_value(self, accumulator: Row) -> float:
if accumulator[1] == 0:
return 0
else:
return accumulator[0] / accumulator[1]
def accumulate(self, accumulator: Row, value, weight):
accumulator[0] += value * weight
accumulator[1] += weight
def retract(self, accumulator: Row, value, weight):
accumulator[0] -= value * weight
accumulator[1] -= weight
weighted_avg = udaf(f=WeightedAvg(),
result_type=DataTypes.DOUBLE(),
accumulator_type=DataTypes.ROW([
DataTypes.FIELD("f0", DataTypes.BIGINT()),
DataTypes.FIELD("f1", DataTypes.BIGINT())]))
- 需要通過名字為 “ udaf ” 的裝飾器,聲明這是一個 aggregate function,
- 需要分别通過裝飾器中的 result_type 及 accumulator_type 參數,聲明 aggregate function 的結果類型及 accumulator 類型;
- create_accumulator,get_value 和 accumulate 這 3 個方法必須要定義,retract 方法可以根據需要定義,詳細資訊可以參見 Flink 官方文檔 [1];需要注意的是,由于必須定義 create_accumulator,get_value 和 accumulate 這 3 個方法,Python UDAF 隻能通過繼承AggregateFunction 的方式進行定義(Pandas UDAF 沒有這方面的限制)。
定義完 Python UDAF 之後,可以在 Python Table API 中這樣使用:
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
t = t_env.from_elements([(1, 2, "Lee"), (3, 4, "Jay"), (5, 6, "Jay"), (7, 8, "Lee")],
["value", "count", "name"])
t.group_by(t.name).select(weighted_avg(t.value, t.count).alias("avg"))
Python UDTAF
Python UDTAF,即 Python TableAggregateFunction。Python UDTAF 用來針對一組資料進行聚合運算,比如同一個 window 下的多條資料、或者同一個 key 下的多條資料等,與 Python UDAF 不同的是,針對同一組輸入資料,Python UDTAF 可以産生 0 條、1 條、甚至多條輸出資料。
以下示例,定義了一個名字為 Top2 的 Python UDTAF:
from pyflink.common import Row
from pyflink.table import DataTypes
from pyflink.table.udf import udtaf, TableAggregateFunction
class Top2(TableAggregateFunction):
def create_accumulator(self):
# 存儲目前最大的兩個值
return [None, None]
def accumulate(self, accumulator, input_row):
if input_row[0] is not None:
# 新的輸入值最大
if accumulator[0] is None or input_row[0] > accumulator[0]:
accumulator[1] = accumulator[0]
accumulator[0] = input_row[0]
# 新的輸入值次大
elif accumulator[1] is None or input_row[0] > accumulator[1]:
accumulator[1] = input_row[0]
def emit_value(self, accumulator):
yield Row(accumulator[0])
if accumulator[1] is not None:
yield Row(accumulator[1])
top2 = udtaf(f=Top2(),
result_type=DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())]),
accumulator_type=DataTypes.ARRAY(DataTypes.BIGINT()))
- Python UDTAF 功能是 Flink 1.13 之後支援的新功能;
- create_accumulator,accumulate 和 emit_value 這 3 個方法必須定義,此外 TableAggregateFunction 中支援 retract、merge 等方法,可以根據需要選擇是否定義,詳細資訊可以參見 Flink 官方文檔[2]。
定義完 Python UDTAF 之後,可以在 Python Table API 中這樣使用:
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
t = t_env.from_elements([(1, 'Hi', 'Hello'),
(3, 'Hi', 'hi'),
(5, 'Hi2', 'hi'),
(2, 'Hi', 'Hello'),
(7, 'Hi', 'Hello')],
['a', 'b', 'c'])
t_env.execute_sql("""
CREATE TABLE my_sink (
word VARCHAR,
`sum` BIGINT
) WITH (
'connector' = 'print'
)
""")
result = t.group_by(t.b).flat_aggregate(top2).select("b, a").execute_insert("my_sink")
# 1)等待作業執行結束,用于local執行,否則可能作業尚未執行結束,該腳本已退出,會導緻minicluster過早退出
# 2)當作業通過detach模式往remote叢集送出時,比如YARN/Standalone/K8s等,需要移除該方法
result.wait()
當執行以上程式,可以看到類似如下輸出:
11> +I[Hi, 7]
10> +I[Hi2, 5]
11> +I[Hi, 3]
- Python UDTAF 隻能用于 Table API,不能用于 SQL 語句中;
- flat_aggregate 的結果包含了原始的 grouping 列以及 UDTAF(top 2)的輸出,是以,可以在 select 中通路列 “ b ”。
Python 自定義函數進階
在純 SQL 作業中使用 Python 自定義函數
Flink SQL 中的 CREATE FUNCTION 語句支援注冊 Python 自定義函數,是以使用者除了可以在 PyFlink Table API 作業中使用 Python 自定義函數之外,還可以在純 SQL 作業中使用 Python 自定義函數。
CREATE TEMPORARY FUNCTION sub_string AS 'test_udf.sub_string' LANGUAGE PYTHON
CREATE TABLE source (
a VARCHAR
) WITH (
'connector' = 'datagen'
);
CREATE TABLE sink (
a VARCHAR
) WITH (
'connector' = 'print'
);
INSERT INTO sink
SELECT sub_string(a, 1, 3)
FROM source;
在 Java 作業中使用 Python 自定義函數
使用者可以通過 DDL 的方式注冊 Python 自定義函數,這意味着,使用者也可以在 Java Table API 作業中使用 Python 自定義函數,比如:
TableEnvironment tEnv = TableEnvironment.create(
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
tEnv.executeSql("CREATE TEMPORARY FUNCTION sub_string AS 'test_udf.sub_string' LANGUAGE PYTHON");
tEnv.createTemporaryView("source", tEnv.fromValues("hello", "world", "flink").as("a"));
tEnv.executeSql("SELECT sub_string(a) FROM source").collect();
詳細示例可以參見 PyFlink Playground [3]。
該功能的一個重要用處是将 Java 算子與 Python 算子混用。使用者可以使用 Java 語言來開發絕大部分的作業邏輯,當作業邏輯中的某些部分必須使用 Python 語言來編寫時,可以通過如上方式來調用使用 Python 語言編寫的自定義函數。
如果是 DataStream 作業,可以先将 DataStream 轉換成 Table,然後再通過上述方式,調用 Python 語言編寫的自定義函數。
依賴管理
在 Python 自定義函數中通路第三方 Python 庫是非常常見的需求,另外,在機器學習預測場景中,使用者也可能需要在 Python 自定義函數中加載一個機器學習模型。當我們通過 local 模式執行 PyFlink 作業時,可以将第三方 Python 庫安裝在本地 Python 環境中,或者将機器學習模型下載下傳到本地;然而當我們将 PyFlink 作業送出到遠端執行的時候,這也可能會出現一些問題:
- 第三方 Python 庫如何被 Python 自定義函數通路。不同的作業,對于 Python 庫的版本要求是不一樣的,将第三方 Python 庫預安裝到叢集的 Python 環境中,隻适用于安裝一些公共的依賴,不能解決不同作業對于 Python 依賴個性化的需求;
- 機器學習模型或者資料檔案,如何分發到叢集節點,并最終被 Python 自定義函數通路。
除此之外,依賴可能還包括 JAR 包等,PyFlink 中針對各種依賴提供了多種解決方案:
依賴類型 | 解決方案 | 用途描述 | 示例(flink run) | ||
---|---|---|---|---|---|
flink run參數 | 配置項 | API | |||
作業入口檔案 | -py / --python | 無 | 指定作業的入口檔案,隻能是.py檔案 | -py file:///path/to/table_api_demo.py | |
作業入口entry module | -pym / --pyModule | 指定作業的entry module,功能和--python類似,可用于當作業的Python檔案為zip包等情況,無法通過--python指定的時候,相比--python來說,更通用 | -pym table_api_demo-pyfs file:///path/to/table_api_demo.py | ||
Python三方庫檔案 | -pyfs / --pyFiles | python.files | add_python_file | 指定一個到多個Python檔案(.py/.zip/.whl等,逗号分割),這些Python檔案在作業執行時,會放到Python程序的PYTHONPATH中,可以在Python自定義函數中直接通路 | -pyfs file:///path/to/table_api_demo.py,file:///path/to/deps.zip |
存檔檔案 | -pyarch /--pyArchives | python.archives | add_python_archive | 指定一個到多個存檔檔案(逗号分割),這些存檔檔案,在作業執行的時候,會被解壓,并放到Python程序的工作目錄,可以通過相對路徑的方式進行通路 | -pyarchfile:///path/to/venv.zip |
Python解釋器路徑 | -pyexec / --pyExecutable | python.executable | set_python_executable | 指定作業執行時,所使用的Python解釋器路徑 | -pyarchfile:///path/to/venv.zip-pyexec venv.zip/venv/bin/python3 |
requirements檔案 | -pyreq / --pyRequirements | python.requirements | set_python_requirements | 指定requirements檔案,requirements檔案中定義了作業的Python三方庫依賴,作業執行時,會根據requirements的内容,通過pip安裝相關依賴 | -pyreq requirements.txt |
JAR包 | pipeline.classpaths,pipeline.jars | 沒有專門的API,可以通過configuration的set_string方法設定 | 指定作業依賴的JAR包,通常用于指定connector JAR包 |
- 需要注意的是,Python UDF 的實作所在的檔案,也需要在作業執行的時候,作為依賴檔案上傳;
- 可以通過合用 “存檔檔案” 與 “ Python 解釋器路徑”,指定作業使用上傳的 Python 虛拟環境來執行,比如:
table_env.add_python_archive("/path/to/py_env.zip")
# 指定使用py_env.zip包中帶的python來執行使用者自定義函數,必須通過相對路徑來指定
table_env.get_config().set_python_executable("py_env.zip/py_env/bin/python")
- 推薦使用者使用 conda 來建構 Python 虛拟環境,conda 建構的 Python 虛拟環境包含了執行 Python 所需的絕大多數底層庫,可以極大地避免當本地環境與叢集環境不一樣時,所建構的 Python 虛拟環境在叢集執行時,缺少各種底層依賴庫的問題。關于如何使用 conda 建構的 Python 虛拟環境,可以參考阿裡雲 VVP 文檔中 “使用 Python 三方包” 章節的介紹 [4]
- 有些 Python 三方庫需要安裝才能使用,即并非 ”将其下載下傳下來就可以直接放到 PYTHONPATH 中引用“,針對這種類型的 Python 三方庫,有兩種解決方案:
- 将其安裝在 Python 虛拟環境之中,指定作業運作使用所建構的 Python 虛拟環境;
- 找一台與叢集環境相同的機器(或 docker),安裝所需的 Python 三方庫,然後将安裝檔案打包。該方式相對于 Python 虛拟環境來說,打封包件比較小。詳情可以參考阿裡雲 VVP 文檔中 “使用自定義的 Python 虛拟環境” 章節的介紹 [5]。
調試
PyFlink 支援使用者通過遠端調試的方式,來調試 Python 自定義函數,具體方法可以參見文章 “如何從 0 到 1 開發 PyFlink API 作業” [6] 中 “遠端調試” 章節的介紹。
另外,使用者還可以在 Python 自定義函數中,通過 logging 的方式,列印日志。需要注意的是,日志輸出需要在 TaskManager 的日志檔案中檢視,而不是目前 console。具體使用方式,請參見 “如何從 0 到 1 開發 PyFlink API 作業” [6] 中 “自定義日志” 章節的介紹。需要注意的是,當通過 local 方式運作作業的時候,TM 的日志位于 PyFlink 的安裝目錄,比如:
\>>> import pyflink
['/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink']
調優
Python 自定義函數的性能在很大程度上取決于 Python 自定義函數自身的實作,如果遇到性能問題,您首先需要想辦法盡可能優化 Python 自定義函數的實作。
除此之外,Python 自定義函數的性能也受以下參數取值的影響。
參數 | 說明 |
---|---|
python.fn-execution.bundle.size | Python自定義函數的執行是異步的,在作業執行過程中,Java算子将資料異步發送給Python程序進行處理。Java算子在将資料發送給Python程序之前,會先将資料緩存起來,到達一定門檻值之後,再發送給Python程序。python.fn-execution.bundle.size參數可用來控制可緩存的資料最大條數,預設值為100000。 |
python.fn-execution.bundle.time | 用來控制資料的最大緩存時間。當緩存的資料條數到達python.fn-execution.bundle.size定義的門檻值或緩存時間到達python.fn-execution.bundle.time定義的門檻值時,會觸發緩存資料的計算。預設值為1000,機關是毫秒。 |
python.fn-execution.arrow.batch.size | 用來控制當使用Pandas UDF時,一個arrow batch可容納的資料最大條數,預設值為10000。說明 python.fn-execution.arrow.batch.size參數值不能大于python.fn-execution.bundle.size參數值。 |
- checkpoint 時,會觸發緩存資料的計算,是以當上述參數配置的值過大時,可能會導緻checkpoint 時需要處理過多的資料,進而導緻 checkpoint 時間過長,甚至會導緻 checkpoint 失敗。當遇到作業的 checkpoint 時間比較長的問題時,可以嘗試減少上述參數的取值。
常見問題
1)Python 自定義函數的實際傳回值類型與 result_type 中聲明的類型不一緻,該問題會導緻 Java 算子在收到 Python 自定義函數的執行結果,進行反序列化時報錯,錯誤堆棧類似:
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[?:1.8.0_261]
at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:91) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:87) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:36) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:124) ~[flink-python_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:107) ~[flink-python_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46) ~[flink-python_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:104) ~[flink-python_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49) ~[flink-python_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:84) ~[flink-python_2.11-1.12.0.jar:1.12.0]
2)在 Python 自定義函數的 init 方法裡執行個體化了一個不能被 cloudpickle 序列化的對象。
在送出作業的時候,PyFlink 會通過 cloudpickle 序列化 Python 自定義函數,若 Python 自定義函數包含不能被 cloudpickle 序列化的對象,則會遇到類似錯誤:TypeError: can't pickle xxx,可以将這種變量放在 open 方法裡初始化。
3)在 Python 自定義函數的 init 方法裡 load 一個非常大的資料檔案。
由于在送出作業的時候,PyFlink 會通過 cloudpickle 序列化 Python 自定義函數,若在 init 方法裡 load 一個非常大的資料檔案,則整個資料檔案都會被序列化并作為 Python 自定義函數實作的一部分,若資料檔案非常大,可能會導緻作業執行失敗,可以将 load 資料檔案的操作放在 open 方法裡執行。
4)用戶端 Python 環境與叢集端 Python 環境不一緻,比如 Python 版本不一緻、PyFlink 版本不一緻(大版本需要保持一緻,比如都為 1.12.x)等。
總結
在這篇文章中,我們主要介紹了各種 Python 自定義函數的定義及使用方式,以及 Python 依賴管理、 Python 自定義函數調試及調優等方面的資訊,希望可以幫助使用者了解 Python 自定義函數。接下來,我們會繼續推出 PyFlink 系列文章,幫助 PyFlink 使用者深入了解 PyFlink 中各種功能、應用場景、最佳實踐等。
另外,阿裡雲實時計算生态團隊長期招聘優秀大資料人才(包括實習+社招),我們的工作包括:
- 實時機器學習:支援機器學習場景下實時特征工程和 AI 引擎配合,基于 Apache Flink 及其生态打造實時機器學習的标準,推動例如搜尋、推薦、廣告、風控等場景的全面實時化;
- 大資料 + AI 一體化:包括程式設計語言一體化 (PyFlink 相關工作),執行引擎內建化 (TF on Flink),工作流及管理一體化(Flink AI Flow)。
如果你對開源、大資料或者 AI 感興趣,請發履歷到:[email protected]
此外,也歡迎大家加入 “PyFlink交流群”,交流 PyFlink 相關的問題。

引用連結
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html#aggregate-functions[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/table/udfs/python_udfs/#table-aggregate-functions[3]
https://github.com/pyflink/playgrounds#7-python-udf-used-in-java-table-api-jobs[4]
https://help.aliyun.com/document_detail/207351.html?spm=a2c4g.11186623.6.687.1fe76f50loCz96#title-09r-29j-9d7[5]
https://help.aliyun.com/document_detail/207351.html?spm=a2c4g.11186623.6.687.4b18419aCuhgmq#title-r01-50c-j82[6]
https://mp.weixin.qq.com/s/GyFTjQl6ch8jc733mpCP7Q活動推薦一
報名連結:
https://1712399719478.huodongxing.com/event/1594531547711活動推薦二
阿裡雲基于 Apache Flink 建構的企業級産品-實時計算Flink版現開啟活動:
99元試用
實時計算Flink版(包年包月、10CU)即有機會獲得 Flink 獨家定制T恤;另包3個月及以上還有85折優惠!
了解活動詳情:
https://www.aliyun.com/product/bigdata/sc