一、Hive 小檔案概述
在Hive中,所謂的小檔案是指檔案大小遠小于HDFS塊大小的檔案,通常小于128 MB,甚至更少。這些小檔案可能是Hive表的一部分,每個小檔案都包含一個或幾個表的記錄,它們以文本格式存儲。
Hive通常用于分析大量資料,但它在處理小檔案方面表現不佳,Hive中存在大量小檔案會引起以下問題:
- 存儲空間占用過多:在Hadoop生态系統中,每個小檔案都将占用一定的存儲空間,而且每個小檔案也需要一個塊來存儲。如果存在大量的小檔案,将浪費大量的存儲空間。
- 處理延遲:小檔案數量過多,會引起大量IO操作,導緻處理延遲。
- 查詢性能下降:小檔案用于分區和表劃分,可能導緻查詢延遲并降低查詢性能。此外,小檔案還會增加中繼資料的數量,使得Hive在查詢中繼資料時變得更加緩慢。
- 資料傾斜:如果資料分布不均勻,會導緻一些Reduce任務處理了完全不同的分區,這會使某些Reduce任務的運作速度與其他Reduce任務相比非常慢。
是以,為了避免這些問題,我們需要對Hive中小檔案的處理進行優化,減少小檔案數量和大小,以提高資料處理效率和準确性。
二、Hive 小檔案産生的背景
Hive中小檔案産生的背景主要是因為以下兩個原因:
- 資料寫入頻率較高:如果表的寫入頻率較高,也就意味着會頻繁地添加、更新或删除記錄,這可能會導緻小檔案的産生。由于Hive表被映射到HDFS檔案,是以如果頻繁地寫入資料,它們可能以小檔案的形式存在。
- 映射表的切分限制:Hive表映射為HDFS檔案時會按照資料塊大小進行切分和管理。如果表中存在小于單個資料塊大小的資料,生成的檔案就會比資料塊小。這可能會導緻大量小檔案的産生,
綜上所述,Hive中小檔案的存在與資料寫入頻率高和表映射為HDFS檔案的切分方式有關。為了處理小檔案問題,我們需要了解這些背景并針對其原因來優化處理。
三、環境準備
如果已經有了環境了,可以忽略,如果想快速部署環境進行測試可以參考我這篇文章:通過 docker-compose 快速部署 Hive 詳細教程
# 登入容器
docker exec -it hive-hiveserver2 bash
# 連接配接hive
beeline -u jdbc:hive2://hive-hiveserver2:10000 -n hadoop
四、Hive 小檔案治理
為了處理Hive中的小檔案問題,可以采取以下一些有效措施:
- 檔案合并:将多個小檔案合并成一個大檔案,采用 Hadoop 檔案合并API可以将多個小檔案合并成一個大檔案。合并檔案後,可以減少小檔案數量,減少Hadoop檔案管理負擔,減少HDFS中繼資料和NameNode記憶體消耗。
- 壓縮檔案:可以使用壓縮算法(如gzip、bzip2等)對小檔案進行壓縮,這樣可以減少磁盤空間和網絡帶寬的使用,并減少小檔案損壞的可能性。
- 存儲格式優化:Hive支援多種存儲格式,如ORC、Parquet、Avro等。這些格式允許将多個小檔案壓縮并序列化成一個大檔案,存儲時占用更少的磁盤和網絡帶寬。存儲格式優化對于處理小檔案問題非常有效。
- 分區表:對于一些常變動的資料,推薦使用分區表。分區表将資料按照不同的分區值存儲在不同的目錄中。這減少了小檔案數量并提高了查詢效率。
- 垃圾回收:如果一個表舊資料經常更新或删除,就會産生大量無用的小檔案,是以建議進行垃圾回收。可以定期執行HDFS檔案删除指令或者設定TTL等機制,定期删除備援資料以減少HDFS檔案、中繼資料和NameNode記憶體的消耗。
通過采取上述措施中的一種或多種,可以極大地減少Hive中小檔案數量,優化Hive表的表現并提高查詢效率。
1)小檔案合并(常用)
可以使用以下指令将 Hive 表中的小檔案合并為一個大檔案:
set hive.merge.size.per.task=256000000;
set hive.merge.smallfiles.avgsize=16000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set hive.merge.mapfiles=true;
# 未分區
INSERT OVERWRITE TABLE table_new SELECT * FROM table_old;
# 分區
INSERT OVERWRITE TABLE table_new SELECT column1,column2 FROM table_old where partitions;
上述代碼中的參數含義如下:
- hive.merge.size.per.task:設定MapReduce任務處理的最大資料大小,機關是位元組,預設為256MB。
- hive.merge.smallfiles.avgsize:設定如果小于該平均大小的檔案需要合并在一起,以減小小檔案的數量和規模,機關是位元組,預設為16MB。
- hive.input.format:使用 CombinHiveInputFormat 作為輸入格式合并小檔案。
- hive.merge.mapfiles:合并Map檔案(.mapred或.mapreduce)以減少小檔案的數量。
1、示例示範一(非分區表)
# 非分區表
CREATE TABLE student (
id INT,
name STRING,
age INT,
address STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n';
-- 添加資料,這裡多執行幾次,會生成多個檔案,友善下面檔案合并實驗
INSERT INTO TABLE student VALUES (1, 'stu1', 15, 'add1'),(2, 'stu2', 16, 'add2'),(3, 'stu3', 17, 'add3');
-- 也可使用LOAD DATA LOCAL
LOAD DATA LOCAL INPATH './stu.txt' INTO TABLE student;
從上圖可看到已經有很多小檔案了,接下來就是進行合并了。執行以下指令即可:
INSERT OVERWRITE TABLE student SELECT * FROM student;
已經将多個檔案合并成一個檔案了,達到了小檔案合并的效果了。
2、示例示範二(分區表)
其實用的多的還是按分區進行合并,一般表都是有分區的,按分區合并的好處就是減少讀寫壓力,資料量大的情況下分批合并是非常友好的。
# 分區表
CREATE TABLE student_patitions (
id INT,
name STRING,
age INT,
address STRING
)
PARTITIONED BY (year string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n';
-- 開啟動态分區,預設是false
set hive.exec.dynamic.partition=true;
-- 開啟允許所有分區都是動态的,否則必須要有靜态分區才能使用。
set hive.exec.dynamic.partition.mode=nostrick;
-- Hive預設情況下設定的最大動态分區建立數是100。
set hive.exec.max.dynamic.partitions=10000;
-- 添加資料,這裡多執行幾次,會生成多個檔案,友善下面檔案合并實驗
INSERT INTO TABLE student_patitions PARTITION (year=2019) VALUES (1, 'stu1', 15, 'add1'),(2, 'stu2', 16, 'add2'),(3, 'stu3', 17, 'add3');
INSERT INTO TABLE student_patitions PARTITION (year=2023) VALUES (1, 'stu1', 15, 'add1'),(2, 'stu2', 16, 'add2'),(3, 'stu3', 17, 'add3');
-- 也可使用LOAD DATA LOCAL
LOAD DATA LOCAL INPATH './stu_pt.txt' INTO TABLE student_patitions PARTITION (year=2020);
從上圖可看到已經有很多小檔案了,接下來就是進行合并了。執行以下指令即可:
-- 按分區合并
insert overwrite table student_patitions partition(year=2019)
select id, name, age, address from student_patitions where year=2019;
-- 動态分區合并,有些版本不支援*,
-- *
insert overwrite table student_patitions partition(year) select * from student_patitions;
-- insert overwrite table student_patitions partition(year) select id, name, age, address from student_patitions;
-- 也可以通過load data方式
load data local inpath './stu_pt.txt' overwrite into table student_patitions partition(year=2019);
3、示例示範三(臨時表)
還有一個更靠譜的方案就是通過将現有的表資料合并寫到另外一張臨時新表,然後确認合并無誤後,将原始表和表資料删除,再将新表名改成舊表名。
示例如下:
-- 分區表
CREATE TABLE student_patitions2 (
id INT,
name STRING,
age INT,
address STRING
)
PARTITIONED BY (year string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n';
-- 開啟動态分區,預設是false
set hive.exec.dynamic.partition=true;
-- 開啟允許所有分區都是動态的,否則必須要有靜态分區才能使用。
set hive.exec.dynamic.partition.mode=nostrick;
-- Hive預設情況下設定的最大動态分區建立數是100。
set hive.exec.max.dynamic.partitions=10000;
-- 添加資料,這裡多執行幾次,會生成多個檔案,友善下面檔案合并實驗
INSERT INTO TABLE student_patitions2 PARTITION (year=2019) VALUES (1, 'stu1', 15, 'add1'),(2, 'stu2', 16, 'add2'),(3, 'stu3', 17, 'add3');
INSERT INTO TABLE student_patitions2 PARTITION (year=2023) VALUES (1, 'stu1', 15, 'add1'),(2, 'stu2', 16, 'add2'),(3, 'stu3', 17, 'add3');
-- 也可使用LOAD DATA LOCAL
LOAD DATA LOCAL INPATH './stu_pt.txt' INTO TABLE student_patitions2 PARTITION (year=2020);
建立臨時表并将添加合并資料
CREATE TABLE student_patitions2_temp (
id INT,
name STRING,
age INT,
address STRING
)
PARTITIONED BY (year string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n';
-- 按分區合并,有些版本不支援*
insert overwrite table student_patitions2_temp partition(year)
select * from student_patitions2;
-- insert overwrite table student_patitions2_temp partition(year) select id, name, age, address from student_patitions2;
-- 也可以通過load data方式
load data local inpath './stu_pt.txt' overwrite into table student_patitions2_temp partition(year=2019);
删除舊表,修改表表名稱
# 删表,如果是外部表還是删除資料檔案
DROP TABLE student_patitions2;
ALTER TABLE student_patitions2_temp RENAME TO student_patitions2;
2)檔案壓縮
可以使用以下指令将表中的小檔案進行壓縮:
SET hive.exec.compress.output=true;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
INSERT OVERWRITE TABLE table_new SELECT * FROM table_old;
3)存儲格式優化
使用存儲格式進行優化,可以将多個小檔案壓縮和序列化成一個大檔案。以下是使用ORC格式的實作示例:
SET hive.exec.compress.output=true;
SET orc.compress=SNAPPY;
SET hive.exec.orc.default.compress=SNAPPY;
CREATE TABLE table_new STORED AS ORC AS SELECT * FROM table_old;
上述代碼中的參數含義如下:
- hive.exec.compress.output:指定是否開啟壓縮,如果啟用則會對輸出進行壓縮,以節省存儲空間和網絡帶寬。
- orc.compress:設定壓縮算法,這裡使用SNAPPY。
- hive.exec.orc.default.compress:設定ORC檔案預設壓縮算法,這裡使用SNAPPY。
4)分區表
可以使用以下SQL語句建立分區表:
CREATE TABLE table_new(
column1 INT,
column2 STRING
)
PARTITIONED BY (
day STRING
)
ROW FORMAT DELIMITED
STORED AS TEXTFILE;
這裡将表按照分區值進行存儲,可以提高查詢效率,減少小檔案數量。
5)垃圾回收
删除HDFS中過期的小檔案可以減少 HDFS 的存儲開銷。
可以使用如下指令進行删除操作:
hdfs dfs -rm /path/to/file-*
也可以使用 HiveQL 參數 EXPIRE 進行垃圾回收,以将無用的檔案從HDFS中删除:
ALTER TABLE table_old DROP PARTITION (day '2016-01-01') PURGE;
上述代碼中将删除舊的分區并從HDFS中永久删除不再需要的資料。
綜上所述,可以通過上述方式來處理Hive中小檔案問題,以提高Hive的查詢效率和性能。
五、HDFS 資料平衡
1)HDFS 資料傾斜
HDFS資料傾斜是指存在一些資料塊的大小明顯大于其他資料塊,導緻作業在運作時的處理時間和性能嚴重不平衡。這通常是由于資料分布不均勻,或者任務負載不均勻導緻的。Hive的MapReduce作業經常面臨HDFS資料傾斜的問題,這會導緻一部分Mapper處理的資料量很大,而其他Mapper卻沒有得到充分利用。
以下是一些緩解HDFS資料傾斜的方法:
- 增大檔案塊大小:如果您的作業經常面臨資料傾斜問題,可以嘗試增大資料塊的大小。這樣可以降低Mapper需要處理的資料塊數量,進而減少資料塊配置設定不均衡的可能性。
- 資料合并:如果您的作業中存在大量較小的檔案,可以嘗試将它們合并為幾個較大的檔案。這樣可以減少地圖任務的數目,并有助于均衡任務的負載。
- 資料重分區:如果在您的作業中資料分布極不均勻,可以嘗試使用資料重分區(例如Hive中的 CLUSTER BY 或 DISTRIBUTE BY 語句)來重新組織資料。這可以幫助将相似的資料放在同一個分區中,進而減少資料傾斜的可能性。
- 動态分區:在Hive中,動态分區可用于根據資料中實際的分區鍵動态建立分區。它可以使用較小的資料塊大小來提高作業的并行性。動态分區還可以通過確定資料配置設定均衡來緩解資料傾斜的問題。
- 壓縮:使用壓縮技術可以減小資料塊大小,并減少傾斜問題的可能性。常用的壓縮格式包括Gzip、Snappy、LZO等。
HDFS資料傾斜不僅可能出現在資料塊的大小上,還可能出現在資料節點(Datanode)的負載上。如果一個Datanode存儲的資料塊遠遠多于其他Datanode,那麼它處理作業時的負載将遠高于其他節點,進而導緻整個叢集性能下降。下面是一些緩解HDFS資料節點傾斜問題的方法:
- 增加節點:可以向叢集中添加更多的節點,以增加存儲能力。這樣可以分散節點的負載,避免單個節點負載過高。盡管這樣做可能會增加叢集的維護成本,但它可以提高叢集的性能和可靠性。一般增加完新節點需要做資料平衡,要不然新節點磁盤使用率遠低于其它節點的磁盤。
- 均衡資料分布:您可以使用HDFS中的均衡指令(hdfs balancer)來均衡資料分布。該指令将根據需要将塊移動到不同的節點,以保持所有節點的負載相對均衡。
- 更改塊大小:當塊大小不均衡時,您可以嘗試根據每個節點的存儲容量增加或減少塊大小,以確定每個節點的負載相對均衡。例如,如果一個節點存儲大量的小檔案,則可以将塊大小增加到更适合這種情況的大小(例如512MB或1GB),以減少每個節點的塊數。
- 資料遷移:如果一個節點負載過高,您可以從該節點中移動一些塊到其他節點中,以減輕該節點的負載。這可以通過将塊從一個節點複制到另一個節點來實作。需要注意的是,這樣做可能會影響作業的性能,是以建議在維護合适的性能的同時進行資料遷移。
需要注意的是,緩解HDFS資料節點傾斜問題需要綜合考慮多種因素,包括資料分布、叢集規模、硬體配置等。根據具體情況,您可以采取不同的措施來緩解資料節點傾斜的問題。
2)HDFS 資料平衡
HDFS提供了 hdfs balancer 指令來進行資料平衡呢。hdfs balancer指令可以讓HDFS叢集重新均衡分布資料塊,保證HDFS叢集中資料塊在各個節點上均衡分布。
hdfs balancer 指令的文法如下:
hdfs balancer -help
Usage: java Balancer
[-policy <policy>] the balancing policy: datanode or blockpool
[-threshold <threshold>] Percentage of disk capacity
[-exclude [-f <hosts-file> | comma-sperated list of hosts]] Excludes the specified datanodes.
[-include [-f <hosts-file> | comma-sperated list of hosts]] Includes only the specified datanodes.
參數詳解:
- -threshold:某datanode的使用率和整個叢集使用率的百分比內插補點門檻值,達到這個門檻值就啟動hdfs balancer,取值從1到100,不宜太小,因為在平衡過程中也有資料寫入,太小無法達到平衡,預設值:10
- -policy:分為blockpool和datanode,前者是block pool級别的平衡後者是datanode級别的平衡,BlockPool 政策平衡了塊池級别和 DataNode 級别的存儲。BlockPool 政策僅适用于 Federated HDFS 服務
- -exclude:不為空,則不在這些機器上進行平衡
-
-include:不為空,則僅在這些機器上進行平衡
-idleiterations:最大疊代次數
另外還有兩個常用的參數:
- dfs.datanode.balance.bandwidthPerSec :HDFS做均衡時使用的最大帶寬,預設為1048576,即1MB/s,對大多數千兆甚至萬兆帶寬的叢集來說過小。不過該值可以在啟動balancer腳本時再設定,可以不修改叢集層面預設值。目前目前我們産線環境設定的是50M/s~100M/s。
- dfs.balancer.block-move.timeout:是一個Hadoop資料平衡指令hdfs balancer的選項之一,用于設定資料塊移動的最長時間。該選項指定了塊移動操作在多長時間内必須完成。該選項預設值為120000毫秒(即2分鐘),可以通過以下指令進行修改:
簡單使用:
# 啟動資料平衡,預設門檻值為 10%
hdfs balancer
# 預設相內插補點為10% 帶寬速率為10M/s,逾時時間10分鐘,過程資訊會直接列印在用戶端 ctrl+c即可中止
hdfs balancer -Ddfs.balancer.block-move.timeout=600000
#可以手動設定相內插補點 一般相內插補點越小 需要平衡的時間就越長,//設定為20% 這個參數本身就是百分比 不用帶%
hdfs balancer -threshold 20
#如果怕影響業務可以動态設定一下帶寬再執行上述指令,1M/s
hdfs dfsadmin -setBalancerBandwidth 1048576
#或者直接帶參運作,帶寬為1M/s
hdfs balancer -Ddfs.datanode.balance.bandwidthPerSec=1048576 -Ddfs.balancer.block-move.timeout=600000
關于 Hive 小檔案治理和 HDFS 資料平衡講解就先到這裡了,有任何疑問歡迎給我留言或私信,也可關注我的公衆号【大資料與雲原生技術分享】加群交流或私信溝通,如果本文章對你有所幫幫助,幫忙一鍵三連(點贊、轉發、分享)~