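Overview: According to the official documentation, StreamExecutionEnvironment is the foundation of every Flink program. To obtain one, we only need to call the static method getExecutionEnvironment(), which does the right thing depending on the context in which the program runs.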
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment
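StreamExecutionEnvironment has the following main subclasses:
- LocalStreamEnvironment: the local execution environment for streaming jobs. It starts a MiniCluster inside the local JVM and runs the job on it.
- RemoteStreamEnvironment: submits the streaming job to a remote cluster identified by the remote parameters (such as the JobManager host and port). The JAR files containing the application must be passed in when the environment is created.
- StreamContextEnvironment: used when a streaming job is submitted through the command-line client (CLI).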
How getExecutionEnvironment obtains the environment
public static StreamExecutionEnvironment getExecutionEnvironment() {
    return getExecutionEnvironment(new Configuration());
}

public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
    return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
            .map(factory -> factory.createExecutionEnvironment(configuration))
            .orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
}
This involves a static field of StreamExecutionEnvironment, threadLocalContextEnvironmentFactory. The factory that creates the execution environment (a StreamExecutionEnvironmentFactory) is stored in this ThreadLocal.
/** The ThreadLocal used to store {@link StreamExecutionEnvironmentFactory}. */
private static final ThreadLocal<StreamExecutionEnvironmentFactory>
        threadLocalContextEnvironmentFactory = new ThreadLocal<>();
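From the getExecutionEnvironment code above we can see that Utils.resolveFactory consults both the ThreadLocal and the static field contextEnvironmentFactory. If neither holds a StreamExecutionEnvironmentFactory, a local execution environment (LocalStreamEnvironment) is created; otherwise the environment is created by the factory's createExecutionEnvironment() method.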
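To make the lookup concrete, here is a minimal Flink-free sketch of the same logic. It is a model rather than the real implementation: EnvFactory, the String "environments", and the helper methods setStaticFactory, setThreadLocalFactory and reset are hypothetical stand-ins, and the sketch assumes that the thread-local value takes precedence over the static field.

```java
import java.util.Optional;

// Flink-free model of the lookup in getExecutionEnvironment().
// Every name below is a hypothetical stand-in, not a Flink class.
final class EnvironmentResolutionSketch {

    /** Stand-in for StreamExecutionEnvironmentFactory. */
    interface EnvFactory {
        String createExecutionEnvironment(String configuration);
    }

    // Stand-ins for the two static fields of StreamExecutionEnvironment.
    private static EnvFactory contextEnvironmentFactory;
    private static final ThreadLocal<EnvFactory> threadLocalContextEnvironmentFactory =
            new ThreadLocal<>();

    // Assumed behaviour of Utils.resolveFactory: thread-local first, then static.
    static Optional<EnvFactory> resolveFactory(
            ThreadLocal<EnvFactory> threadLocalFactory, EnvFactory staticFactory) {
        EnvFactory localFactory = threadLocalFactory.get();
        return Optional.ofNullable(localFactory != null ? localFactory : staticFactory);
    }

    // Same shape as the original: use a factory if one is found, else go local.
    static String getExecutionEnvironment(String configuration) {
        return resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
                .map(factory -> factory.createExecutionEnvironment(configuration))
                .orElseGet(() -> "local(" + configuration + ")");
    }

    // Helpers that exist only for this sketch.
    static void setStaticFactory(EnvFactory factory) {
        contextEnvironmentFactory = factory;
    }

    static void setThreadLocalFactory(EnvFactory factory) {
        threadLocalContextEnvironmentFactory.set(factory);
    }

    static void reset() {
        contextEnvironmentFactory = null;
        threadLocalContextEnvironmentFactory.remove();
    }

    public static void main(String[] args) {
        // No factory registered: falls back to the "local" environment.
        System.out.println(getExecutionEnvironment("conf"));
        // A registered factory is used instead of the local fallback.
        setStaticFactory(conf -> "context(" + conf + ")");
        System.out.println(getExecutionEnvironment("conf"));
        reset();
    }
}
```

Running the same program from the IDE and through a client therefore yields different environments without any change to the user code: only the presence of a registered factory differs.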
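How the factory instance gets into the ThreadLocal
Here we take CLI submission as the example (submitting a JAR to the cluster with the bin/flink run .. command). The flink script ultimately runs the main() method of CliFrontend: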
exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"
Tracing the calls inside CliFrontend: main() -> parseParameters() -> run() -> executeProgram()
protected void executeProgram(final Configuration configuration, final PackagedProgram program)
        throws ProgramInvocationException {
    ClientUtils.executeProgram(
            new DefaultExecutorServiceLoader(), configuration, program, false, false);
}
executeProgram() in org.apache.flink.client.ClientUtils calls StreamContextEnvironment.setAsContext(...); StreamContextEnvironment extends StreamExecutionEnvironment. The code of setAsContext() is as follows:
public static void setAsContext(
        final PipelineExecutorServiceLoader executorServiceLoader,
        final Configuration configuration,
        final ClassLoader userCodeClassLoader,
        final boolean enforceSingleJobExecution,
        final boolean suppressSysout) {
    StreamExecutionEnvironmentFactory factory = () -> new StreamContextEnvironment(
            executorServiceLoader,
            configuration,
            userCodeClassLoader,
            enforceSingleJobExecution,
            suppressSysout);
    initializeContextEnvironment(factory);
}
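setAsContext() builds a factory (a lambda) that creates StreamContextEnvironment instances. initializeContextEnvironment() then stores that factory both in the static field contextEnvironmentFactory and in the static ThreadLocal threadLocalContextEnvironmentFactory of StreamExecutionEnvironment. The code is as follows: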
protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) {
    contextEnvironmentFactory = ctx;
    threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory);
}
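One consequence of writing the factory to both places can be sketched in plain Java. In the model below the factory is just a String, and initializeContext, describeLookup, describeLookupInNewThread and reset are hypothetical helpers; none of them are Flink APIs. The sketch shows that a ThreadLocal value is visible only to the thread that set it, while the static field is visible to every thread.

```java
import java.util.concurrent.atomic.AtomicReference;

// Flink-free model of initializeContextEnvironment(); all names are hypothetical.
final class ContextFactoryVisibilitySketch {

    // Stand-ins for contextEnvironmentFactory and threadLocalContextEnvironmentFactory.
    private static volatile String contextFactory;
    private static final ThreadLocal<String> threadLocalFactory = new ThreadLocal<>();

    // Mirrors the two assignments in the original method.
    static void initializeContext(String factory) {
        contextFactory = factory;
        threadLocalFactory.set(contextFactory);
    }

    // Helper for this sketch only: clears both slots.
    static void reset() {
        contextFactory = null;
        threadLocalFactory.remove();
    }

    // Reports which slot, if any, the calling thread would find a factory in.
    static String describeLookup() {
        String local = threadLocalFactory.get();
        if (local != null) {
            return "thread-local:" + local;
        }
        String shared = contextFactory;
        return shared != null ? "static:" + shared : "none";
    }

    // Runs the same lookup on a freshly started thread and returns its result.
    static String describeLookupInNewThread() {
        AtomicReference<String> result = new AtomicReference<>();
        Thread worker = new Thread(() -> result.set(describeLookup()));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result.get();
    }

    public static void main(String[] args) {
        initializeContext("contextFactory");
        System.out.println(describeLookup());            // calling thread
        System.out.println(describeLookupInNewThread()); // another thread
        reset();
    }
}
```

In this model the thread that called initializeContext finds the factory in its ThreadLocal, whereas a newly started thread finds nothing there and falls back to the static field.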
References
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/datastream/overview/
https://juejin.cn/post/7041199990470869028
https://blog.51cto.com/u_15155081/2720586