天天看點

【資料湖Hudi-10-Hudi內建Flink-讀取方式&限流&寫入方式&寫入模式&Bucket索引】一、讀取方式二、限流三、寫入方式四、寫入模式六、Bucket索引七、Hudi CataLog七、離線 Compaction八、離線 Clustering

資料湖Hudi-10-Hudi內建Flink-讀取方式&限流&寫入方式

  • 一、讀取方式
    • 1 流讀(Streaming Query)
  • 二、限流
  • 三、寫入方式
    • 1.CDC 資料同步
      • 1.使用第二種方式 cdc+kafka進行mysql資料同步到hudi
    • 2.離線批量導入
    • 3.全量接增量
  • 四、寫入模式
    • 1、Changelog模式
    • 2 Append 模式
  • 六、Bucket索引
  • 七、Hudi CataLog
  • 七、離線 Compaction
  • 八、離線 Clustering

一、讀取方式

1 流讀(Streaming Query)

目前表預設是快照讀取,即讀取最新的全量快照資料并一次性傳回。通過參數 read.streaming.enabled 參數開啟流讀模式,通過 read.start-commit 參數指定起始消費位置,支援指定 earliest 從最早消費。

  • 1.with參數
名稱 Required 預設值 說明
read.streaming.enabled false false 設定 true 開啟流讀模式
read.start-commit false 最新 commit 指定 ‘yyyyMMddHHmmss’ 格式的起始 commit(閉區間)
read.streaming.skip_compaction false false 流讀時是否跳過 compaction 的 commits,跳過 compaction 有兩個用途:1)避免 upsert 語義下重複消費 (compaction 的 instant 為重複資料,如果不跳過,有小機率會重複消費)2) changelog 模式下保證語義正确性 0.11 開始,以上兩個問題已經通過保留 compaction 的 instant time 修複
clean.retain_commits false 10 cleaner 最多保留的曆史 commits 數,大于此數量的曆史 commits 會被清理掉,changelog 模式下,這個參數可以控制 changelog 的保留時間,例如 checkpoint 周期為 5 分鐘一次,預設最少保留 50 分鐘的時間。

注意:當參數 read.streaming.skip_compaction 打開并且 streaming reader 消費落後于clean.retain_commits 數時,流讀可能會丢失資料。從 0.11 開始,compaction 不會再變更 record 的 instant time,是以理論上資料不會再重複消費,但是還是會重複讀取并丢棄,是以額外的開銷還是無法避免,對性能有要求的話還是可以開啟此參數。

案例展示:
CREATE TABLE t5(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop102:8020/tmp/hudi_flink/t5',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '4'   -- 預設60s
);


insert into t5 select * from sourceT;

select * from t5;
           

二、限流

限流的邏輯是,源頭資料量級很大,百億級别。

下面是資料流向圖:

全量&增量資料 --> kafka --> flink --> hudi

  • 限流,是限制的flink寫出到hudi的速度。這樣就減少了flink的背壓,消費按照給定速率消費。

    這樣就可以提高作業的穩定性。

如果将全量資料(百億數量級) 和增量先同步到 kafka,再通過 flink 流式消費的方式将庫表資料直接導成 hoodie 表,因為直接消費全量部分資料:量大(吞吐高)、亂序嚴重(寫入的 partition 随機),會導緻寫入性能退化,出現吞吐毛刺,這時候可以開啟限速參數,保證流量平穩寫入。

WITH 參數

名稱 Required 預設值 說明
write.rate.limit false 預設關閉限速

三、寫入方式

1.CDC 資料同步

CDC 資料儲存了完整的資料庫變更,目前可通過兩種途徑将資料導入 hudi:

【資料湖Hudi-10-Hudi內建Flink-讀取方式&限流&寫入方式&寫入模式&Bucket索引】一、讀取方式二、限流三、寫入方式四、寫入模式六、Bucket索引七、Hudi CataLog七、離線 Compaction八、離線 Clustering

第一種:通過 cdc-connector 直接對接 DB 的 binlog 将資料導入 hudi,優點是不依賴消息隊列,缺點是對 db server 造成壓力。

第二種:對接 cdc format 消費 kafka 資料導入 hudi,優點是可擴充性強,缺點是依賴 kafka。

使用mysql進行案例分析:

1.使用第二種方式 cdc+kafka進行mysql資料同步到hudi

  • 1)準備MySQL表

    (1)MySQL開啟binlog

    (2)建表

create database test;
use test;
create table stu3 (
  id int unsigned auto_increment primary key COMMENT '自增id',
  name varchar(20) not null comment '學生名字',
  school varchar(20) not null comment '學校名字',
  nickname varchar(20) not null comment '學生小名',
  age int not null comment '學生年齡',
  class_num int not null comment '班級人數',
  phone bigint not null comment '電話号碼',
  email varchar(64) comment '家庭網絡郵箱',
  ip varchar(32) comment 'IP位址'
  ) engine=InnoDB default charset=utf8;
           
  • 2)flink讀取mysql binlog并寫入kafka

    (1)建立MySQL表

create table stu3_binlog(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
) with (
  'connector' = 'mysql-cdc',
  'hostname' = 'hadoop1',
  'port' = '3306',
  'username' = 'root',
  'password' = 'aaaaaa',
  'database-name' = 'test',
  'table-name' = 'stu3'
);
           

(2)建立Kafka表

create table stu3_binlog_sink_kafka(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
) with (
  'connector' = 'upsert-kafka'
  ,'topic' = 'cdc_mysql_stu3_sink'
  ,'properties.zookeeper.connect' = 'hadoop1:2181'
  ,'properties.bootstrap.servers' = 'hadoop1:9092'
  ,'key.format' = 'json'
  ,'value.format' = 'json'
);
           

(3)将mysql binlog日志寫入kafka

insert into stu3_binlog_sink_kafka
select * from stu3_binlog;
           
  • 3)flink讀取kafka資料并寫入hudi資料湖

    (1)建立kafka源表

create table stu3_binlog_source_kafka(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string
 ) with (
  'connector' = 'kafka',
  'topic' = 'cdc_mysql_stu3_sink',
  'properties.bootstrap.servers' = 'hadoop1:9092',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'testGroup'
  );
           

(2)建立hudi目标表

create table stu3_binlog_sink_hudi(
  id bigint not null,
  name string,
  `school` string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/stu3_binlog_sink_hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.option' = 'insert',
  'write.precombine.field' = 'school'
  );
           

(3)将kafka資料寫入到hudi中

insert into stu3_binlog_sink_hudi
select * from  stu3_binlog_source_kafka;
           
  • 5)統計資料入Hudi情況
create table stu3_binlog_hudi_view(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/stu3_binlog_sink_hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.precombine.field' = 'school'
  );

select count(*) from stu3_binlog_hudi_view;  
           
  • 6)實時檢視資料入湖情況
create table stu3_binlog_hudi_streaming_view(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/stu3_binlog_sink_hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.precombine.field' = 'school',
  'read.streaming.enabled' = 'true'
  );

 
select * from  stu3_binlog_hudi_streaming_view;
           

2.離線批量導入

如果存量資料來源于其他資料源,可以使用批量導入功能,快速将存量資料導成 Hoodie 表格式。

  • 1)原理

    (1)批量導入省去了 avro 的序列化以及資料的 merge 過程,後續不會再有去重操作,資料的唯一性需要自己來保證。

    (2)bulk_insert 需要在 Batch Execuiton Mode 下執行更高效,Batch 模式預設會按照 partition path 排序輸入消息再寫入 Hoodie,避免 file handle 頻繁切換導緻性能下降。

SET execution.runtime-mode = batch; 
SET execution.checkpointing.interval = 0;
           

(3)bulk_insert write task 的并發通過參數 write.tasks 指定,并發的數量會影響到小檔案的數量,理論上,bulk_insert write task 的并發數就是劃分的 bucket 數,當然每個 bucket 在寫到檔案大小上限(parquet 120 MB)的時候會 roll over 到新的檔案句柄,是以最後:寫檔案數量 >= bulk_insert write task 數。

  • 2)WITH 參數
名稱 Required 預設值 說明
write.operation true upsert 配置 bulk_insert 開啟該功能
write.tasks false 4 bulk_insert 寫 task 的并發,最後的檔案數 >=write.tasks
write.bulk_insert.shuffle_by_partition write.bulk_insert.shuffle_input(從 0.11 開始) false true 是否将資料按照 partition 字段 shuffle 再通過 write task 寫入,開啟該參數将減少小檔案的數量 但是可能有資料傾斜風險
write.bulk_insert.sort_by_partition write.bulk_insert.sort_input(從 0.11 開始) false true 是否将資料線按照 partition 字段排序再寫入,當一個 write task 寫多個 partition,開啟可以減少小檔案數量
write.sort.memory 128 sort 算子的可用 managed memory(機關 MB)
  • 3)案例

    (1)MySQL建表

create database test;
use test;
create table stu4 (
  id int unsigned auto_increment primary key COMMENT '自增id',
  name varchar(20) not null comment '學生名字',
  school varchar(20) not null comment '學校名字',
  nickname varchar(20) not null comment '學生小名',
  age int not null comment '學生年齡',
  score decimal(4,2) not null comment '成績',
  class_num int not null comment '班級人數',
  phone bigint not null comment '電話号碼',
  email varchar(64) comment '家庭網絡郵箱',
  ip varchar(32) comment 'IP位址'
  ) engine=InnoDB default charset=utf8;
           

(4)Flink SQL client 建立myql資料源

create table stu4(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  PRIMARY KEY (id) NOT ENFORCED
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://hadoop1:3306/test?serverTimezone=GMT%2B8',
  'username' = 'root',
  'password' = 'aaaaaa',
  'table-name' = 'stu4'
);
           

(5)Flink SQL client建立hudi表

create table stu4_sink_hudi(
  id bigint not null,
  name string,
  `school` string,
  nickname string,
  age int not null,
 score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/stu4_sink_hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.option' = 'bulk_insert',
  'write.precombine.field' = 'school'
  );
           

(3)Flink SQL client執行mysql資料插入到hudi中

3.全量接增量

如果已經有全量的離線 Hoodie 表,需要接上實時寫入,并且保證資料不重複,可以開啟 index bootstrap 功能。

如果覺得流程冗長,可以在寫入全量資料的時候資源調大直接走流模式寫,全量走完接新資料再将資源調小(或者開啟限流功能)。

名稱 Required 預設值 說明
index.bootstrap.enabled true false 開啟索引加載,會将已存表的最新資料一次性加載到 state 中
index.partition.regex false * 設定正規表達式進行分區篩選,預設為加載全部分區

使用流程

(1) CREATE TABLE 建立和 Hoodie 表對應的語句,注意 table type 要正确

(2)設定 index.bootstrap.enabled = true開啟索引加載功能

(3)重新開機任務将 index.bootstrap.enabled 關閉,參數配置到合适的大小,如果RowDataToHoodieFunction 和 BootstrapFunction 并發不同,可以重新開機避免 shuffle

說明:

(1)索引加載為并發加載,根據資料量大小加載時間不同,可以在log中搜尋

finish loading the index under partition 和 Load records from file 日志來觀察索引加載的進度

四、寫入模式

1、Changelog模式

如果希望 Hoodie 保留消息的所有變更(I/-U/U/D),之後接上 Flink 引擎的有狀态計算實作全鍊路近實時數倉生産(增量計算),Hoodie 的 MOR 表通過行存原生支援保留消息的所有變更(format 層面的內建),通過流讀 MOR 表可以消費到所有的變更記錄。

  • 1)WITH 參數
名稱 Required 預設值 說明
changelog.enabled false false 預設是關閉狀态,即 UPSERT 語義,所有的消息僅保證最後一條合并消息,中間的變更可能會被 merge 掉;改成 true 支援消費所有變更。

批(快照)讀仍然會合并所有的中間結果,不管 format 是否已存儲中間狀态。

開啟 changelog.enabled 參數後,中間的變更也隻是 Best Effort: 異步的壓縮任務會将中間變更合并成 1 條,是以如果流讀消費不夠及時,被壓縮後隻能讀到最後一條記錄。當然,通過調整壓縮的 buffer 時間可以預留一定的時間 buffer 給 reader,比如調整壓縮的兩個參數:

compaction.delta_commits:5 
compaction.delta_seconds: 3600。
           

說明:

Changelog 模式開啟流讀的話,要在 sql-client 裡面設定參數:

set sql-client.execution.result-mode=tableau; 
或者
set sql-client.execution.result-mode=changelog;
           

2)流讀 changelog

僅在 0.10.0 支援,本 feature 為實驗性。

開啟 changelog 模式後,hudi 會保留一段時間的 changelog 供下遊 consumer 消費,我們可以通過流讀 ODS 層 changelog 接上 ETL 邏輯寫入到 DWD 層,如下圖的 pipeline:

流讀的時候我們要注意 changelog 有可能會被 compaction 合并掉,中間記錄會消除,可能會影響計算結果,需要關注sql-client的屬性(result-mode)同上。

3)案例示範

(1)使用changelog

CREATE TABLE t6(
  id int,
  ts int,
  primary key (id) not enforced
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t6',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '4',
  'changelog.enabled' = 'true'
);

insert into t6 values (1,1);
insert into t6 values (1,2);

set table.dynamic-table-options.enabled=true;
select * from t6/*+ OPTIONS('read.start-commit'='earliest')*/;
select count(*) from t6/*+ OPTIONS('read.start-commit'='earliest')*/;
           

(2)不使用changelog

CREATE TABLE t6_v(
  id int,
  ts int,
  primary key (id) not enforced
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t6',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '4'
);


select * from t6_v/*+ OPTIONS('read.start-commit'='earliest')*/;
select count(*) from t6_v/*+ OPTIONS('read.start-commit'='earliest')*/;
           

2 Append 模式

從 0.10 開始支援

對于 INSERT 模式:

MOR 預設會 apply 小檔案政策: 會追加寫 avro log 檔案

COW 每次直接寫新的 parquet 檔案,沒有小檔案政策

Hudi 支援豐富的 Clustering 政策,優化 INSERT 模式下的小檔案問題:

  • 1)Inline Clustering

    隻有 Copy On Write 表支援該模式

    【資料湖Hudi-10-Hudi內建Flink-讀取方式&限流&寫入方式&寫入模式&Bucket索引】一、讀取方式二、限流三、寫入方式四、寫入模式六、Bucket索引七、Hudi CataLog七、離線 Compaction八、離線 Clustering
  • 2) Async Clustering

    從 0.12 開始支援

六、Bucket索引

從 0.11 開始支援

預設的 flink 流式寫入使用 state 存儲索引資訊:primary key 到 fileId 的映射關系。當資料量比較大的時候,state的存儲開銷可能成為瓶頸,bucket 索引通過固定的 hash 政策,将相同 key 的資料配置設定到同一個 fileGroup 中,避免了索引的存儲和查詢開銷。

1)WITH參數

名稱 Required 預設值 說明
index.type false FLINK_STATE 設定 BUCKET 開啟 Bucket 索引功能
hoodie.bucket.index.hash.field false 主鍵 可以設定成主鍵的子集
hoodie.bucket.index.num.buckets false 4 預設每個 partition 的 bucket 數,目前設定後則不可再變更。
2)和 state 索引的對比:
(1)bucket index 沒有 state 的存儲計算開銷,性能較好
(2)bucket index 無法擴 buckets,state index 則可以依據檔案的大小動态擴容
(3)bucket index 不支援跨 partition 的變更(如果輸入是 cdc 流則沒有這個限制),state index 沒有限制

七、Hudi CataLog

從 0.12.0 開始支援,通過 catalog 可以管理 flink 建立的表,避免重複建表操作,另外 hms 模式的 catalog 支援自動補全 hive 同步參數。

DFS 模式 Catalog SQL樣例:

CREATE CATALOG hoodie_catalog
  WITH (
    'type'='hudi',
    'catalog.path' = '${catalog 的預設路徑}',
    'mode'='dfs' 
  );
Hms 模式 Catalog SQL 樣例:
CREATE CATALOG hoodie_catalog
  WITH (
    'type'='hudi',
    'catalog.path' = '${catalog 的預設路徑}',
    'hive.conf.dir' = '${hive-site.xml 所在的目錄}',
    'mode'='hms' -- 支援 'dfs' 模式通過檔案系統管理表屬性
  );
           
  • 1)WITH 參數
名稱 Required 預設值 說明
catalog.path true 預設的 catalog 根路徑,用作表路徑的自動推導,預設的表路徑: c a t a l o g . p a t h / {catalog.path}/ catalog.path/{db_name}/${table_name}
default-database false default 預設的 database 名
hive.conf.dir false hive-site.xml 所在的目錄,隻在 hms 模式下生效
mode false dfs 支援 hms模式通過 hive 管理中繼資料
table.external false false 是否建立外部表,隻在 hms 模式下生效
  • 2)使用dfs方式

    (1)建立sql-client初始化sql檔案

vim /opt/module/flink-1.13.6/conf/sql-client-init.sql

CREATE CATALOG hoodie_catalog
  WITH (
    'type'='hudi',
    'catalog.path' = '/tmp/hudi_catalog',
    'mode'='dfs' 
  );

USE CATALOG hoodie_catalog;
           

(2)指定sql-client啟動時加載sql檔案

hadoop fs -mkdir /tmp/hudi_catalog

bin/sql-client.sh embedded -i conf/sql-client-init.sql -s yarn-session
           

(3)建庫建表插入

create database test;
use test;

create table t2(
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  `partition` varchar(20),
primary key (uuid) not enforced
)
with (
  'connector' = 'hudi',
  'path' = '/tmp/hudi_catalog/default/t2',
  'table.type' = 'MERGE_ON_READ'
);

insert into t2 values('1','zs',18,TIMESTAMP '1970-01-01 00:00:01','a');
           

(4)退出sql-client,重新進入,表資訊還在

use test;
show tables;
select * from t2;
           

七、離線 Compaction

MOR 表的 compaction 預設是自動打開的,政策是 5 個 commits 執行一次壓縮。 因為壓縮操作比較耗費記憶體,和寫流程放在同一個 pipeline,在資料量比較大的時候(10w+/s qps),容易幹擾寫流程,此時采用離線定時任務的方式執行 compaction 任務更穩定。

  • 1 設定參數
compaction.async.enabled 為 false,關閉線上 compaction。
compaction.schedule.enabled 仍然保持開啟,由寫任務階段性觸發壓縮 plan。
           
  • 2 原理

    一個 compaction 的任務的執行包括兩部分:

    schedule 壓縮 plan

    該過程推薦由寫任務定時觸發,寫參數 compaction.schedule.enabled 預設開啟

    執行對應的壓縮 plan

  • 3 使用方式

    1)執行指令

    離線 compaction 需要手動執行 Java 程式,程式入口:

// 指令行的方式
./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://xxx:8020/table
           

2)案例示範

(1)建立表,關閉線上壓縮

create table t7(
  id int,
  ts int,
  primary key (id) not enforced
)
with (
  'connector' = 'hudi',
  'path' = '/tmp/hudi_catalog/default/t7',
  'compaction.async.enabled' = 'false',
  'compaction.schedule.enabled' = 'true',
  'table.type' = 'MERGE_ON_READ'
);


insert into t7 values(1,1);
insert into t7 values(2,2);
insert into t7 values(3,3);
insert into t7 values(4,4);
insert into t7 values(5,5);
           

// 指令行的方式

./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://hadoop1:8020/tmp/hudi_catalog/default/t7
           

八、離線 Clustering

異步的 clustering 相對于 online 的 async clustering 資源隔離,進而更加穩定。

  • 1 設定參數
clustering.async.enabled 為 false,關閉線上 clustering。
clustering.schedule.enabled 仍然保持開啟,由寫任務階段性觸發 clustering plan。
           
  • 2 原理

    一個 clustering 的任務的執行包括兩部分:

    • schedule plan

      推薦由寫任務定時觸發,寫參數 clustering.schedule.enabled 預設開啟。

    • 執行對應的 plan
  • 3 使用方式

    1)執行指令

    離線 clustering 需要手動執行 Java 程式,程式入口:

// 指令行的方式
./bin/flink run -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://xxx:8020/table
           

注意:必須是分區表,否則報錯空指針異常。

2)案例示範
(1)建立表,關閉線上壓縮
           
create table t8(
  id int,
  age int,
  ts int,
  primary key (id) not enforced
) partitioned by (age)
with (
  'connector' = 'hudi',
  'path' = '/tmp/hudi_catalog/default/t8',
  'clustering.async.enabled' = 'false',
  'clustering.schedule.enabled' = 'true',
  'table.type' = 'COPY_ON_WRITE'
);


insert into t8 values(1,18,1);
insert into t8 values(2,18,2);
insert into t8 values(3,18,3);
insert into t8 values(4,18,4);
insert into t8 values(5,18,5);
           
// 指令行的方式
./bin/flink run -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://hadoop1:8020/tmp/hudi_catalog/default/t8
           

繼續閱讀