作者 | 張鵬(七器),阿裡巴巴開發工程師
本篇内容将向大家介紹如何将實時計算 Flink 與其他系統打通。介紹内容包含四個部分,分别是:
1、Jar的存儲與使用;
2、實時計算 Flink 如何與一些典型資料源進行互動;
3、如何将VVP平台上 Flink的名額打入Metrics外部系統;
4、如何将VVP平台上運作的 Flink作業日志打入到外部系統。
一、運作作業的Jar如何存儲在OSS上
在VVP平台有兩種方法可以上傳作業的jar。
方法一,借助VVP提供的資源上傳功能,可以直接使用這個功能對Jar進行上傳目前該功能支援200兆以内的Jar包上傳。使用時,直接在建立作業的時候選擇上傳的jar包就可以了,示範如下:
● 進入到VVP平台,點選左側資源上傳功能,然後在打開頁面點選右上角的上傳資源,選擇要上傳的Jar包,完成上傳;

● 上傳成功後,點選左側建立作業,完善作業名等資訊。在Jar URI欄,下拉選擇剛剛上傳的Jar包,點選确定完成建立作業,然後啟動即可使用。
方法二,直接在OSS的控制台上面,将要使用的Jar上傳上去,然後使用OSS是提供的Jar連結來行使用。使用的時候也比較簡單,直接使用OSS提供的Jar連結,示範如下:
● 打開OSS控制台,選擇在建立VVP時候使用的Bucket,再選擇目錄,點選上傳檔案,上傳時可以将它的權限設定為公共讀,點選上傳檔案即完成;
● 使用時,OSS控制台上點選已上傳包右側的“詳情”,擷取該Jar包的URL連結。
● 建立作業時,将jar包的URL的連結填入Jar URI,如下圖所示:
需要注意,OSS詳情頁面提供的連結是公網通路的,開通的VVP并不能直接通路公網,是以在建立作業使用HTTPS的時候,需要使用VPC通路的endpoint(例如:
https://vvp-training.oss-cn-shanghai-internal.aliyuncs.com/artifacts/namespaces/vvp-training/WordCount.jar),這樣才能正常的啟動作業。
如果想用公網擷取一個HTTPS的連結,怎麼操作呢?可以首先對VVP進行公網打通,打通的操作流程可以參考阿裡雲幫助文檔中的《Flink 全托管叢集如何通路公網》(
https://help.aliyun.com/document_detail/174840.html),簡單來說步驟如下:
● 首先,建立一個NAT網關。建立時選擇“組合購買ERP”,然後選擇區域并補充名稱等資訊,然後綁定彈性公網IP,完成建立;
● 其次,建立SNAT條目。建立好NAT之後,點選“建立SNAT條目”,在彈窗選擇交換機并補充名稱資訊,完成建立。
完成上述兩個步驟,該VVP執行個體就已經打通公網,在建立Deployment時就可以直接使用https公網可通路的jar包了。
二、在VVP平台上 Flink 如何與典型資料源進行互動
這部介紹如何通過SQL以及connectors與外部的一些資料存儲系統進行互動,以SLS,Kafka作為資料源讀寫資料為例。
(實操示範)點選SQL編輯器,建立一個Datagen Table,它是用于資料的随機生成的,然後點選運作。然後再點選生成一個SLS Table,補充所需參數資訊,然後點選建立完成。
建立完成後,寫入SQL語句,比如insert into sls select id, name from datagen,然後另存後點選運作,建立Deployment并啟動。
當作業成功運作後,在SLS上查詢資料。如下圖所示,說明datagen已經生成資料并成功寫入SLS。
類似的,我們可以按照上面的步驟從SLS讀資料然後寫入Kafka:
● 在vvp的sql編輯器頁面建立一個Kafka table
● 用SQL文法從SLS讀取資料寫入Kafka中并啟動
● 作業運作成功後,即開始從SLS讀資料寫入Kafka中
三、如何将VVP平台上 Flink的名額打入外部Metrics系統
接下介紹如果想把運作作業的名額放入到一些系統當中去,并進行名額觀測。VVP提供了兩種方法:
方法一,VVP預設的将 Flink 作業名額打入到arms,不需要額外的處理,直接運作作業之後,就能通過名額按鈕看到,如下圖所示:
方法二,如果自己有名額系統,想把 Flink 的作業名額打入到自己的系統裡,主要有兩點:首先保證VVP上作業與自己名額系統網絡的連通性;其次在 Flink conf 中配置好相應的metrics reporter。如下圖所示,在建立作業過程中,進行metric配置(metrics reporters配置參考:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/metrics.html):
例:使用premetheus的pushGateway方式,是以reporter class就選擇org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter。按上圖所示配置pushGateway的port和host,Metric reporter就配置完成了。作業啟動成功後在配置好的grafana大盤上檢視名額,如下例所示。
四、如何将Flink作業日志打入到外部系統
如果在作業運作中,突然運作失敗,我們想要檢視運作失敗作業的日志,就需要把 Flink 作業的日志儲存下來。在VVP平台為這個目的提供了兩種方案,将Logs寫入OSS中或SLS中,簡單來說,在建立作業的時候, 在Log配置項裡面配置一些Log參數。
配置參考文檔:
https://help.aliyun.com/document_detail/173646.html方法一,将日志寫入OSS中。在建立作業的時候,在進階配置中的Log配置裡,選擇使用使用者自定義,然後将(幫助文檔)裡面的配置放在自定義的配置中去,再将一些參數換成OSS的必要參數就可以了。
需要檢視日志時,可以通過幫助文檔的指導,找到日志存放的檔案,然後點選下載下傳檢視。
方法二,将日志寫入SLS中。與方法一類似,隻是LOG配置項稍有差異;下載下傳和檢視方法與方法一一緻。