背景
- 随着flink在流計算領域越來越火,很多公司基于flink搭建了自己的實時計算平台,使用者可以在實時平台通過jar或者sql的方式來開發、上線、下線、運維flink任務,避免了建構flink任務的複雜性,使更多不會flink的人能夠使用flink。
- 平時我們自己開發一個flink任務之後,都是通過腳本的方式送出到叢集的,但是我們搭建了一個實時計算之後,就不能通過指令行來管理任務了,我們今天就主要講一下如何通過api的方式來和yarn叢集互動。
- 目前生産環境部署flink任務主要有yarn叢集和k8s叢集兩種方式,雖然說k8s号稱下一代資源管理系統,但是對于flink來說,還是有很多不太成熟,是以目前部署flink任務還是以yarn叢集為主。
- yarn叢集部署flink任務目前有兩種方式
-
yarn session 模式
session模式是在yarn上面預先啟動一個叢集,然後我們可以将任務部署到叢集上,沒有任務的時候叢集上沒有taskmanager,當有了新的任務之後,系統會自動為其配置設定資源,當任務結束之後,過一段時間(可配置)系統會自動釋放資源,這種叢集一般适合運作周期比較短的任務,比如批處理任務。
-
per job 模式
per job模式是每個任務都啟動一個flink叢集,這種模式的好處就是資源隔離,不互相影響,任務結束之後,釋放相應的資源。這種模式啟動任務時間長,一般适合運作常駐任務,比如flink流任務.
案例詳解
今天我們主要講一下如何通過api的方式來停止一個通過per job模式部署在yarn叢集上的任務。
指令行停止
我們在命名行模式下可以通過下面的指令來停止一個部署在yarn的per job模式的flink任務.
${FLINK_HOME}/bin/flink stop -m yarn-cluster -yid application_1592386606716_0005 c8ee546129e8480809ee62a4ce7dd91d
複制
我們看到,主要是有兩個參數,一個是yarn的applicationId,還有一個是flink的jobId,執行成功之後,會傳回一個類似的結果:
Savepoint completed. Path: hdfs://localhost/flink-savepoints/savepoint-c8ee54-ee7a059c2f98
複制
api實作
其實主要的方法就是構造出上面兩個id,然後我們使用ClusterClient來停止flink任務.
- 添加配置檔案 我們在classpath下添加hadoop和flink的配置檔案

- 構造ApplicationId對象
Configuration flinkConfiguration = new Configuration();
flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID, appId);
YarnClusterClientFactory clusterClientFactory = new YarnClusterClientFactory();
ApplicationId applicationId = clusterClientFactory.getClusterId(flinkConfiguration);
複制
- 構造jobId
private static JobID parseJobId(String jobIdString) throws CliArgsException{
if (jobIdString == null){
throw new CliArgsException("Missing JobId");
}
final JobID jobId;
try {
jobId = JobID.fromHexString(jobIdString);
} catch (IllegalArgumentException e){
throw new CliArgsException(e.getMessage());
}
return jobId;
}
複制
- 停止任務 通過stopWithSavepoint方法來停止任務,如果savePoint沒指定的話,系統将會使用flink配置檔案中的state.savepoints.dir選項.
CompletableFuture<String> completableFuture = clusterClient.stopWithSavepoint(
jobID,
true,
savePoint);
String savepoint = completableFuture.get();
System.out.println(savepoint);
複制
最後執行完成之後,會傳回一個savepoint的位址,和指令行一樣,我們可以把這個位址存起來,以便我們後續從這個checkpoint啟動。
完整的代碼請參考:
https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/cluster/StopYarnJob.java