天天看點

Hive/HiveSQL常用優化方法全面總結

Hive作為大資料領域常用的資料倉庫元件,在平時設計和查詢時要特别注意效率。影響Hive效率的幾乎從不是資料量過大,而是資料傾斜、資料備援、job或I/O過多、MapReduce配置設定不合理等等。對Hive的調優既包含對HiveSQL語句本身的優化,也包含Hive配置項和MR方面的調整。

《2021年最新版大資料面試題全面開啟更新》

目錄

  • 列裁剪和分區裁剪
  • 謂詞下推
  • sort by代替order by
  • group by代替distinct
  • group by配置調整
    • map端預聚合
    • 傾斜均衡配置項
  • join基礎優化
    • build table(小表)前置
    • 多表join時key相同
    • 利用map join特性
    • 分桶表map join
  • 優化SQL處理join資料傾斜
    • 空值或無意義值
    • 單獨處理傾斜key
    • 不同資料類型
    • build table過大
  • MapReduce優化
    • 調整mapper數
    • 調整reducer數
    • 合并小檔案
    • 啟用壓縮
    • JVM重用
  • 并行執行與本地模式
  • 嚴格模式
  • 采用合适的存儲格式

最基本的操作。所謂列裁剪就是在查詢時隻讀取需要的列,分區裁剪就是隻讀取需要的分區。以我們的月曆記錄表為例:

select uid,event_type,record_data	
from calendar_record_log	
where pt_date >= 20190201 and pt_date <= 20190224	
and status = 0;           

當列很多或者資料量很大時,如果select *或者不指定分區,全列掃描和全表掃描效率都很低。

Hive中與列裁剪優化相關的配置項是

hive.optimize.cp

,與分區裁剪優化相關的則是

hive.optimize.pruner

,預設都是true。在HiveSQL解析階段對應的則是ColumnPruner邏輯優化器。

在關系型資料庫如MySQL中,也有謂詞下推(Predicate Pushdown,PPD)的概念。它就是将SQL語句中的where謂詞邏輯都盡可能提前執行,減少下遊處理的資料量。

例如以下HiveSQL語句:

select a.uid,a.event_type,b.topic_id,b.title	
from calendar_record_log a	
left outer join (	
select uid,topic_id,title from forum_topic	
where pt_date = 20190224 and length(content) >= 100	
) b on a.uid = b.uid	
where a.pt_date = 20190224 and status = 0;           

對forum_topic做過濾的where語句寫在子查詢内部,而不是外部。Hive中有謂詞下推優化的配置項

hive.optimize.ppd

,預設值true,與它對應的邏輯優化器是PredicatePushDown。該優化器就是将OperatorTree中的FilterOperator向上提,見下圖。

Hive/HiveSQL常用優化方法全面總結

圖來自https://tech.meituan.com/2014/02/12/hive-sql-to-mapreduce.html

上面的連結中是一篇講解HiveSQL解析與執行過程的好文章,前文提到的優化器、OperatorTree等概念在其中也有詳細的解釋,非常推薦。

HiveSQL中的order by與其他SQL方言中的功能一樣,就是将結果按某字段全局排序,這會導緻所有map端資料都進入一個reducer中,在資料量大時可能會長時間計算不完。

如果使用sort by,那麼還是會視情況啟動多個reducer進行排序,并且保證每個reducer内局部有序。為了控制map端資料配置設定到reducer的key,往往還要配合distribute by一同使用。如果不加distribute by的話,map端資料就會随機配置設定到reducer。

舉個例子,假如要以UID為key,以上傳時間倒序、記錄類型倒序輸出記錄資料:

select uid,upload_time,event_type,record_data	
from calendar_record_log	
where pt_date >= 20190201 and pt_date <= 20190224	
distribute by uid	
sort by upload_time desc,event_type desc;           

當要統計某一列的去重數時,如果資料量很大,count(distinct)就會非常慢,原因與order by類似,count(distinct)邏輯隻會有很少的reducer來處理。這時可以用group by來改寫:

select count(1) from (	
select uid from calendar_record_log	
where pt_date >= 20190101	
group by uid	
) t;           

但是這樣寫會啟動兩個MR job(單純distinct隻會啟動一個),是以要確定資料量大到啟動job的overhead遠小于計算耗時,才考慮這種方法。當資料集很小或者key的傾斜比較明顯時,group by還可能會比distinct慢。

那麼如何用group by方式同時統計多個列?下面是解決方法:

select t.a,sum(t.b),count(t.c),count(t.d) from (	
select a,b,null c,null d from some_table	
union all	
select a,0 b,c,null d from some_table group by a,c	
union all	
select a,0 b,null c,d from some_table group by a,d	
) t;           

group by時,如果先起一個combiner在map端做部分預聚合,可以有效減少shuffle資料量。預聚合的配置項是

hive.map.aggr

,預設值true,對應的優化器為GroupByOptimizer,簡單友善。

通過

hive.groupby.mapaggr.checkinterval

參數也可以設定map端預聚合的行數門檻值,超過該值就會分拆job,預設值100000。

group by時如果某些key對應的資料量過大,就會發生資料傾斜。Hive自帶了一個均衡資料傾斜的配置項

hive.groupby.skewindata

,預設值false。

其實作方法是在group by時啟動兩個MR job。第一個job會将map端資料随機輸入reducer,每個reducer做部分聚合,相同的key就會分布在不同的reducer中。第二個job再将前面預處理過的資料按key聚合并輸出結果,這樣就起到了均衡的效果。

但是,配置項畢竟是死的,單純靠它有時不能根本上解決問題,是以還是建議自行了解資料傾斜的細節,并優化查詢語句。

join優化是一個複雜的話題,下面先說5點最基本的注意事項。

在最常見的hash join方法中,一般總有一張相對小的表和一張相對大的表,小表叫build table,大表叫probe table。如下圖所示。

Hive/HiveSQL常用優化方法全面總結

圖來自http://hbasefly.com/2017/03/19/sparksql-basic-join/

Hive在解析帶join的SQL語句時,會預設将最後一個表作為probe table,将前面的表作為build table并試圖将它們讀進記憶體。如果表順序寫反,probe table在前面,引發OOM的風險就高了。

在次元模組化資料倉庫中,事實表就是probe table,次元表就是build table。假設現在要将月曆記錄事實表和記錄項編碼次元表來join:

select a.event_type,a.event_code,a.event_desc,b.upload_time	
from calendar_event_code a	
inner join (	
select event_type,upload_time from calendar_record_log	
where pt_date = 20190225	
) b on a.event_type = b.event_type;           

這種情況會将多個join合并為一個MR job來處理,例如:

select a.event_type,a.event_code,a.event_desc,b.upload_time	
from calendar_event_code a	
inner join (	
select event_type,upload_time from calendar_record_log	
where pt_date = 20190225	
) b on a.event_type = b.event_type	
inner join (	
select event_type,upload_time from calendar_record_log_2	
where pt_date = 20190225	
) c on a.event_type = c.event_type;           

如果上面兩個join的條件不相同,比如改成

a.event_code = c.event_code

,就會拆成兩個MR job計算。

負責這個的是相關性優化器CorrelationOptimizer,它的功能除此之外還非常多,邏輯複雜,參考Hive官方的文檔可以獲得更多細節:https://cwiki.apache.org/confluence/display/Hive/Correlation+Optimizer

map join特别适合大小表join的情況。Hive會将build table和probe table在map端直接完成join過程,消滅了reduce,效率很高。

select  a.event_type,b.upload_time	
from calendar_event_code a	
inner join (	
select event_type,upload_time from calendar_record_log	
where pt_date = 20190225	
) b on a.event_type < b.event_type;           

上面的語句中加了一條map join hint,以顯式啟用map join特性。早在Hive 0.8版本之後,就不需要寫這條hint了。map join還支援不等值連接配接,應用更加靈活。

map join的配置項是

hive.auto.convert.join

,預設值true,對應邏輯優化器是MapJoinProcessor。

還有一些參數用來控制map join的行為,比如

hive.mapjoin.smalltable.filesize

,當build table大小小于該值就會啟用map join,預設值25000000(25MB)。還有

hive.mapjoin.cache.numrows

,表示緩存build table的多少行資料到記憶體,預設值25000。

map join對分桶表還有特别的優化。由于分桶表是基于一列進行hash存儲的,是以非常适合抽樣(按桶或按塊抽樣)。

它對應的配置項是

hive.optimize.bucketmapjoin

,優化器是BucketMapJoinOptimizer。但我們的業務中用分桶表較少,是以就不班門弄斧了,隻是提一句。

這個配置與上面group by的傾斜均衡配置項異曲同工,通過

hive.optimize.skewjoin

來配置,預設false。

如果開啟了,在join過程中Hive會将計數超過門檻值

hive.skewjoin.key

(預設100000)的傾斜key對應的行臨時寫進檔案中,然後再啟動另一個job做map join生成結果。通過

hive.skewjoin.mapjoin.map.tasks

參數還可以控制第二個job的mapper數量,預設10000。

再重複一遍,通過自帶的配置項經常不能解決資料傾斜問題。join是資料傾斜的重災區,後面還要介紹在SQL層面處理傾斜的各種方法。

這種情況很常見,比如當事實表是日志類資料時,往往會有一些項沒有記錄到,我們視情況會将它置為null,或者空字元串、-1等。如果缺失的項很多,在做join時這些空值就會非常集中,拖累進度。

是以,若不需要空值資料,就提前寫where語句過濾掉。需要保留的話,将空值key用随機方式打散,例如将使用者ID為null的記錄随機改為負值:

select a.uid,a.event_type,b.nickname,b.age	
from (	
select	
  (case when uid is null then cast(rand()*-10240 as int) else uid end) as uid,	
  event_type from calendar_record_log	
where pt_date >= 20190201	
) a left outer join (	
select uid,nickname,age from user_info where status = 4	
) b on a.uid = b.uid;           

這其實是上面處理空值方法的拓展,不過傾斜的key變成了有意義的。一般來講傾斜的key都很少,我們可以将它們抽樣出來,對應的行單獨存入臨時表中,然後打上一個較小的随機數字首(比如0~9),最後再進行聚合。SQL語句與上面的相仿,不再贅述。

這種情況不太常見,主要出現在相同業務含義的列發生過邏輯上的變化時。

舉個例子,假如我們有一舊一新兩張月曆記錄表,舊表的記錄類型字段是(event_type int),新表的是(event_type string)。為了相容舊版記錄,新表的event_type也會以字元串形式存儲舊版的值,比如'17'。當這兩張表join時,經常要耗費很長時間。其原因就是如果不轉換類型,計算key的hash值時預設是以int型做的,這就導緻所有“真正的”string型key都配置設定到一個reducer上。是以要注意類型轉換:

select a.uid,a.event_type,b.record_data	
from calendar_record_log a	
left outer join (	
select uid,event_type from calendar_record_log_2	
where pt_date = 20190228	
) b on a.uid = b.uid and b.event_type = cast(a.event_type as string)	
where a.pt_date = 20190228;           

有時,build table會大到無法直接使用map join的地步,比如全量使用者次元表,而使用普通join又有資料分布不均的問題。這時就要充分利用probe table的限制條件,削減build table的資料量,再使用map join解決。代價就是需要進行兩次join。舉個例子:

select /*+mapjoin(b)*/ a.uid,a.event_type,b.status,b.extra_info	
from calendar_record_log a	
left outer join (	
select /*+mapjoin(s)*/ t.uid,t.status,t.extra_info	
from (select distinct uid from calendar_record_log where pt_date = 20190228) s	
inner join user_info t on s.uid = t.uid	
) b on a.uid = b.uid	
where a.pt_date = 20190228;           

Hive/HiveSQL常用優化方法全面總結

mapper數量與輸入檔案的split數息息相關,在Hadoop源碼

org.apache.hadoop.mapreduce.lib.input.FileInputFormat

類中可以看到split劃分的具體邏輯。這裡不貼代碼,直接叙述mapper數是如何确定的。

  • 可以直接通過參數

    mapred.map.tasks

    (預設值2)來設定mapper數的期望值,但它不一定會生效,下面會提到。
  • 設輸入檔案的總大小為

    total_input_size

    。HDFS中,一個塊的大小由參數

    dfs.block.size

    指定,預設值64MB或128MB。在預設情況下,mapper數就是:

    default_mapper_num = total_input_size / dfs.block.size

  • 參數

    mapred.min.split.size

    (預設值1B)和

    mapred.max.split.size

    (預設值64MB)分别用來指定split的最小和最大大小。split大小和split數計算規則是:

    split_size = MAX(mapred.min.split.size, MIN(mapred.max.split.size, dfs.block.size))

    split_num = total_input_size / split_size

  • 得出mapper數:

    mapper_num = MIN(split_num, MAX(default_num, mapred.map.tasks))

可見,如果想減少mapper數,就适當調高

mapred.min.split.size

,split數就減少了。如果想增大mapper數,除了降低

mapred.min.split.size

之外,也可以調高

mapred.map.tasks

一般來講,如果輸入檔案是少量大檔案,就減少mapper數;如果輸入檔案是大量非小檔案,就增大mapper數;至于大量小檔案的情況,得參考下面“合并小檔案”一節的方法處理。

reducer數量的确定方法比mapper簡單得多。使用參數

mapred.reduce.tasks

可以直接設定reducer數量,不像mapper一樣是期望值。但如果不設這個參數的話,Hive就會自行推測,邏輯如下:

  • hive.exec.reducers.bytes.per.reducer

    用來設定每個reducer能夠處理的最大資料量,預設值1G(1.2版本之前)或256M(1.2版本之後)。
  • hive.exec.reducers.max

    用來設定每個job的最大reducer數量,預設值999(1.2版本之前)或1009(1.2版本之後)。
  • 得出reducer數:

    reducer_num = MIN(total_input_size / reducers.bytes.per.reducer, reducers.max)

reducer數量與輸出檔案的數量相關。如果reducer數太多,會産生大量小檔案,對HDFS造成壓力。如果reducer數太少,每個reducer要處理很多資料,容易拖慢運作時間或者造成OOM。

  • 輸入階段合并

    需要更改Hive的輸入檔案格式,即參數

    hive.input.format

    ,預設值是

    org.apache.hadoop.hive.ql.io.HiveInputFormat

    ,我們改成

    org.apache.hadoop.hive.ql.io.CombineHiveInputFormat

    這樣比起上面調整mapper數時,又會多出兩個參數,分别是

    mapred.min.split.size.per.node

    mapred.min.split.size.per.rack

    ,含義是單節點和單機架上的最小split大小。如果發現有split大小小于這兩個值(預設都是100MB),則會進行合并。具體邏輯可以參看Hive源碼中的對應類。
  • 輸出階段合并

    直接将

    hive.merge.mapfiles

    hive.merge.mapredfiles

    都設為true即可,前者表示将map-only任務的輸出合并,後者表示将map-reduce任務的輸出合并。

    另外,

    hive.merge.size.per.task

    可以指定每個task輸出後合并檔案大小的期望值,

    hive.merge.size.smallfiles.avgsize

    可以指定所有輸出檔案大小的均值門檻值,預設值都是1GB。如果平均大小不足的話,就會另外啟動一個任務來進行合并。

壓縮job的中間結果資料和輸出資料,可以用少量CPU時間節省很多空間。壓縮方式一般選擇Snappy,效率最高。

要啟用中間壓縮,需要設定

hive.exec.compress.intermediate

為true,同時指定壓縮方式

hive.intermediate.compression.codec

org.apache.hadoop.io.compress.SnappyCodec

。另外,參數

hive.intermediate.compression.type

可以選擇對塊(BLOCK)還是記錄(RECORD)壓縮,BLOCK的壓縮率比較高。

輸出壓縮的配置基本相同,打開

hive.exec.compress.output

即可。

在MR job中,預設是每執行一個task就啟動一個JVM。如果task非常小而碎,那麼JVM啟動和關閉的耗時就會很長。可以通過調節參數

mapred.job.reuse.jvm.num.tasks

來重用。例如将這個參數設成5,那麼就代表同一個MR job中順序執行的5個task可以重複使用一個JVM,減少啟動和關閉的開銷。但它對不同MR job中的task無效。

  • 并行執行

    Hive中互相沒有依賴關系的job間是可以并行執行的,最典型的就是多個子查詢union all。在叢集資源相對充足的情況下,可以開啟并行執行,即将參數

    hive.exec.parallel

    設為true。另外

    hive.exec.parallel.thread.number

    可以設定并行執行的線程數,預設為8,一般都夠用。
  • 本地模式

    Hive也可以不将任務送出到叢集進行運算,而是直接在一台節點上處理。因為消除了送出到叢集的overhead,是以比較适合資料量很小,且邏輯不複雜的任務。

    設定

    hive.exec.mode.local.auto

    為true可以開啟本地模式。但任務的輸入資料總量必須小于

    hive.exec.mode.local.auto.inputbytes.max

    (預設值128MB),且mapper數必須小于

    hive.exec.mode.local.auto.tasks.max

    (預設值4),reducer數必須為0或1,才會真正用本地模式執行。

所謂嚴格模式,就是強制不允許使用者執行3種有風險的HiveSQL語句,一旦執行會直接失敗。這3種語句是:

  • 查詢分區表時不限定分區列的語句;
  • 兩表join産生了笛卡爾積的語句;
  • 用order by來排序但沒有指定limit的語句。

要開啟嚴格模式,需要将參數

hive.mapred.mode

設為strict。

在HiveSQL的create table語句中,可以使用

stored as ...

指定表的存儲格式。Hive表支援的存儲格式有TextFile、SequenceFile、RCFile、Avro、ORC、Parquet等。

存儲格式一般需要根據業務進行選擇,在我們的實操中,絕大多數表都采用TextFile與Parquet兩種存儲格式之一。

TextFile是最簡單的存儲格式,它是純文字記錄,也是Hive的預設格式。雖然它的磁盤開銷比較大,查詢效率也低,但它更多地是作為跳闆來使用。RCFile、ORC、Parquet等格式的表都不能由檔案直接導入資料,必須由TextFile來做中轉。

Parquet和ORC都是Apache旗下的開源列式存儲格式。列式存儲比起傳統的行式存儲更适合批量OLAP查詢,并且也支援更好的壓縮和編碼。我們選擇Parquet的原因主要是它支援Impala查詢引擎,并且我們對update、delete和事務性操作需求很低。

這裡就不展開講它們的細節,可以參考各自的官網:

https://parquet.apache.org/

https://orc.apache.org/

結束

寫了這麼多,肯定有遺漏或錯誤之處,歡迎各位大佬批評指正。

下一篇: uv計算

繼續閱讀