開發者學堂課程【資料湖 JindoFS + OSS 實操幹貨36講:Flink 高效 sink 寫入 oss】學習筆記,與課程緊密聯系,讓使用者快速學習知識。
課程位址:
https://developer.aliyun.com/learning/course/833/detail/13967Flink 高效 sink 寫入 oss
内容介紹
一.背景介紹
二.功能介紹
三.如何配置
四.如何使用
一、背景介紹
Apache Flink 簡介
Apache Flink 是新一代大資料計算引擎的代表,以分布式流計算為核心,同時支援批處理。
特點:
低延時: Flink 流式計算可以做到亞秒甚至毫秒級延時,相比之下Spa流計算很難達到毫秒級
高吞吐: Fink 以分布式快照算法實作容錯對吞吐量的影響很小
高容錯:基于分布式快照算法,Fink實作了低代價、高效的容錯表現,以及 Exactly Once 語義保證。
JindoFS Flink Connector 産生背景
阿裡雲對象存儲 Object Storage Service(oss):
海量:無限容量,彈性伸縮
安全:12個9的資料安全性,多種加密方式
低成本:遠低于雲磁盤,且有多種存儲方式、生命周期管理等節約成本
高可靠:服務可用性99.9
已服務于海量使用者
二、功能介紹
Flink 應用廣泛:
流計算領域業内主要解決方案
Apache 基金會最活躍項目之一
未來:流批一體、線上分析
Flink 使用痛點:
開源 Apache Flink 尚不支援直接寫入OSS
Hadoop OSS SDK 寫入性能不一定滿足需求
JindoFS Flink Connector 介紹
整體架構:
1.兩階段 Checkpoint (檢查點)機制:
第一階段 MPU (MultiPartUpload,分片上傳)寫入 OSS
第二階段 MPU 送出
2.Recoverable Writer 可恢複性寫入:
臨時檔案以普通檔案格式上傳 OSS
Sink節點狀态快照
寫入 OSS vS. 寫入亞馬遜 S3:
Native 實作:資料寫入以C++代碼實作,相比 java 更高效
高速讀寫:多線程讀寫臨時檔案,對大于1的檔案優勢尤其明顯
資料緩存:讀寫 OSS 實作本地緩存,加速外部通路
OSs 通路加速, JindoFs 提供新支援

三、如何配置JindoFS Flink Connector
1.環境要求
叢集上有開源版本 Flink軟體,版本不低于1.10.1
2.SDK 配置:
下載下傳所需 SDK 檔案:
jindo-flink-sink-S{(version)} jar
jindofs-sdk-S{version} jar
下載下傳連結( Github ):
@https://github.com/aliyun/alibabacloud-jin/blob/master/docs/ jindofs sdk_ download.md
将兩個jar放置于叢集 Flink 目錄下 lib 檔案夾
-Flink 根目錄通常可由 SFLINKHOME 環境變量擷取
-叢集所有節點均需配置
Java SPI:自動加載資源,無需額外配置
文檔連結(Github):
https://github.com/aliyun/alibabacloud-jing/blob/master/docs/flink/jindofs_sdkon_flink_for_oss.md
四、在程式中使用 JindoFS Flink Connector
確定叢集能夠通路 OSS Bucket
1. 前提:已購買 Oss 産品,Oss 網站連結 . OSS OSS :
https://www.aliyun.com/product/oss2.確定能夠通路 OSS Bucket,例如正确配置密鑰或免密服務等
使用合适的路徑,流式寫入 OSS Bucket
寫入 ss 須使用 os 字首路徑,類似于
ossuser-bucket->user--defined--sink-dir>
更多優化!用 JindoFS SDK加速OSS通路,參考:
https://github. com/aliyun/alibabacloud-jir/blob/master/docs/jindofs_sdk_vs_hadoop_sdk.md
在程式中使用 JindoFS Flink Connector:Java
在程式中開啟 Flink Checkpoint
前提:使用可重發的資料源,如 Kafka
通過 StreamExecutionEnvironment 對象打開 Checkpoint (示例)
建立:
StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEr
打開:
envenableCheckpointing(
示例程式
下文中, outputStream 是一個預先形成的 DataStream 對象,若需寫入 ss,則可以這樣添加sink:String outputPath ="oss:///":
StreamingFileSinksink= StreamingF. forRowformat
new Path(outputPath)
new SimpleStringEncoder("UTF-8)
)buildo:
outputStream.addSink(sink)
上述程式指定将 outputStream 中的 String 内容寫入 ss 路徑 oss //user-bucket-/user--defined--sink-dir>
最後還需用 envexecute 語句執行 Flin 作業,env 是已建立的 StreamExecutionEnviro 對象
最後,将 ava 作業打包為 jar 檔案,并用 flink run在叢集送出即可。
在程式中使用 JindoFS Flink Connector:更多配置
使用者通過 flink run 送出 java 或 pyflink 程式時可以額外自定義一些參數,格式
flink run-m yarn-cluster-yD key1= valuel-yD key2=value2
目前支援“熵注入”及“分片上傳并行度”兩項配置
熵注入 (entropy injection):
1.功能:将寫入路徑的一段特定字元串比對出來,用一段随機的字元串進行替換
2.效果:削弱所謂片區 (sharding) 效應,提高寫入效率
3.配置參數:
oss.entropy.key=
oss.entropy length=
分片上傳并行度
配置參數: oss upload.max. concurrentuploads
預設值:目前可用的處理器數量