文章目錄
- 01 引言
- 02 源碼分析
- 2.1 源碼入口
- 2.2 IOMetricsInfo
- 2.3 MutableIOMetrics
- 2.3 MetricFetcher
- 2.3.1 MetricFetcherImpl
- 2.4 MetricQueryServiceGateway
- 2.5 RpcEndpoint
- 2.6 MetricQueryService
- 2.7 MiniCluster
- 2.8 LocalExecutor
- 2.8 StreamExecutionEnviroment
- 03 小結
01 引言
附:Flink源碼下載下傳位址
在Flink的Web頁面,細心的話可以看到監控頁面裡,有任務的詳情,其中裡面有詳細的監控名額,如下圖(發送記錄數、接收記錄數、發送位元組數,接收位元組數等):
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiI0gTMx81dsQWZ4lmZf1GLlpXazVmcvwFciV2dsQXYtJ3bm9CX9s2RkBnVHFmb1clWvB3MaVnRtp1XlBXe0xCMy81dvRWYoNHLwEzX5xCMx8FesU2cfdGLwMzX0xiRGZkRGZ0Xy9GbvNGLpZTY1EmMZVDUSFTU4VFRR9Fd4VGdsYTMfVmepNHLrJXYtJXZ0F2dvwVZnFWbp1zczV2YvJHctM3cv1Ce-cmbw5SO1MTN3UTN2UDOlhDN4EjNyYzX0MTMyADMyIzLcRDMyIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjLyM3Lc9CX6MHc0RHaiojIsJye.png)
很多時候,我們都需要 “取出這些資料”,并用在我們的需求上,那麼該如何取出這些資料呢?本文來分析下源碼。
02 源碼分析
2.1 源碼入口
在
Flink
的
web
頁面,按
F12
檢視源碼,可以看到:
- 接口位址:http://域名/jobs/ad75bbaaa624e41a249825a9820a65cc
Flink 名額參數源碼解讀(讀取數量、發送數量、發送位元組數、接收位元組數等) - 響應内容:
{
"jid": "ad75bbaaa624e41a249825a9820a65cc",
"name": "insert-into_default_catalog.default_database.t_student_copy",
"isStoppable": false,
"state": "RUNNING",
"start-time": 1650352652357,
"end-time": -1,
"duration": 64629227,
"maxParallelism": -1,
"now": 1650417281584,
"timestamps": {
"INITIALIZING": 1650352652357,
"FAILED": 0,
"CREATED": 1650352652449,
"RESTARTING": 0,
"FAILING": 0,
"FINISHED": 0,
"SUSPENDED": 0,
"RECONCILING": 0,
"CANCELLING": 0,
"CANCELED": 0,
"RUNNING": 1650352653087
},
"vertices": [
{
"id": "cbc357ccb763df2852fee8c4fc7d55f2",
"name": "Source: TableSourceScan(table=[[default_catalog, default_database, t_student]], fields=[id, name]) -> NotNullEnforcer(fields=[id]) -> Sink: Sink(table=[default_catalog.default_database.t_student_copy], fields=[id, name])",
"maxParallelism": 128,
"parallelism": 1,
"status": "RUNNING",
"start-time": 1650352658363,
"end-time": -1,
"duration": 64623221,
"tasks": {
"CREATED": 0,
"CANCELING": 0,
"INITIALIZING": 0,
"RECONCILING": 0,
"CANCELED": 0,
"RUNNING": 1,
"DEPLOYING": 0,
"FINISHED": 0,
"FAILED": 0,
"SCHEDULED": 0
},
"metrics": {
"read-bytes": 0,
"read-bytes-complete": true,
"write-bytes": 0,
"write-bytes-complete": true,
"read-records": 0,
"read-records-complete": true,
"write-records": 0,
"write-records-complete": true
}
}
],
"status-counts": {
"CREATED": 0,
"CANCELING": 0,
"INITIALIZING": 0,
"RECONCILING": 0,
"CANCELED": 0,
"RUNNING": 1,
"DEPLOYING": 0,
"FINISHED": 0,
"FAILED": 0,
"SCHEDULED": 0
},
"plan": {
"jid": "ad75bbaaa624e41a249825a9820a65cc",
"name": "insert-into_default_catalog.default_database.t_student_copy",
"nodes": [
{
"id": "cbc357ccb763df2852fee8c4fc7d55f2",
"parallelism": 1,
"operator": "",
"operator_strategy": "",
"description": "Source: TableSourceScan(table=[[default_catalog, default_database, t_student]], fields=[id, name]) -> NotNullEnforcer(fields=[id]) -> Sink: Sink(table=[default_catalog.default_database.t_student_copy], fields=[id, name])",
"optimizer_properties": {
}
}
]
}
}
可以看到,
vertices.[0].metrics
下的内容就是本文要讀取的内容,如下圖:
Ctrl+H
全局搜尋
Flink
源碼,我們可能會想到先檢視接口 “
/jobs/{jobId}
”,其實這樣效率很低,最好的方法就是使用其 “特殊性”,比如,我們可以從傳回的字段
read-bytes
入手,發現定義的地方在
org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo
這個類:
好了,我們可以把
IOMetricsInfo
這個類當做我們的源碼分析入口。
2.2 IOMetricsInfo
在
IOMetricsInfo
,Ctrl+G檢視,可以看到這個類有多個地方被調用,其實真正的是被
JobDetailsHandler
調用了,其它的類字尾都是
Test
測試類,是以不作為分析的下一步,下面看看
JobDetailsHandler
。
在
JobDetailsHandler
,可以看到名額值是總
counts
裡擷取的,繼續看
counts
counts
在這裡指派了:
接下來,我們看看
MutableIOMetrics
這個類。
2.3 MutableIOMetrics
進入上一步指定的
MutableIOMetrics
裡的
addIOMetrics
方法,可以看到代碼根據程式的運作狀态,從不同的地方擷取名額值了:
- 終止狀态:從
擷取了名額值AccessExecution
- 運作狀态:從
擷取了名額值MetricFetcher
Flink 名額參數源碼解讀(讀取數量、發送數量、發送位元組數、接收位元組數等)
因為我們的程式是運作的,當然,我們需要研究
MetricFetcher
這個類裡面的值是怎麼拿到的。
2.3 MetricFetcher
fetcher:是抓取的意思,可以了解為取資料
我翻譯了
MetricFetcher
這個類的注釋,内容如下:
package org.apache.flink.runtime.rest.handler.legacy.metrics;
/**
* MetricFetcher可用于從JobManager和所有注冊的taskmanager中擷取名額。
* <p>
* 隻有在調用{@link MetricFetcher#update()}時名額才會被擷取,前提是自上次調用傳遞之後有足夠的時間。
*
* @author : YangLinWei
* @createTime: 2022/4/20 10:30 上午
* @version: 1.0.0
*/
public interface MetricFetcher {
/**
* 擷取{@link MetricStore},其中包含目前擷取的所有名額。
*
* @return {@link MetricStore} 包含的所有擷取的名額
*/
MetricStore getMetricStore();
/**
* 觸發擷取名額
*/
void update();
/**
* @return 最近一次更新的時間戳。
*/
long getLastUpdateTime();
}
繼續Ctrl+T檢視其實作:
可以看到有幾個實作類,毋庸置疑,
MetricFetcherImpl
是它真正的實作類,看看裡面的代碼。
2.3.1 MetricFetcherImpl
MetricFetcherImpl
裡面有幾個方法,如下:
我們需要知道這些名額從何而來?裡面的代碼不多,大部分都不是我們需要的,經一番閱讀,可以知道,名額是從
queryMetrics
這個方法裡擷取。看看這個方法的代碼:
/**
* Query the metrics from the given QueryServiceGateway.
*
* @param queryServiceGateway to query for metrics
*/
private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) {
LOG.debug("Query metrics for {}.", queryServiceGateway.getAddress());
queryServiceGateway
.queryMetrics(timeout)
.whenCompleteAsync(
(MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> {
if (t != null) {
LOG.debug("Fetching metrics failed.", t);
} else {
metrics.addAll(deserializer.deserialize(result));
}
},
executor);
}
是以,代碼追蹤了這麼久,發現名額是從網關(
MetricQueryServiceGateway
)裡調接口去擷取的,是以我們需要看源碼這個網關接口(
queryMetrics
)的代碼實作。
2.4 MetricQueryServiceGateway
從上一步,可以知道調用了
MetricQueryServiceGateway
的
queryMetrics
接口,具體的實作
MetricQueryService
類的
queryMetrics
方法,代碼如下:
@Override
public CompletableFuture<MetricDumpSerialization.MetricSerializationResult> queryMetrics(
Time timeout) {
return callAsync(
() -> enforceSizeLimit(serializer.serialize(counters, gauges, histograms, meters)),
timeout);
}
再看看
callAsync
方法:
可以得知,本質就是使用了
rpcServer
去遠端調用了接口擷取名額了(具體調用了哪裡呢?)。
我們看看
RpcEndpoint
這個類。
2.5 RpcEndpoint
我們看看
RpcEndpoint
這個類的方法結構:
從這些方法名,我們可以知道,它類似于一個
HTTP
伺服器,進而我們也可以知道,原來
Flink
的
Web
頁面通路的伺服器就是這個了。在看看其構造方法:
看看裡面是怎麼開啟服務的:
可以知道,是調用了
AkkaRpcService
的
startServer
方法去開啟了服務。
好了,這裡暫時該停止了,因為偏離了本文的中心,我們需要知道的是這些名額具體從哪裡來的?那該如何進行下一步呢?
我們再回到
2.4
裡面的
MetricQueryService
類,看看這個類是如何構造的?(這裡前後連貫性很強)。
2.6 MetricQueryService
可以看到
MetricQueryService
這個類裡面有一個
createMetricQueryService
方法,這個方法指的就是建立名額查詢服務:
看看在哪裡調用了這個方法:
可以在名額服務注冊中心(
MetricRegistryImpl
)裡面的
startQueryService
方法調用了,再看看哪裡調用了
startQueryService
這個方法:
可以看到有3個地方開啟了這個名額的服務,分别是:
- ClusterEntrypoint:
叢集入口點的基類Flink
- MiniCluster:
在本地執行MiniCluster
任務Flink
- TaskManagerRunner:在
或yarn
模式下,這個類是任務管理器的可執行入口點。它建構相關元件(網絡、standalone
管理器、記憶體管理器、I/O
服務、RPC
服務)并啟動。HA
為了友善了解,這裡解讀本地執行
Flink
任務的模式就好了,即繼續研讀
MiniCluster
。
2.7 MiniCluster
在
MiniCluster
的
start()
方法,可以看到了調用了
startQueryService
方法
繼續看看裡面的
metricQueryServiceRpcService
入參,可以知道,
metricQueryServiceRpcService
(名額查詢服務)是從配置裡初始化來的。
繼續看看
configuration
配置:
可以得知,配置是從
miniClusterConfiguration
裡擷取的,繼續深入:
發現,配置是從構造函數裡擷取的,繼續看看哪裡調用了
MiniCluster
這個類的構造函數方法:
調用這個方法的類有很多,根據命名,可以得知較為合理的是
LocalExecutor
這個類。
2.8 LocalExecutor
我對LocalExecutor的了解:一個用于執行本地
Pipelines
(例如:多條
FlinkSQL
)的執行器。
看看哪裡調用了
MiniCluster
的構造方法:
繼續看看哪裡調用了
create
方法,可以得知在
LocalExecutorFactory
裡的getExecutor方法調用了:
Ctrl+T
,可以看到在
ExecutionEnviroment
和
StreamExecutionEnviroment
裡調用了:
哦豁,這不是我們日常做
Flink
開發常用的兩個類了麼。随便打開
StreamExecutionEnviroment
這個類看看。
2.8 StreamExecutionEnviroment
可以看到,在裡面的
executeAsync
方法代用了:
到這裡,我們知道了配置是從使用者初始化
StreamExecutionEnviroment
傳入的。
03 小結
具體名額的參數從哪裡擷取,我們有了一個很好的分析思路了,我們可以自己編寫一個Flink的程式,使用的是
StreamExecutionEnviroment
,然後斷點本文的源碼,就知道來龍去脈了。