天天看點

資料湖實操講解【OSS 通路加速】第七講:Flink 高效 sink 寫入 OSS

本期導讀 :【OSS 通路加速】第七講

主題:Flink 高效 sink 寫入 OSS

講師:重湖,阿裡巴巴計算平台事業部 EMR 進階工程師

内容架構:

  • 背景介紹
  • 功能介紹
  • 如何配置
  • 如何使用

直播回放連結:(7/8講)

https://developer.aliyun.com/live/246851

一、背景介紹

Apache Flink 簡介

Apache Flink 是新一代大資料計算引擎的代表,以分布式流計算為核心,同時支援批處理。特點:

  • 低延時:Flink 流式計算可以做到亞秒甚至毫秒級延時,相比之下 Spark 流計算很難達到秒級
  • 高吞吐:Flink 以分布式快照算法實作容錯,對吞吐量的影響很小
  • 高容錯:基于分布式快照算法,Flink 實作了低代價、高效的容錯表現,以及 Exactly_Once 語義保證
資料湖實操講解【OSS 通路加速】第七講:Flink 高效 sink 寫入 OSS

JindoFS Flink Connector 産生背景

阿裡雲對象存儲 Object Storage Service(OSS):

  • 海量:無限容量,彈性伸縮
  • 安全:12個9的資料安全性,多種加密方式
  • 低成本:遠低于雲磁盤,且有多種存儲方式、生命周期管理等節約成本
  • 高可靠:服務可用性 99.9%
  • 已服務于海量使用者

Flink 應用廣泛:

  • 流計算領域業内主要解決方案
  • Apache 基金會最活躍項目之一
  • 未來:流批一體、線上分析

Flink 使用痛點:

  • 開源 ApacheFlink 尚不支援直接寫入 OSS
  • Hadoop OSS SDK 寫入性能不一定滿足需求

JindoFS Flink Connector 介紹

整體架構:

兩階段 Checkpoint (檢查點) 機制:

  • 第一階段 MPU (MultiPartUpload,分片上傳) 寫入 OSS
  • 第二階段 MPU 送出

Recoverable Writer 可恢複性寫入:

  • 臨時檔案以普通檔案格式上傳 OSS
  • Sink 節點狀态快照
資料湖實操講解【OSS 通路加速】第七講:Flink 高效 sink 寫入 OSS

寫入 OSS vs.  寫入 亞馬遜S3:

  • Native 實作:資料寫入以 C++ 代碼實作,相比 Java 更高效
  • 高速讀寫:多線程讀寫臨時檔案,對大于1MB的檔案優勢尤其明顯
  • 資料緩存:讀寫 OSS 實作本地緩存,加速外部通路

OSS 通路加速,JindoFS 提供新支援

資料湖實操講解【OSS 通路加速】第七講:Flink 高效 sink 寫入 OSS

二、如何配置

如何配置 JindoFS Flink Connector

環境要求:

  • 叢集上有開源版本 Flink 軟體,版本不低于1.10.1

SDK 配置:

下載下傳所需 SDK 檔案:

将兩個 jar 放置于叢集 Flink 目錄下 lib 檔案夾:

  • Flink 根目錄通常可由 $FLINK_HOME 環境變量擷取
  • 叢集所有節點均需配置

Java SPI:自動加載資源,無需額外配置

⭐文檔連結(Github):

https://github.com/aliyun/alibabacloud-jindofs/blob/master/docs/flink/jindofs_sdk_on_flink_for_oss.md

在程式中使用 JindoFS Flink Connector

確定叢集能夠通路 OSS Bucket

使用合适的路徑,流式寫入OSS Bucket

  • 寫入 OSS 須使用 oss:// 字首路徑,類似于:

oss://<user-bucket>/<user-defined-sink-dir>

更多優化!用 JindoFS SDK 加速 OSS 通路,參考

⭐Github:

https://github.com/aliyun/alibabacloud-jindofs/blob/master/docs/jindofs_sdk_vs_hadoop_sdk.md

在程式中使用 JindoFS Flink Connector:Java

在程式中開啟 Flink Checkpoint

  • 前提:使用可重發的資料源,如 Kafka
  • 通過 StreamExecutionEnvironment 對象打開 Checkpoint(示例):

建立:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();      

打開:

env.enableCheckpointing(<userDefinedCheckpointInterval>, CheckpointingMode.EXACTLY_ONCE);      

示例程式

  • 下文中,outputStream 是一個預先形成的 DataStream 對象,若需寫入 OSS,則可以這樣添加 sink:
String outputPath = "oss://<user-bucket>/<user-defined-sink-dir>";
StreamingFileSink<String> sink= StreamingFileSink.forRowFormat(
        new Path(outputPath),
        new SimpleStringEncoder<String>("UTF-8")
).build();
outputStream.addSink(sink);      
  • 上述程式指定将 outputStream 中的String 内容寫入 OSS 路徑 oss:///,最後還需用 env.execute() 語句執行 Flink 作業,env 是已建立的 StreamExecutionEnvironment 對象
  • 最後,将 Java 作業打包為 jar 檔案,并用 flink run 在叢集送出即可

在程式中使用 JindoFS Flink Connector:Pyflink

與Java 示例類似,在 Pyflink 中使用 JindoFS Flink Connector 與寫入 HDFS 等其他媒體方式相同,隻需:

  • 将寫入路徑寫作合适的 OSS 路徑
  • 注意打開 Checkpoint 功能

例如,下列 Python 程式定義了一張位于 OSS 的表:

sink_dest = "oss://<user-bucket>/<user-defined-sink-dir>"
sink_ddl = f""" 
        CREATE TABLE mySink (
                uid INT,
                pid INT
        ) PARTITIONED BY (
                pid
        ) WITH (
                'connector' = 'filesystem',
                'fpath' = '{sink_dest}',
                'format' = 'csv',
                'sink.rolling-policy.file-size' = '2MB',
                'sink.partition-commit.policy.kind' = 'success-file'
        )
"""      

然後将其添加到 StreamTableEnvironmentt_env 中即可:t_env.sql_update(sink_ddl)

在程式中使用 JindoFS Flink Connector:更多配置

使用者通過 flink run 送出 java 或 pyflink 程式時,可以額外自定義一些參數,格式:

      flink run -m yarn-cluster -yD key1=value1 -yD key2=value2 ...

目前支援“熵注入”及“分片上傳并行度”兩項配置

熵注入(entropyinjection):

  • 功能:将寫入路徑的一段特定字元串比對出來,用一段随機的字元串進行替換
  • 效果:削弱所謂 “片區” (sharding) 效應,提高寫入效率
  • 配置參數:

  ​oss.entropy.key=<user-defined-key>

  oss.entropy.length=<user-defined-length>

分片上傳并行度

  • 配置參數:oss.upload.max.concurrent.uploads
  • 預設值:目前可用的處理器數量

直接觀看第四課(7/8講)視訊回放,擷取執行個體講解~

⭐Github連結:

https://github.com/aliyun/alibabacloud-jindofs

不錯過每次直播資訊、探讨更多資料湖 JindoFS+OSS 相關技術問題,歡迎掃碼加入釘釘交流群!

資料湖實操講解【OSS 通路加速】第七講:Flink 高效 sink 寫入 OSS