天天看點

Flink 高效 sink 寫入 oss | 學習筆記

開發者學堂課程【資料湖 JindoFS + OSS 實操幹貨36講:Flink 高效 sink 寫入 oss】學習筆記,與課程緊密聯系,讓使用者快速學習知識。

課程位址:

https://developer.aliyun.com/learning/course/833/detail/13967

Flink 高效 sink 寫入 oss

内容介紹

一.背景介紹

二.功能介紹

三.如何配置

四.如何使用

一、背景介紹

Apache Flink 簡介

Apache Flink 是新一代大資料計算引擎的代表,以分布式流計算為核心,同時支援批處理。

特點:

低延時: Flink 流式計算可以做到亞秒甚至毫秒級延時,相比之下Spa流計算很難達到毫秒級

高吞吐: Fink 以分布式快照算法實作容錯對吞吐量的影響很小

高容錯:基于分布式快照算法,Fink實作了低代價、高效的容錯表現,以及 Exactly Once 語義保證。

JindoFS Flink Connector 産生背景

阿裡雲對象存儲 Object Storage Service(oss):

海量:無限容量,彈性伸縮

安全:12個9的資料安全性,多種加密方式

低成本:遠低于雲磁盤,且有多種存儲方式、生命周期管理等節約成本

高可靠:服務可用性99.9

已服務于海量使用者

二、功能介紹

Flink 應用廣泛:

流計算領域業内主要解決方案

Apache 基金會最活躍項目之一

未來:流批一體、線上分析

Flink 使用痛點:

開源 Apache Flink 尚不支援直接寫入OSS

Hadoop OSS SDK 寫入性能不一定滿足需求

JindoFS Flink Connector 介紹

整體架構:

1.兩階段 Checkpoint (檢查點)機制:

第一階段 MPU (MultiPartUpload,分片上傳)寫入 OSS

第二階段 MPU 送出

2.Recoverable Writer 可恢複性寫入:

臨時檔案以普通檔案格式上傳 OSS

Sink節點狀态快照

寫入 OSS vS. 寫入亞馬遜 S3:

Native 實作:資料寫入以C++代碼實作,相比 java 更高效

高速讀寫:多線程讀寫臨時檔案,對大于1的檔案優勢尤其明顯

資料緩存:讀寫 OSS 實作本地緩存,加速外部通路

OSs 通路加速, JindoFs 提供新支援

Flink 高效 sink 寫入 oss | 學習筆記

三、如何配置JindoFS Flink Connector

1.環境要求

叢集上有開源版本 Flink軟體,版本不低于1.10.1

2.SDK 配置:

下載下傳所需 SDK 檔案:

jindo-flink-sink-S{(version)} jar

jindofs-sdk-S{version} jar

下載下傳連結( Github ):

@https://github.com/aliyun/alibabacloud-jin/blob/master/docs/ jindofs sdk_ download.md

将兩個jar放置于叢集 Flink 目錄下 lib 檔案夾

-Flink 根目錄通常可由 SFLINKHOME 環境變量擷取

-叢集所有節點均需配置

Java SPI:自動加載資源,無需額外配置

文檔連結(Github):

https://github.com/aliyun/alibabacloud-jing/blob/master/docs/flink/jindofs_sdk

on_flink_for_oss.md

四、在程式中使用 JindoFS Flink Connector

確定叢集能夠通路 OSS Bucket

1. 前提:已購買 Oss 産品,Oss 網站連結 . OSS OSS :

https://www.aliyun.com/product/oss

2.確定能夠通路 OSS Bucket,例如正确配置密鑰或免密服務等

使用合适的路徑,流式寫入 OSS Bucket

寫入 ss 須使用 os 字首路徑,類似于

ossuser-bucket->user--defined--sink-dir>

更多優化!用 JindoFS SDK加速OSS通路,參考:

https://github. com/aliyun/alibabacloud-jir/blob/master/docs

/jindofs_sdk_vs_hadoop_sdk.md

在程式中使用 JindoFS Flink Connector:Java

在程式中開啟 Flink Checkpoint

前提:使用可重發的資料源,如 Kafka

通過 StreamExecutionEnvironment 對象打開 Checkpoint (示例)

建立:

StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEr

打開:

envenableCheckpointing(

示例程式

下文中, outputStream 是一個預先形成的 DataStream 對象,若需寫入 ss,則可以這樣添加sink:String outputPath ="oss:///":

StreamingFileSinksink= StreamingF. forRowformat

new Path(outputPath)

new SimpleStringEncoder("UTF-8)

)buildo:

outputStream.addSink(sink)

上述程式指定将 outputStream 中的 String 内容寫入 ss 路徑 oss //user-bucket-/user--defined--sink-dir>

最後還需用 envexecute 語句執行 Flin 作業,env 是已建立的 StreamExecutionEnviro 對象

最後,将 ava 作業打包為 jar 檔案,并用 flink run在叢集送出即可。 

在程式中使用 JindoFS Flink Connector:更多配置

使用者通過 flink run 送出 java 或 pyflink 程式時可以額外自定義一些參數,格式

flink run-m yarn-cluster-yD key1= valuel-yD key2=value2

目前支援“熵注入”及“分片上傳并行度”兩項配置

熵注入 (entropy injection):

1.功能:将寫入路徑的一段特定字元串比對出來,用一段随機的字元串進行替換

2.效果:削弱所謂片區 (sharding) 效應,提高寫入效率

3.配置參數:

oss.entropy.key=

oss.entropy length=

分片上傳并行度

配置參數: oss upload.max. concurrentuploads

預設值:目前可用的處理器數量