天天看點

Flink getExecutionEnvironment 區分運作環境的原理

作者:SapphireCoder

導讀:根據官方文檔描述,StreamExecutionEnvironment 是所有 Flink 程式的基礎,我們隻需要使用 getExecutionEnvironment() 靜态方法就可以擷取 StreamExecutionEnvironment,該方法會根據上下文做正确的處理。

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();           

StreamExecutionEnvironment

StreamExecutionEnvironment 有以下幾個主要的子類:

  • LocalStreamEnvironment: 作為流式作業的本地執行環境,在本地建立 JVM 中建立 MiniCluster 并将流式作業運作在MiniCluster中
  • RemoteStreamExecution: 通過遠端參數将流式作業送出到叢集運作,且建立過程中需要以應用程式的JAR包的作為參數
  • StreamContextEnvironment: 用在Cli-Client用戶端方式送出流式作業。

getExecutionEnvironment 如何擷取環境

public static StreamExecutionEnvironment getExecutionEnvironment() {
        return getExecutionEnvironment(new Configuration());
    }

public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
        return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
                .map(factory -> factory.createExecutionEnvironment(configuration))
                .orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
    }           

這裡涉及到 StreamExecutionEnvironment 的一個靜态屬性 threadLocalContextEnvironmentFactory,生成運作環境的工廠類(StreamExecutionEnvironmentFactory)放在該 ThreadLocal 中。

/** The ThreadLocal used to store {@link StreamExecutionEnvironmentFactory}. */
    private static final ThreadLocal<StreamExecutionEnvironmentFactory>
            threadLocalContextEnvironmentFactory = new ThreadLocal<>();           

分析上面 getExecutionEnvironment 代碼我們得知當在 ThreadLocal 擷取到的 StreamExecutionEnvironmentFactory為空時會生成本地運作環境(LocalStreamEnvironment),而非空時會通過 createExecutionEnvironment() 方法來生成運作環境。

工廠類執行個體是如何放到 ThreadLocal 中

這裡我們以 Cli 送出方式為例(通過 bin/flink run .. 指令送出 jar 包到叢集)進行分析。該 flink 腳本内部最終會運作 CliFrontend 的 main() 方法、

exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"           

在CliFrontend 中進行方法追蹤 main() -> parseParameters() -> run() -> executeProgram()

protected void executeProgram(final Configuration configuration, final PackagedProgram program) throws ProgramInvocationException {
		ClientUtils.executeProgram(new DefaultExecutorServiceLoader(), configuration, program, false, false);
	}           

在org.apache.flink.client.ClientUtils的executeProgram()中調用 StreamContextEnvironment.setAsContext(...),StreamContextEnvironment 繼承自 StreamExecutionEnvironment。setAsContext()代碼如下

public static void setAsContext(
			final PipelineExecutorServiceLoader executorServiceLoader,
			final Configuration configuration,
			final ClassLoader userCodeClassLoader,
			final boolean enforceSingleJobExecution,
			final boolean suppressSysout) {
		StreamExecutionEnvironmentFactory factory = () -> new StreamContextEnvironment(
			executorServiceLoader,
			configuration,
			userCodeClassLoader,
			enforceSingleJobExecution,
			suppressSysout);
		initializeContextEnvironment(factory);
	}           

建立生成運作環境的工廠類執行個體 StreamContextEnvironment,在 initializeContextEnvironment() 方法中把執行個體放到 StreamExecutionEnvironment 類的靜态屬性threadLocalContextEnvironmentFactory 中 ,代碼如下

protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) {
		contextEnvironmentFactory = ctx;
		threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory);
	}           

最後

感謝您的閱讀,如果喜歡本文歡迎關注和轉發,轉載需注明出處,本頭條号将持續分享IT技術知識。對于文章内容有其他想法或意見建議等,歡迎提出共同讨論共同進步。

參考文章

https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/datastream/overview/

https://juejin.cn/post/7041199990470869028

https://blog.51cto.com/u_15155081/2720586

繼續閱讀