天天看點

PyFlink 1.11 遊樂場 - 開發環境準備開發環境依賴PyCharm 配置 Python interpreter安裝PyFlinkHelloWorld 示例開發日志遊樂場設施

開發環境依賴

PyFlink作業的開發和運作需要依賴Python 3.5/3.6/3.7 版本和Java 8或者Java 11,本遊樂場所使用的環境是Java 1.8.0_211, Python 3.7.7 還有一些其他基礎軟體如下;

  • Java 1.8.0_211
  • Python 3.7.7
  • PIP 20.0.2
  • PyCharm Runtime version: 11.0.7
  • MocOS 10.14.6

PyCharm 配置 Python interpreter

應用PyCharm進行開發首先要配置一下項目所使用的Python環境,配置路徑

PyCharm -> Preferences -> Project Interpreter

如下:

PyFlink 1.11 遊樂場 - 開發環境準備開發環境依賴PyCharm 配置 Python interpreter安裝PyFlinkHelloWorld 示例開發日志遊樂場設施

點選

Add

配置新的環境,如下:

PyFlink 1.11 遊樂場 - 開發環境準備開發環境依賴PyCharm 配置 Python interpreter安裝PyFlinkHelloWorld 示例開發日志遊樂場設施

一路”OK“,完成配置。

安裝PyFlink

我們先利用PyCharm建立一些項目,名為

PyFlinkPlayground

, 并為項目選擇我們剛才建立的Virtualenv環境,如下:

PyFlink 1.11 遊樂場 - 開發環境準備開發環境依賴PyCharm 配置 Python interpreter安裝PyFlinkHelloWorld 示例開發日志遊樂場設施

建立之後,我們會看到

External Libraries

裡面使用了

PlaygroundEnv

, 但是初始化并沒有PyFlink,是以我們需要進行顯示的安裝,如下:

PyFlink 1.11 遊樂場 - 開發環境準備開發環境依賴PyCharm 配置 Python interpreter安裝PyFlinkHelloWorld 示例開發日志遊樂場設施

我們可以手工安裝PyFlink,直接在PyCharm的

Terminal

下進行安裝,這時候我們自動就是啟動的

PlaygroundEnv

環境,在安裝的過程中你也可以看到

site-packages

内容會不斷增加,

(PlaygroundEnv) jincheng:~ jincheng.sunjc$ python --version
Python 3.7.7
(PlaygroundEnv) jincheng:~ jincheng.sunjc$ python -m pip install apache-flink==1.11.1
Collecting apache-flink==1.11.1
  Using cached apache_flink-1.11.1-cp37-cp37m-macosx_10_9_x86_64.whl (206.7 MB)

...
...
Successfully installed apache-beam-2.19.0 apache-flink-1.11.1 avro-python3-1.9.1 certifi-2020.6.20 chardet-3.0.4 cloudpickle-1.2.2 crcmod-1.7 dill-0.3.1.1 docopt-0.6.2 fastavro-0.21.24 future-0.18.2 grpcio-1.30.0 hdfs-2.5.8 httplib2-0.12.0 idna-2.10 jsonpickle-1.2 mock-2.0.0 numpy-1.19.1 oauth2client-3.0.0 pandas-0.25.3 pbr-5.4.5 protobuf-3.12.4 py4j-0.10.8.1 pyarrow-0.15.1 pyasn1-0.4.8 pyasn1-modules-0.2.8 pydot-1.4.1 pymongo-3.11.0 pyparsing-2.4.7 python-dateutil-2.8.0 pytz-2020.1 requests-2.24.0 rsa-4.6 six-1.15.0 typing-3.7.4.3 typing-extensions-3.7.4.2 urllib3-1.25.10
(PlaygroundEnv) jincheng:~ jincheng.sunjc$            

最終完成之後你可以在

site-packages

下面找的

pyflink

目錄,如下:

PyFlink 1.11 遊樂場 - 開發環境準備開發環境依賴PyCharm 配置 Python interpreter安裝PyFlinkHelloWorld 示例開發日志遊樂場設施

有了這些資訊我們就可以進行PyFlink的作業開發了。

HelloWorld 示例

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

def hello_world():
    """
    從随機Source讀取資料,然後直接利用PrintSink輸出。
    """
    settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=settings)
    source_ddl = """
                    CREATE TABLE random_source (
                        f_sequence INT,
                        f_random INT,
                        f_random_str STRING
                    ) WITH (
                        'connector' = 'datagen',
                        'rows-per-second'='5',
                        'fields.f_sequence.kind'='sequence',
                        'fields.f_sequence.start'='1',
                        'fields.f_sequence.end'='1000',
                        'fields.f_random.min'='1',
                        'fields.f_random.max'='1000',
                        'fields.f_random_str.length'='10'
                    )
                    """

    sink_ddl = """
                  CREATE TABLE print_sink (
                    f_sequence INT,
                    f_random INT,
                    f_random_str STRING 
                ) WITH (
                  'connector' = 'print'
                )
        """

    # 注冊source和sink
    t_env.execute_sql(source_ddl);
    t_env.execute_sql(sink_ddl);

    # 資料提取
    tab = t_env.from_path("random_source");
    # 這裡我們暫時先使用 标注了 deprecated 的API, 因為新的異步送出測試有待改進...
    tab.insert_into("print_sink");
    # 執行作業
    t_env.execute("Flink Hello World");

if __name__ == '__main__':
    hello_world()           

上面代碼在PyCharm裡面右鍵運作就應該列印如下結果了:

PyFlink 1.11 遊樂場 - 開發環境準備開發環境依賴PyCharm 配置 Python interpreter安裝PyFlinkHelloWorld 示例開發日志遊樂場設施

開發日志

正常來講我們可能開發一些UDF,可能列印一些日志或者特殊情況還可能進行Python代碼的調試,怎麼解?

  • 首先,我們定義一個UDF,在UDF裡面添加調試日志,如下:
# 定義UDF
@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def pass_by(str):
    logging.error("Some debugging infomation...")
    return str           
  • 然後在SQL裡面使用這個UDF,如下:
# 注冊 UDF
t_env.register_function('pass_by', pass_by)
# 使用UDF
tab.select("f_sequence, f_random, pass_by(f_random_str) ")           
  • 完整的代碼
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

import logging

# 定義UDF
@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def pass_by(str):
    logging.error("Some debugging infomation...")
    return "pass_by_" + str

def hello_world():
    """
    從随機Source讀取資料,然後直接利用PrintSink輸出。
    """
    settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=settings)
    t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)

    source_ddl = """
                    CREATE TABLE random_source (
                        f_sequence INT,
                        f_random INT,
                        f_random_str STRING
                    ) WITH (
                        'connector' = 'datagen',
                        'rows-per-second'='5',
                        'fields.f_sequence.kind'='sequence',
                        'fields.f_sequence.start'='1',
                        'fields.f_sequence.end'='1000',
                        'fields.f_random.min'='1',
                        'fields.f_random.max'='1000',
                        'fields.f_random_str.length'='10'
                    )
                    """

    sink_ddl = """
                  CREATE TABLE print_sink (
                    f_sequence INT,
                    f_random INT,
                    f_random_str STRING 
                ) WITH (
                  'connector' = 'print'
                )
        """

    # 注冊source和sink
    t_env.execute_sql(source_ddl);
    t_env.execute_sql(sink_ddl);

    # 注冊 UDF
    t_env.register_function('pass_by', pass_by)

    # 資料提取
    tab = t_env.from_path("random_source");
    # 這裡我們暫時先使用 标注了 deprecated 的API, 因為新的異步送出測試有待改進...
    tab.select("f_sequence, f_random, pass_by(f_random_str) ").insert_into("print_sink")
    # 執行作業
    t_env.execute("Flink Hello World");

if __name__ == '__main__':
    hello_world()           

那麼運作之後,日志在哪裡呢?就是在項目的

PlaygroundEnv -> site-packages -> pyflink -> log

目錄 ,如下:

到這裡,簡單的 開發環境就OK了,大家可以改改代碼,直覺體驗一下。。。

遊樂場設施