天天看點

Flink 名額參數源碼解讀(讀取數量、發送數量、發送位元組數、接收位元組數等)

文章目錄

  • ​​01 引言​​
  • ​​02 源碼分析​​
  • ​​2.1 源碼入口​​
  • ​​2.2 IOMetricsInfo​​
  • ​​2.3 MutableIOMetrics​​
  • ​​2.3 MetricFetcher​​
  • ​​2.3.1 MetricFetcherImpl​​
  • ​​2.4 MetricQueryServiceGateway​​
  • ​​2.5 RpcEndpoint​​
  • ​​2.6 MetricQueryService​​
  • ​​2.7 MiniCluster​​
  • ​​2.8 LocalExecutor​​
  • ​​2.8 StreamExecutionEnviroment​​
  • ​​03 小結​​

01 引言

附:Flink源碼下載下傳位址

在Flink的Web頁面,細心的話可以看到監控頁面裡,有任務的詳情,其中裡面有詳細的監控名額,如下圖(發送記錄數、接收記錄數、發送位元組數,接收位元組數等):

Flink 名額參數源碼解讀(讀取數量、發送數量、發送位元組數、接收位元組數等)

很多時候,我們都需要 “取出這些資料”,并用在我們的需求上,那麼該如何取出這些資料呢?本文來分析下源碼。

02 源碼分析

2.1 源碼入口

在​

​Flink​

​​的​

​web​

​​頁面,按​

​F12​

​檢視源碼,可以看到:

  • 接口位址:​​http://域名/jobs/ad75bbaaa624e41a249825a9820a65cc​​​
    Flink 名額參數源碼解讀(讀取數量、發送數量、發送位元組數、接收位元組數等)
  • 響應内容:
{
  "jid": "ad75bbaaa624e41a249825a9820a65cc",
  "name": "insert-into_default_catalog.default_database.t_student_copy",
  "isStoppable": false,
  "state": "RUNNING",
  "start-time": 1650352652357,
  "end-time": -1,
  "duration": 64629227,
  "maxParallelism": -1,
  "now": 1650417281584,
  "timestamps": {
    "INITIALIZING": 1650352652357,
    "FAILED": 0,
    "CREATED": 1650352652449,
    "RESTARTING": 0,
    "FAILING": 0,
    "FINISHED": 0,
    "SUSPENDED": 0,
    "RECONCILING": 0,
    "CANCELLING": 0,
    "CANCELED": 0,
    "RUNNING": 1650352653087
  },
  "vertices": [
    {
      "id": "cbc357ccb763df2852fee8c4fc7d55f2",
      "name": "Source: TableSourceScan(table=[[default_catalog, default_database, t_student]], fields=[id, name]) -> NotNullEnforcer(fields=[id]) -> Sink: Sink(table=[default_catalog.default_database.t_student_copy], fields=[id, name])",
      "maxParallelism": 128,
      "parallelism": 1,
      "status": "RUNNING",
      "start-time": 1650352658363,
      "end-time": -1,
      "duration": 64623221,
      "tasks": {
        "CREATED": 0,
        "CANCELING": 0,
        "INITIALIZING": 0,
        "RECONCILING": 0,
        "CANCELED": 0,
        "RUNNING": 1,
        "DEPLOYING": 0,
        "FINISHED": 0,
        "FAILED": 0,
        "SCHEDULED": 0
      },
      "metrics": {
        "read-bytes": 0,
        "read-bytes-complete": true,
        "write-bytes": 0,
        "write-bytes-complete": true,
        "read-records": 0,
        "read-records-complete": true,
        "write-records": 0,
        "write-records-complete": true
      }
    }
  ],
  "status-counts": {
    "CREATED": 0,
    "CANCELING": 0,
    "INITIALIZING": 0,
    "RECONCILING": 0,
    "CANCELED": 0,
    "RUNNING": 1,
    "DEPLOYING": 0,
    "FINISHED": 0,
    "FAILED": 0,
    "SCHEDULED": 0
  },
  "plan": {
    "jid": "ad75bbaaa624e41a249825a9820a65cc",
    "name": "insert-into_default_catalog.default_database.t_student_copy",
    "nodes": [
      {
        "id": "cbc357ccb763df2852fee8c4fc7d55f2",
        "parallelism": 1,
        "operator": "",
        "operator_strategy": "",
        "description": "Source: TableSourceScan(table=[[default_catalog, default_database, t_student]], fields=[id, name]) -> NotNullEnforcer(fields=[id]) -> Sink: Sink(table=[default_catalog.default_database.t_student_copy], fields=[id, name])",
        "optimizer_properties": {
          
        }
      }
    ]
  }
}      

可以看到,​

​vertices.[0].metrics​

​下的内容就是本文要讀取的内容,如下圖:

Flink 名額參數源碼解讀(讀取數量、發送數量、發送位元組數、接收位元組數等)

​Ctrl+H​

​全局搜尋​

​Flink​

​源碼,我們可能會想到先檢視接口 “​

​/jobs/{jobId}​

​”,其實這樣效率很低,最好的方法就是使用其 “特殊性”,比如,我們可以從傳回的字段​

​read-bytes​

​入手,發現定義的地方在​

​org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo​

​ 這個類:

Flink 名額參數源碼解讀(讀取數量、發送數量、發送位元組數、接收位元組數等)

好了,我們可以把​

​IOMetricsInfo​

​這個類當做我們的源碼分析入口。

2.2 IOMetricsInfo

在​

​IOMetricsInfo​

​,Ctrl+G檢視,可以看到這個類有多個地方被調用,其實真正的是被​

​JobDetailsHandler​

​調用了,其它的類字尾都是​

​Test​

​測試類,是以不作為分析的下一步,下面看看​

​JobDetailsHandler​

​。

Flink 名額參數源碼解讀(讀取數量、發送數量、發送位元組數、接收位元組數等)

在​

​JobDetailsHandler​

​,可以看到名額值是總​

​counts​

​裡擷取的,繼續看​

​counts​

Flink 名額參數源碼解讀(讀取數量、發送數量、發送位元組數、接收位元組數等)

​counts​

​在這裡指派了:

Flink 名額參數源碼解讀(讀取數量、發送數量、發送位元組數、接收位元組數等)

接下來,我們看看​

​MutableIOMetrics​

​這個類。

2.3 MutableIOMetrics

進入上一步指定的​

​MutableIOMetrics​

​​裡的​

​addIOMetrics​

​方法,可以看到代碼根據程式的運作狀态,從不同的地方擷取名額值了:

  • 終止狀态:從​

    ​AccessExecution​

    ​擷取了名額值
  • 運作狀态:從​

    ​MetricFetcher​

    ​​擷取了名額值
    Flink 名額參數源碼解讀(讀取數量、發送數量、發送位元組數、接收位元組數等)

因為我們的程式是運作的,當然,我們需要研究​

​MetricFetcher​

​這個類裡面的值是怎麼拿到的。

2.3 MetricFetcher

fetcher:是抓取的意思,可以了解為取資料

我翻譯了​

​MetricFetcher​

​這個類的注釋,内容如下:

package org.apache.flink.runtime.rest.handler.legacy.metrics;


/**
 * MetricFetcher可用于從JobManager和所有注冊的taskmanager中擷取名額。
 * <p>
 * 隻有在調用{@link MetricFetcher#update()}時名額才會被擷取,前提是自上次調用傳遞之後有足夠的時間。
 *
 * @author : YangLinWei
 * @createTime: 2022/4/20 10:30 上午
 * @version: 1.0.0
 */
public interface MetricFetcher {

    /**
     * 擷取{@link MetricStore},其中包含目前擷取的所有名額。
     *
     * @return {@link MetricStore} 包含的所有擷取的名額
     */
    MetricStore getMetricStore();

    /**
     * 觸發擷取名額
     */
    void update();

    /**
     * @return 最近一次更新的時間戳。
     */
    long getLastUpdateTime();
}      

繼續Ctrl+T檢視其實作:

Flink 名額參數源碼解讀(讀取數量、發送數量、發送位元組數、接收位元組數等)

可以看到有幾個實作類,毋庸置疑,​

​MetricFetcherImpl​

​是它真正的實作類,看看裡面的代碼。

2.3.1 MetricFetcherImpl

​MetricFetcherImpl​

​裡面有幾個方法,如下:

Flink 名額參數源碼解讀(讀取數量、發送數量、發送位元組數、接收位元組數等)

我們需要知道這些名額從何而來?裡面的代碼不多,大部分都不是我們需要的,經一番閱讀,可以知道,名額是從​

​queryMetrics​

​這個方法裡擷取。看看這個方法的代碼:

/**
     * Query the metrics from the given QueryServiceGateway.
     *
     * @param queryServiceGateway to query for metrics
     */
    private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) {
        LOG.debug("Query metrics for {}.", queryServiceGateway.getAddress());

        queryServiceGateway
                .queryMetrics(timeout)
                .whenCompleteAsync(
                        (MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> {
                            if (t != null) {
                                LOG.debug("Fetching metrics failed.", t);
                            } else {
                                metrics.addAll(deserializer.deserialize(result));
                            }
                        },
                        executor);
    }      

是以,代碼追蹤了這麼久,發現名額是從網關(​

​MetricQueryServiceGateway​

​​)裡調接口去擷取的,是以我們需要看源碼這個網關接口(​

​queryMetrics​

​)的代碼實作。

2.4 MetricQueryServiceGateway

從上一步,可以知道調用了​

​MetricQueryServiceGateway​

​​的​

​queryMetrics​

​​接口,具體的實作​

​MetricQueryService​

​​類的​

​queryMetrics ​

​方法,代碼如下:

@Override
public CompletableFuture<MetricDumpSerialization.MetricSerializationResult> queryMetrics(
        Time timeout) {
    return callAsync(
            () -> enforceSizeLimit(serializer.serialize(counters, gauges, histograms, meters)),
            timeout);
}      

再看看​

​callAsync​

​方法:

Flink 名額參數源碼解讀(讀取數量、發送數量、發送位元組數、接收位元組數等)

可以得知,本質就是使用了​

​rpcServer​

​去遠端調用了接口擷取名額了(具體調用了哪裡呢?)。

我們看看​

​RpcEndpoint​

​這個類。

2.5 RpcEndpoint

我們看看​

​RpcEndpoint​

​這個類的方法結構:

Flink 名額參數源碼解讀(讀取數量、發送數量、發送位元組數、接收位元組數等)

從這些方法名,我們可以知道,它類似于一個​

​HTTP​

​伺服器,進而我們也可以知道,原來​

​Flink​

​的​

​Web​

​頁面通路的伺服器就是這個了。在看看其構造方法:

Flink 名額參數源碼解讀(讀取數量、發送數量、發送位元組數、接收位元組數等)

看看裡面是怎麼開啟服務的:

Flink 名額參數源碼解讀(讀取數量、發送數量、發送位元組數、接收位元組數等)

可以知道,是調用了​

​AkkaRpcService​

​的​

​startServer​

​方法去開啟了服務。

好了,這裡暫時該停止了,因為偏離了本文的中心,我們需要知道的是這些名額具體從哪裡來的?那該如何進行下一步呢?

我們再回到​

​2.4​

​​裡面的​

​MetricQueryService​

​類,看看這個類是如何構造的?(這裡前後連貫性很強)。

2.6 MetricQueryService

可以看到​

​MetricQueryService​

​這個類裡面有一個​

​createMetricQueryService​

​方法,這個方法指的就是建立名額查詢服務:

Flink 名額參數源碼解讀(讀取數量、發送數量、發送位元組數、接收位元組數等)

看看在哪裡調用了這個方法:

Flink 名額參數源碼解讀(讀取數量、發送數量、發送位元組數、接收位元組數等)

可以在名額服務注冊中心(​

​MetricRegistryImpl​

​)裡面的​

​startQueryService​

​方法調用了,再看看哪裡調用了​

​startQueryService​

​這個方法:

Flink 名額參數源碼解讀(讀取數量、發送數量、發送位元組數、接收位元組數等)

可以看到有3個地方開啟了這個名額的服務,分别是:

  • ClusterEntrypoint:​

    ​Flink​

    ​叢集入口點的基類
  • MiniCluster:​

    ​MiniCluster​

    ​​在本地執行​

    ​Flink​

    ​任務
  • TaskManagerRunner:在​

    ​yarn​

    ​​或​

    ​standalone​

    ​​模式下,這個類是任務管理器的可執行入口點。它建構相關元件(網絡、​

    ​I/O​

    ​​管理器、記憶體管理器、​

    ​RPC​

    ​​服務、​

    ​HA​

    ​服務)并啟動。

為了友善了解,這裡解讀本地執行​

​Flink​

​​任務的模式就好了,即繼續研讀​

​MiniCluster​

​。

2.7 MiniCluster

在​

​MiniCluster​

​的​

​start()​

​方法,可以看到了調用了​

​startQueryService​

​方法

Flink 名額參數源碼解讀(讀取數量、發送數量、發送位元組數、接收位元組數等)

繼續看看裡面的​

​metricQueryServiceRpcService​

​入參,可以知道,​

​metricQueryServiceRpcService​

​(名額查詢服務)是從配置裡初始化來的。

Flink 名額參數源碼解讀(讀取數量、發送數量、發送位元組數、接收位元組數等)

繼續看看​

​configuration​

​配置:

Flink 名額參數源碼解讀(讀取數量、發送數量、發送位元組數、接收位元組數等)

可以得知,配置是從​

​miniClusterConfiguration​

​裡擷取的,繼續深入:

Flink 名額參數源碼解讀(讀取數量、發送數量、發送位元組數、接收位元組數等)

發現,配置是從構造函數裡擷取的,繼續看看哪裡調用了​

​MiniCluster​

​這個類的構造函數方法:

Flink 名額參數源碼解讀(讀取數量、發送數量、發送位元組數、接收位元組數等)

調用這個方法的類有很多,根據命名,可以得知較為合理的是​

​LocalExecutor​

​這個類。

2.8 LocalExecutor

我對LocalExecutor的了解:一個用于執行本地​

​Pipelines​

​​(例如:多條​

​FlinkSQL​

​)的執行器。

看看哪裡調用了​

​MiniCluster​

​的構造方法:

Flink 名額參數源碼解讀(讀取數量、發送數量、發送位元組數、接收位元組數等)

繼續看看哪裡調用了​

​create​

​方法,可以得知在​

​LocalExecutorFactory​

​裡的getExecutor方法調用了:

Flink 名額參數源碼解讀(讀取數量、發送數量、發送位元組數、接收位元組數等)

​Ctrl+T​

​,可以看到在​

​ExecutionEnviroment​

​和​

​StreamExecutionEnviroment​

​裡調用了:

Flink 名額參數源碼解讀(讀取數量、發送數量、發送位元組數、接收位元組數等)

哦豁,這不是我們日常做​

​Flink​

​​開發常用的兩個類了麼。随便打開​

​StreamExecutionEnviroment​

​這個類看看。

2.8 StreamExecutionEnviroment

可以看到,在裡面的​

​executeAsync​

​方法代用了:

Flink 名額參數源碼解讀(讀取數量、發送數量、發送位元組數、接收位元組數等)

到這裡,我們知道了配置是從使用者初始化​

​StreamExecutionEnviroment​

​傳入的。

03 小結

具體名額的參數從哪裡擷取,我們有了一個很好的分析思路了,我們可以自己編寫一個Flink的程式,使用的是​

​StreamExecutionEnviroment​

​,然後斷點本文的源碼,就知道來龍去脈了。