天天看點

flink實戰-實時計算平台通過api停止流任務背景案例詳解

背景

  • 随着flink在流計算領域越來越火,很多公司基于flink搭建了自己的實時計算平台,使用者可以在實時平台通過jar或者sql的方式來開發、上線、下線、運維flink任務,避免了建構flink任務的複雜性,使更多不會flink的人能夠使用flink。
  • 平時我們自己開發一個flink任務之後,都是通過腳本的方式送出到叢集的,但是我們搭建了一個實時計算之後,就不能通過指令行來管理任務了,我們今天就主要講一下如何通過api的方式來和yarn叢集互動。
  • 目前生産環境部署flink任務主要有yarn叢集和k8s叢集兩種方式,雖然說k8s号稱下一代資源管理系統,但是對于flink來說,還是有很多不太成熟,是以目前部署flink任務還是以yarn叢集為主。
  • yarn叢集部署flink任務目前有兩種方式
  1. yarn session 模式

    session模式是在yarn上面預先啟動一個叢集,然後我們可以将任務部署到叢集上,沒有任務的時候叢集上沒有taskmanager,當有了新的任務之後,系統會自動為其配置設定資源,當任務結束之後,過一段時間(可配置)系統會自動釋放資源,這種叢集一般适合運作周期比較短的任務,比如批處理任務。

  2. per job 模式

    per job模式是每個任務都啟動一個flink叢集,這種模式的好處就是資源隔離,不互相影響,任務結束之後,釋放相應的資源。這種模式啟動任務時間長,一般适合運作常駐任務,比如flink流任務.

案例詳解

今天我們主要講一下如何通過api的方式來停止一個通過per job模式部署在yarn叢集上的任務。

指令行停止

我們在命名行模式下可以通過下面的指令來停止一個部署在yarn的per job模式的flink任務.

${FLINK_HOME}/bin/flink stop -m yarn-cluster -yid application_1592386606716_0005 c8ee546129e8480809ee62a4ce7dd91d
           

複制

我們看到,主要是有兩個參數,一個是yarn的applicationId,還有一個是flink的jobId,執行成功之後,會傳回一個類似的結果:

Savepoint completed. Path: hdfs://localhost/flink-savepoints/savepoint-c8ee54-ee7a059c2f98
           

複制

api實作

其實主要的方法就是構造出上面兩個id,然後我們使用ClusterClient來停止flink任務.

  • 添加配置檔案 我們在classpath下添加hadoop和flink的配置檔案
flink實戰-實時計算平台通過api停止流任務背景案例詳解
  • 構造ApplicationId對象
Configuration flinkConfiguration = new Configuration();
  flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID, appId);
  YarnClusterClientFactory clusterClientFactory = new YarnClusterClientFactory();
  ApplicationId applicationId = clusterClientFactory.getClusterId(flinkConfiguration);

           

複制

  • 構造jobId
private static JobID parseJobId(String jobIdString) throws CliArgsException{
  if (jobIdString == null){
   throw new CliArgsException("Missing JobId");
  }

  final JobID jobId;
  try {
   jobId = JobID.fromHexString(jobIdString);
  } catch (IllegalArgumentException e){
   throw new CliArgsException(e.getMessage());
  }
  return jobId;
 }

           

複制

  • 停止任務 通過stopWithSavepoint方法來停止任務,如果savePoint沒指定的話,系統将會使用flink配置檔案中的state.savepoints.dir選項.
CompletableFuture<String> completableFuture = clusterClient.stopWithSavepoint(
    jobID,
    true,
    savePoint);

  String savepoint = completableFuture.get();
  System.out.println(savepoint);
           

複制

最後執行完成之後,會傳回一個savepoint的位址,和指令行一樣,我們可以把這個位址存起來,以便我們後續從這個checkpoint啟動。

完整的代碼請參考:

https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/cluster/StopYarnJob.java