天天看點

如何将實時計算 Flink 與自身環境打通

作者 | 張鵬(七器),阿裡巴巴開發工程師

本篇内容将向大家介紹如何将實時計算 Flink 與其他系統打通。介紹内容包含四個部分,分别是:

1、Jar的存儲與使用;

2、實時計算 Flink 如何與一些典型資料源進行互動;

3、如何将VVP平台上 Flink的名額打入Metrics外部系統;

4、如何将VVP平台上運作的 Flink作業日志打入到外部系統。

一、運作作業的Jar如何存儲在OSS上

在VVP平台有兩種方法可以上傳作業的jar。

方法一,借助VVP提供的資源上傳功能,可以直接使用這個功能對Jar進行上傳目前該功能支援200兆以内的Jar包上傳。使用時,直接在建立作業的時候選擇上傳的jar包就可以了,示範如下:

● 進入到VVP平台,點選左側資源上傳功能,然後在打開頁面點選右上角的上傳資源,選擇要上傳的Jar包,完成上傳;

如何将實時計算 Flink 與自身環境打通

● 上傳成功後,點選左側建立作業,完善作業名等資訊。在Jar URI欄,下拉選擇剛剛上傳的Jar包,點選确定完成建立作業,然後啟動即可使用。

如何将實時計算 Flink 與自身環境打通

方法二,直接在OSS的控制台上面,将要使用的Jar上傳上去,然後使用OSS是提供的Jar連結來行使用。使用的時候也比較簡單,直接使用OSS提供的Jar連結,示範如下:

● 打開OSS控制台,選擇在建立VVP時候使用的Bucket,再選擇目錄,點選上傳檔案,上傳時可以将它的權限設定為公共讀,點選上傳檔案即完成;

● 使用時,OSS控制台上點選已上傳包右側的“詳情”,擷取該Jar包的URL連結。

如何将實時計算 Flink 與自身環境打通

● 建立作業時,将jar包的URL的連結填入Jar URI,如下圖所示:

如何将實時計算 Flink 與自身環境打通

需要注意,OSS詳情頁面提供的連結是公網通路的,開通的VVP并不能直接通路公網,是以在建立作業使用HTTPS的時候,需要使用VPC通路的endpoint(例如:

https://vvp-training.oss-cn-shanghai-internal.aliyuncs.com/artifacts/namespaces/vvp-training/WordCount.jar),這樣才能正常的啟動作業

如果想用公網擷取一個HTTPS的連結,怎麼操作呢?可以首先對VVP進行公網打通,打通的操作流程可以參考阿裡雲幫助文檔中的《Flink 全托管叢集如何通路公網》(

https://help.aliyun.com/document_detail/174840.html),簡單來說步驟如下

● 首先,建立一個NAT網關。建立時選擇“組合購買ERP”,然後選擇區域并補充名稱等資訊,然後綁定彈性公網IP,完成建立;

● 其次,建立SNAT條目。建立好NAT之後,點選“建立SNAT條目”,在彈窗選擇交換機并補充名稱資訊,完成建立。

如何将實時計算 Flink 與自身環境打通

完成上述兩個步驟,該VVP執行個體就已經打通公網,在建立Deployment時就可以直接使用https公網可通路的jar包了。

二、在VVP平台上 Flink 如何與典型資料源進行互動

這部介紹如何通過SQL以及connectors與外部的一些資料存儲系統進行互動,以SLS,Kafka作為資料源讀寫資料為例。

如何将實時計算 Flink 與自身環境打通

(實操示範)點選SQL編輯器,建立一個Datagen Table,它是用于資料的随機生成的,然後點選運作。然後再點選生成一個SLS Table,補充所需參數資訊,然後點選建立完成。

如何将實時計算 Flink 與自身環境打通

建立完成後,寫入SQL語句,比如insert into sls select id, name from datagen,然後另存後點選運作,建立Deployment并啟動。

如何将實時計算 Flink 與自身環境打通

當作業成功運作後,在SLS上查詢資料。如下圖所示,說明datagen已經生成資料并成功寫入SLS。

如何将實時計算 Flink 與自身環境打通

類似的,我們可以按照上面的步驟從SLS讀資料然後寫入Kafka:

● 在vvp的sql編輯器頁面建立一個Kafka table

● 用SQL文法從SLS讀取資料寫入Kafka中并啟動

● 作業運作成功後,即開始從SLS讀資料寫入Kafka中

三、如何将VVP平台上 Flink的名額打入外部Metrics系統

接下介紹如果想把運作作業的名額放入到一些系統當中去,并進行名額觀測。VVP提供了兩種方法:

方法一,VVP預設的将 Flink 作業名額打入到arms,不需要額外的處理,直接運作作業之後,就能通過名額按鈕看到,如下圖所示:

如何将實時計算 Flink 與自身環境打通

方法二,如果自己有名額系統,想把 Flink 的作業名額打入到自己的系統裡,主要有兩點:首先保證VVP上作業與自己名額系統網絡的連通性;其次在 Flink conf 中配置好相應的metrics reporter。如下圖所示,在建立作業過程中,進行metric配置(metrics reporters配置參考:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/metrics.html

):

如何将實時計算 Flink 與自身環境打通

例:使用premetheus的pushGateway方式,是以reporter class就選擇org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter。按上圖所示配置pushGateway的port和host,Metric reporter就配置完成了。作業啟動成功後在配置好的grafana大盤上檢視名額,如下例所示。

如何将實時計算 Flink 與自身環境打通

四、如何将Flink作業日志打入到外部系統

如果在作業運作中,突然運作失敗,我們想要檢視運作失敗作業的日志,就需要把 Flink 作業的日志儲存下來。在VVP平台為這個目的提供了兩種方案,将Logs寫入OSS中或SLS中,簡單來說,在建立作業的時候, 在Log配置項裡面配置一些Log參數。

如何将實時計算 Flink 與自身環境打通

配置參考文檔:

https://help.aliyun.com/document_detail/173646.html

方法一,将日志寫入OSS中。在建立作業的時候,在進階配置中的Log配置裡,選擇使用使用者自定義,然後将(幫助文檔)裡面的配置放在自定義的配置中去,再将一些參數換成OSS的必要參數就可以了。

需要檢視日志時,可以通過幫助文檔的指導,找到日志存放的檔案,然後點選下載下傳檢視。

如何将實時計算 Flink 與自身環境打通
如何将實時計算 Flink 與自身環境打通

方法二,将日志寫入SLS中。與方法一類似,隻是LOG配置項稍有差異;下載下傳和檢視方法與方法一一緻。