本期導讀 :【OSS 通路加速】第七講
主題:Flink 高效 sink 寫入 OSS
講師:重湖,阿裡巴巴計算平台事業部 EMR 進階工程師
内容架構:
- 背景介紹
- 功能介紹
- 如何配置
- 如何使用
直播回放連結:(7/8講)
https://developer.aliyun.com/live/246851一、背景介紹
Apache Flink 簡介
Apache Flink 是新一代大資料計算引擎的代表,以分布式流計算為核心,同時支援批處理。特點:
- 低延時:Flink 流式計算可以做到亞秒甚至毫秒級延時,相比之下 Spark 流計算很難達到秒級
- 高吞吐:Flink 以分布式快照算法實作容錯,對吞吐量的影響很小
- 高容錯:基于分布式快照算法,Flink 實作了低代價、高效的容錯表現,以及 Exactly_Once 語義保證

JindoFS Flink Connector 産生背景
阿裡雲對象存儲 Object Storage Service(OSS):
- 海量:無限容量,彈性伸縮
- 安全:12個9的資料安全性,多種加密方式
- 低成本:遠低于雲磁盤,且有多種存儲方式、生命周期管理等節約成本
- 高可靠:服務可用性 99.9%
- 已服務于海量使用者
Flink 應用廣泛:
- 流計算領域業内主要解決方案
- Apache 基金會最活躍項目之一
- 未來:流批一體、線上分析
Flink 使用痛點:
- 開源 ApacheFlink 尚不支援直接寫入 OSS
- Hadoop OSS SDK 寫入性能不一定滿足需求
JindoFS Flink Connector 介紹
整體架構:
兩階段 Checkpoint (檢查點) 機制:
- 第一階段 MPU (MultiPartUpload,分片上傳) 寫入 OSS
- 第二階段 MPU 送出
Recoverable Writer 可恢複性寫入:
- 臨時檔案以普通檔案格式上傳 OSS
- Sink 節點狀态快照
寫入 OSS vs. 寫入 亞馬遜S3:
- Native 實作:資料寫入以 C++ 代碼實作,相比 Java 更高效
- 高速讀寫:多線程讀寫臨時檔案,對大于1MB的檔案優勢尤其明顯
- 資料緩存:讀寫 OSS 實作本地緩存,加速外部通路
OSS 通路加速,JindoFS 提供新支援
二、如何配置
如何配置 JindoFS Flink Connector
環境要求:
- 叢集上有開源版本 Flink 軟體,版本不低于1.10.1
SDK 配置:
下載下傳所需 SDK 檔案:
- jindo-flink-sink-${version}.jar
- jindofs-sdk-${version}.jar
- 下載下傳連結⭐ (Github): https://github.com/aliyun/alibabacloud-jindofs/blob/master/docs/jindofs_sdk_download.md
将兩個 jar 放置于叢集 Flink 目錄下 lib 檔案夾:
- Flink 根目錄通常可由 $FLINK_HOME 環境變量擷取
- 叢集所有節點均需配置
Java SPI:自動加載資源,無需額外配置
⭐文檔連結(Github):
https://github.com/aliyun/alibabacloud-jindofs/blob/master/docs/flink/jindofs_sdk_on_flink_for_oss.md在程式中使用 JindoFS Flink Connector
確定叢集能夠通路 OSS Bucket
- 前提:已購買 OSS 産品,OSS 網站連結: https://www.aliyun.com/product/oss
- 確定能夠通路 OSS Bucket,例如正确配置密鑰或免密服務等
使用合适的路徑,流式寫入OSS Bucket
- 寫入 OSS 須使用 oss:// 字首路徑,類似于:
oss://<user-bucket>/<user-defined-sink-dir>
更多優化!用 JindoFS SDK 加速 OSS 通路,參考
⭐Github:
https://github.com/aliyun/alibabacloud-jindofs/blob/master/docs/jindofs_sdk_vs_hadoop_sdk.md在程式中使用 JindoFS Flink Connector:Java
在程式中開啟 Flink Checkpoint
- 前提:使用可重發的資料源,如 Kafka
- 通過 StreamExecutionEnvironment 對象打開 Checkpoint(示例):
建立:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
打開:
env.enableCheckpointing(<userDefinedCheckpointInterval>, CheckpointingMode.EXACTLY_ONCE);
示例程式
- 下文中,outputStream 是一個預先形成的 DataStream 對象,若需寫入 OSS,則可以這樣添加 sink:
String outputPath = "oss://<user-bucket>/<user-defined-sink-dir>";
StreamingFileSink<String> sink= StreamingFileSink.forRowFormat(
new Path(outputPath),
new SimpleStringEncoder<String>("UTF-8")
).build();
outputStream.addSink(sink);
- 上述程式指定将 outputStream 中的String 内容寫入 OSS 路徑 oss:///,最後還需用 env.execute() 語句執行 Flink 作業,env 是已建立的 StreamExecutionEnvironment 對象
- 最後,将 Java 作業打包為 jar 檔案,并用 flink run 在叢集送出即可
在程式中使用 JindoFS Flink Connector:Pyflink
與Java 示例類似,在 Pyflink 中使用 JindoFS Flink Connector 與寫入 HDFS 等其他媒體方式相同,隻需:
- 将寫入路徑寫作合适的 OSS 路徑
- 注意打開 Checkpoint 功能
例如,下列 Python 程式定義了一張位于 OSS 的表:
sink_dest = "oss://<user-bucket>/<user-defined-sink-dir>"
sink_ddl = f"""
CREATE TABLE mySink (
uid INT,
pid INT
) PARTITIONED BY (
pid
) WITH (
'connector' = 'filesystem',
'fpath' = '{sink_dest}',
'format' = 'csv',
'sink.rolling-policy.file-size' = '2MB',
'sink.partition-commit.policy.kind' = 'success-file'
)
"""
然後将其添加到 StreamTableEnvironmentt_env 中即可:t_env.sql_update(sink_ddl)
在程式中使用 JindoFS Flink Connector:更多配置
使用者通過 flink run 送出 java 或 pyflink 程式時,可以額外自定義一些參數,格式:
flink run -m yarn-cluster -yD key1=value1 -yD key2=value2 ...
目前支援“熵注入”及“分片上傳并行度”兩項配置
熵注入(entropyinjection):
- 功能:将寫入路徑的一段特定字元串比對出來,用一段随機的字元串進行替換
- 效果:削弱所謂 “片區” (sharding) 效應,提高寫入效率
- 配置參數:
oss.entropy.key=<user-defined-key>
oss.entropy.length=<user-defined-length>
分片上傳并行度
- 配置參數:oss.upload.max.concurrent.uploads
- 預設值:目前可用的處理器數量
直接觀看第四課(7/8講)視訊回放,擷取執行個體講解~
⭐Github連結:
https://github.com/aliyun/alibabacloud-jindofs不錯過每次直播資訊、探讨更多資料湖 JindoFS+OSS 相關技術問題,歡迎掃碼加入釘釘交流群!