天天看點

HiveQL 性能優化筆記

HiveQL Performance Tuning Note

HiveQL 性能優化

一個Hive查詢生成多個map reduce job,一個map reduce job又有map,reduce,spill,shuffle,sort等多個階段,是以針對hive查詢的優化可以大緻分為針對MR中單個步驟的優化(其中又會有細分),針對MR全局的優化,和針對整個查詢(多MR job)的優化,下文會分别闡述。

HiveQL 性能優化筆記

在開始之前,先把MR的流程圖帖出來(摘自Hadoop權威指南),友善後面對照。本文寫作時,使用hive0.9版本 + hadoop 1.x版本。

HiveQL 性能優化筆記

1.Map階段的優化(map phase)

Map階段的優化,主要是确定合适的map數。那麼首先要了解map數的計算公式:

num_map_tasks = max[${mapred.min.split.size},
                min(${dfs.block.size}, ${mapred.max.split.size})]           
- mapred.min.split.size指的是資料的最小分割單元大小。
- mapred.max.split.size指的是資料的最大分割單元大小。
- dfs.block.size指的是HDFS設定的資料塊大小。
           

一般來說dfs.block.size這個值是一個已經指定好的值,而且這個參數hive是識别不到的:

hive> set dfs.block.size;
dfs.block.size is undefined           

是以實際上隻有mapred.min.split.size和mapred.max.split.size這兩個參數(本節内容後面就以min和max指代這兩個參數)來決定map數量。在hive中min的預設值是1B,max的預設值是256MB:

hive> set mapred.min.split.size;
mapred.min.split.size=1
hive> set mapred.max.split.size;
mapred.max.split.size=256000000           

是以如果不做修改的話,就是1個map task處理256MB資料,我們就以調整max為主。通過調整max可以起到調整map數的作用,減小max可以增加map數,增大max可以減少map數。需要提醒的是,直接調整_mapred.map.tasks_這個參數是沒有效果的。

調整大小的時機根據查詢的不同而不同,總的來講可以通過觀察map task的完成時間來确定是否需要增加map資源。如果map task的完成時間都是接近1分鐘,甚至幾分鐘了,那麼往往增加map數量,使得每個map task處理的資料量減少,能夠讓map task更快完成;而如果map task的運作時間已經很少了,比如10-20秒,這個時候增加map不太可能讓map task更快完成,反而可能因為map需要的初始化時間反而讓job總體速度變慢,這個時候反而需要考慮是否可以把map的數量減少,這樣可以節省更多資源給其他Job。

2.Reduce階段的優化(reduce phase)

這裡說的reduce階段,是指前面流程圖中的reduce phase(實際的reduce計算)而非圖中整個reduce task。Reduce階段優化的主要工作也是選擇合适的reduce task數量,跟上面的map優化類似。

與map優化不同的是,reduce優化時,可以直接設定_mapred.reduce.tasks_參數進而直接指定reduce的個數。當然直接指定reduce個數雖然比較友善,但是不利于自動擴充。Reduce數的設定雖然相較map更靈活,但是也可以像map一樣設定一個自動生成規則,這樣運作定時job的時候就不用擔心原來設定的固定reduce數會由于資料量的變化而不合适。

Hive估算reduce數量的時候,使用的是下面的公式:

num_reduce_tasks = min[${hive.exec.reducers.max}, 
                      (${input.size} / ${ hive.exec.reducers.bytes.per.reducer})]           

hive.exec.reducers.bytes.per.reducer

預設為1G,也就是每個reduce處理相當于job輸入檔案中1G大小的對應資料量,而且reduce個數不能超過一個上限參數值,這個參數的預設取值為999。是以我們也可以用調整這個公式的方式調整reduce數量,在靈活性和定制性上取得一個平衡。

設定reduce數同樣也是根據運作時間作為參考調整,并且可以根據特定的業務需求、工作負載類型總結出經驗,是以不再贅述。

3.Map與Reduce之間的優化(spill, copy, sort phase)

map phase和reduce phase之間主要有3道工序。首先要把map輸出的結果進行排序後做成中間檔案,其次這個中間檔案就能分發到各個reduce,最後reduce端在執行reduce phase之前把收集到的排序子檔案合并成一個排序檔案。需要強調的是,雖然這個部分可以調的參數挺多,但是大部分在一般情況下都是不要調整的,除非能精準的定位到這個部分有問題。

3.1.Spill 與 Sort

在spill階段,由于記憶體不夠,資料可能沒辦法在記憶體中一次性排序完成,那麼就隻能把局部排序的檔案先儲存到磁盤上,這個動作叫spill,然後spill出來的多個檔案可以在最後進行merge。如果發生spill,可以通過設定io.sort.mb來增大mapper輸出buffer的大小,避免spill的發生。另外合并時可以通過設定io.sort.factor來使得一次性能夠合并更多的資料,預設值為10,也就是一次歸并10個檔案。調試參數的時候,一個要看spill的時間成本,一個要看merge的時間成本,還需要注意不要撐爆記憶體(io.sort.mb是算在map的記憶體裡面的)。Reduce端的merge也是一樣可以用io.sort.factor。一般情況下這兩個參數很少需要調整,除非很明确知道這個地方是瓶頸。比如如果map端的輸出太大,考慮到map數不一定能很友善的調整,那麼這個時候就要考慮調大io.sort.mb(不過即使調大也要注意不能超過jvm heap size)。而map端的輸出很大,要麼是每個map讀入了很大的檔案(比如不能split的大gz壓縮檔案),要麼是計算邏輯導緻輸出膨脹了很多倍,都是比較少見的情況。

3.2.Copy(Hive shuffle)

這裡說的copy,一般叫做shuffle更加常見。但是由于一開始的配圖以及MR job的web監控頁對這個環節都是叫copy phase,指代更加精确,是以這裡稱為copy。

copy階段是把檔案從map端copy到reduce端。預設情況下在5%的map完成的情況下reduce就開始啟動copy,這個有時候是很浪費資源的,因為reduce一旦啟動就被占用,一直等到map全部完成,收集到所有資料才可以進行後面的動作,是以我們可以等比較多的map完成之後再啟動reduce流程,這個比例可以通過

_mapred.reduce.slowstart.completed.maps_去調整,他的預設值就是5%。
如果覺得這麼做會減慢reduce端copy的進度,可以把copy過程的線程增大。

_tasktracker.http.threads_可以決定作為server端的map用于提供資料傳輸服務的線程,
_mapred.reduce.parallel.copies_可以決定作為client端的reduce同時從map端拉取資料
的并行度(一次同時從多少個map拉資料),修改參數的時候這兩個注意協調一下,server端能處理client端的請求即可。

另外,在shuffle階段可能會出現的OOM問題,原因比較複雜,一般認為是記憶體配置設定不合理,GC無法及時釋放記憶體導緻。對于這個問題,可以嘗試調低shuffle buffer的控制參數
_mapred.job.shuffle.input.buffer.percent_這個比例值,預設值0.7,即shuffle buffer占到reduce task heap size的70%。另外也可以直接嘗試增加reduce數量。
           

4.檔案格式的優化

檔案格式方面有兩個問題,一個是給輸入和輸出選擇合适的檔案格式,另一個則是小檔案問題。小檔案問題在目前的hive環境下已經得到了比較好的解決,hive的預設配置中就可以在小檔案輸入時自動把多個檔案合并給1個map處理,輸出時如果檔案很小也會進行一輪單獨的合并,是以這裡就不專門讨論了。相關的參數可以在這裡找到。

關于檔案格式,Hive0.9版本有3種,textfile,sequencefile和rcfile。總體上來說,rcfile的壓縮比例和查詢時間稍好一點,是以推薦使用。

關于使用方法,可以在建表結構時可以指定格式,然後指定壓縮插入:

create table rc_file_test( col int ) stored as rcfile;
set hive.exec.compress.output = true;
insert overwrite table rc_file_test
select * from source_table;           

另外時也可以指定輸出格式,也可以通過hive.default.fileformat來設定輸出格式,适用于create table as select的情況:

set hive.default.fileformat = SequenceFile;
set hive.exec.compress.output = true; 
/*對于sequencefile,有record和block兩種壓縮方式可選,block壓縮比更高*/
set mapred.output.compression.type = BLOCK; 
create table seq_file_test
as select * from source_table;           

上面的檔案格式轉換,其實是由hive完成的(也就是插入動作)。但是也可以由外部直接導入純文字(可以按照這裡的做法預先壓縮),或者是由MapReduce Job生成的資料。

值得注意的是,hive讀取sequencefile的時候,是把key忽略的,也就是直接讀value并且按照指定分隔符分隔字段。但是如果hive的資料來源是從mr生成的,那麼寫sequencefile的時候,key和value都是有意義的,key不能被忽略,而是應該當成第一個字段。為了解決這種不比對的情況,有兩種辦法。一種是要求凡是結果會給hive用的mr job輸出value的時候帶上key。但是這樣的話對于開發是一個負擔,讀寫資料的時候都要注意這個情況。是以更好的方法是第二種,也就是把這個源自于hive的問題交給hive解決,寫一個InputFormat包裝一下,把value輸出加上key即可。以下是核心代碼,修改了RecordReader的next方法:

//注意:這裡為了簡化,假定了key和value都是Text類型,是以MR的輸出的k/v都要是Text類型。
//這個簡化還會造成資料為空時,出現org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.io.Text的錯誤,因為預設hive的sequencefile的key是一個空的ByteWritable。
public synchronized boolean next(K key, V value) throws IOException 
{
    Text tKey = (Text) key;
    Text tValue = (Text) value;
    if (!super.next(innerKey, innerValue)) 
        return false;

    Text inner_key = (Text) innerKey; //在構造函數中用createKey()生成
    Text inner_value = (Text) innerValue; //在構造函數中用createValue()生成

    tKey.set(inner_key);
    tValue.set(inner_key.toString() + '\t' + inner_value.toString()); // 分隔符注意自己定義
    return true;
}           

5.Job整體優化

有一些問題必須從job的整體角度去觀察。這裡讨論幾個問題:Job執行模式(本地執行v.s.分布式執行)、JVM重用、索引、Join算法、資料傾斜、Top N問題。

5.1. Job執行模式

Hadoop的map reduce job可以有3種模式執行,即本地模式,僞分布式,還有真正的分布式。本地模式和僞分布式都是在最初學習hadoop的時候往往被說成是做單機開發的時候用到。但是實際上對于處理資料量非常小的job,直接啟動分布式job會消耗大量資源,而真正執行計算的時間反而非常少。這個時候就應該使用本地模式執行mr job,這樣執行的時候不會啟動分布式job,執行速度就會快很多。比如一般來說啟動分布式job,無論多小的資料量,執行時間一般不會少于20s,而使用本地mr模式,10秒左右就能出結果。

設定執行模式的主要參數有三個,一個是hive.exec.mode.local.auto,把他設為true就能夠自動開啟local mr模式。但是這還不足以啟動local mr,輸入的檔案數量和資料量大小必須要控制,這兩個參數分别為hive.exec.mode.local.auto.tasks.max和hive.exec.mode.local.auto.inputbytes.max,預設值分别為4和128MB,即預設情況下,map處理的檔案數不超過4個并且總大小小于128MB就啟用local mr模式。

另外,如果是簡單的select語句,比如select某個列取個10條資料看看sample,那麼在hive0.10之後有專門的fetch task優化,使用參數hive.fetch.task.conversion即可。

5.2. JVM重用

正常情況下,MapReduce啟動的JVM在完成一個task之後就退出了,但是如果任務花費時間很短,又要多次啟動JVM的情況下(比如對很大資料量進行計數操作),JVM的啟動時間就會變成一個比較大的overhead。在這種情況下,可以使用jvm重用的參數:

set mapred.job.reuse.jvm.num.tasks = 5;           

他的作用是讓一個jvm運作多次任務之後再退出。這樣一來也能節約不少JVM啟動時間。

5.3. 索引

總體上來說,hive的索引目前還是一個不太适合使用的東西,這裡隻是考慮到叙述完整性,對其進行基本的介紹。

Hive中的索引架構開放了一個接口,允許你根據這個接口去實作自己的索引。目前hive自己有一個參考的索引實作(CompactIndex),後來在0.8版本中又加入位圖索引。這裡就講講CompactIndex。

CompactIndex的實作原理類似一個lookup table,而非傳統資料庫中的B樹。如果你對table A的col1做了索引,索引檔案本身就是一個table,這個table會有3列,分别是col1的枚舉值,每個值對應的資料檔案位置,以及在這個檔案位置中的偏移量。通過這種方式,可以減少你查詢的資料量(偏移量可以告訴你從哪個位置開始找,自然隻需要定位到相應的block),起到減少資源消耗的作用。但是就其性能來說,并沒有很大的改善,很可能還不如建構索引需要花的時間。是以在叢集資源充足的情況下,沒有太大必要考慮索引。

CompactIndex的還有一個缺點就是使用起來不友好,索引建完之後,使用之前還需要根據查詢條件做一個同樣剪裁才能使用,索引的内部結構完全暴露,而且還要花費額外的時間。具體看看下面的使用方法就了解了:

/*在index_test_table表的id字段上建立索引*/
create index idx on table index_test_table(id)  
as 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler' with deferred rebuild;
alter index idx on index_test_table rebuild;
    
/*索引的剪裁。找到上面建的索引表,根據你最終要用的查詢條件剪裁一下。*/
/*如果你想跟RDBMS一樣建完索引就用,那是不行的,會直接報錯,這也是其麻煩的地方*/
create table my_index
as select _bucketname, `_offsets`
from default__index_test_table_idx__ where id = 10;
    
/*現在可以用索引了,注意最終查詢條件跟上面的剪裁條件一緻*/
set hive.index.compact.file = /user/hive/warehouse/my_index; 
set hive.input.format = org.apache.hadoop.hive.ql.index.compact.HiveCompactIndexInputFormat;
select count(*) from index_test_table where id = 10;           

5.4. Join算法

處理分布式join,一般有兩種方法:

- replication join:把其中一個表複制到所有節點,這樣另一個表在每個節點上面的分片就可以跟這個完整的表join了;
- repartition join:把兩份資料按照join key進行hash重分布,讓每個節點處理hash值相同的join key資料,也就是做局部的join。
           

這兩種方式在M/R Job中分别對應了map side join和reduce side join。在一些MPP資料庫中,資料可以按照某列字段預先進行hash分布,這樣在跟這個表以這個字段為join key進行join的時候,該表肯定不需要做資料重分布了。這種功能是以HDFS作為底層檔案系統的hive所沒有的,即使是hive中的bucket也隻能到檔案級别的hash,而非節點級别的hash。

在預設情況下,hive的join政策是進行reduce side join。當兩個表中有一個是小表的時候,就可以考慮用map join了,因為小表複制的代價會好過大表shuffle的代價。

使用map join的配置方法有兩種

一種直接在sql中寫hint,文法是 /*+MAPJOIN (tbl)*/,其中tbl就是你想要做replication的表。
    另一種方法是設定 hive.auto.convert.join = true,這樣hive會自動判斷目前的join操作是否合适做map join,主要是找join的兩個表中有沒有小表。至于多大的表算小表,則是由 hive.smalltable.filesize 決定,預設25MB。
           

但是有的時候,沒有一個表足夠小到能夠放進記憶體,但是還是想用map join怎麼辦?

這個時候就要用到bucket map join。其方法是兩個join表在join key上都做hash bucket,并且把你打算複制的那個(相對)小表的bucket數設定為大表的倍數。這樣資料就會按照join key做hash bucket。小表依然複制到所有節點,map join的時候,小表的每一組bucket加載成hashtable,與對應的一個大表bucket做局部join,這樣每次隻需要加載部分hashtable就可以了。

然後在兩個表的join key都具有唯一性的時候(也就是可做主鍵),還可以進一步做sort merge bucket map join。做法還是兩邊要做hash bucket,而且每個bucket内部要進行排序。這樣一來當兩邊bucket要做局部join的時候,隻需要用類似merge sort算法中的merge操作一樣把兩個bucket順序周遊一遍即可完成,這樣甚至都不用把一個bucket完整的加載成hashtable,這對性能的提升會有很大幫助。

然後這裡以一個完整的實驗說明這幾種join算法如何操作。

首先建表要帶上bucket:

create table map_join_test(id int)
clustered by (id) sorted by (id) into 32 buckets
stored as textfile;           

然後插入我們準備好的800萬行資料,注意要強制劃分成bucket(也就是用reduce劃分hash值相同的資料到相同的檔案):

set hive.enforce.bucketing = true;
insert overwrite table map_join_test
select * from map_join_source_data;           

這樣這個表就有了800萬id值(且裡面沒有重複值,是以可以做sort merge),占用80MB左右。

map join

接下來我們就可以一一嘗試map join的算法了。首先是普通的map join:

select /*+mapjoin(a) */count(*)
from map_join_test a
join map_join_test b on a.id = b.id;           
Tips:Map Join通常隻适用于一個大表和一個小表做關聯的場景,例如事實表和維表的關聯。
           

然後就會看到分發hash table的過程:

2013-08-31 09:08:43     Starting to launch local task to process map join;      maximum memory = 1004929024
2013-08-31 09:08:45     Processing rows:   200000  Hashtable size: 199999  Memory usage:   38823016        rate:   0.039
2013-08-31 09:08:46     Processing rows:   300000  Hashtable size: 299999  Memory usage:   56166968        rate:   0.056
……
2013-08-31 09:12:39     Processing rows:  4900000 Hashtable size: 4899999 Memory usage:   896968104       rate:   0.893
2013-08-31 09:12:47     Processing rows:  5000000 Hashtable size: 4999999 Memory usage:   922733048       rate:   0.918
Execution failed with exit status: 2
Obtaining error information

Task failed!
Task ID:
  Stage-4           

不幸的是,居然記憶體不夠了,直接做map join失敗了。但是80MB的大小為何用1G的heap size都放不下?觀察整個過程就會發現,平均一條記錄需要用到200位元組的存儲空間,這個overhead太大了,對于map join的小表size一定要好好評估,如果有幾十萬記錄數就要小心了。雖然不太清楚其中的構造原理,但是在網際網路上也能找到其他的例證,比如這裡和這裡,平均一行500位元組左右。這個明顯比一般的表一行占用的資料量要大。不過hive也在做這方面的改進,争取縮小hash table,比如HIVE-6430。

是以接下來我們就用bucket map join,之前分的bucket就派上用處了。隻需要在上述sql的前面加上如下的設定:

set hive.optimize.bucketmapjoin = true;           

然後還是會看到hash table分發:

2013-08-31 09:20:39     Starting to launch local task to process map join;      maximum memory = 1004929024
2013-08-31 09:20:41     Processing rows:   200000  Hashtable size: 199999  Memory usage:   38844832        rate:   0.039
2013-08-31 09:20:42     Processing rows:   275567  Hashtable size: 275567  Memory usage:   51873632        rate:   0.052
2013-08-31 09:20:42     Dump the hashtable into file: file:/tmp/hadoop/hive_2013-08-31_21-20-37_444_1135806892100127714/-local-10003/HashTable-Stage-1/MapJoin-a-10-000000_0.hashtable
2013-08-31 09:20:46     Upload 1 File to: file:/tmp/hadoop/hive_2013-08-31_21-20-37_444_1135806892100127714/-local-10003/HashTable-Stage-1/MapJoin-a-10-000000_0.hashtable File size: 11022975
2013-08-31 09:20:47     Processing rows:   300000  Hashtable size: 24432   Memory usage:   8470976 rate:   0.008
2013-08-31 09:20:47     Processing rows:   400000  Hashtable size: 124432  Memory usage:   25368080        rate:   0.025
2013-08-31 09:20:48     Processing rows:   500000  Hashtable size: 224432  Memory usage:   42968080        rate:   0.043
2013-08-31 09:20:49     Processing rows:   551527  Hashtable size: 275960  Memory usage:   52022488        rate:   0.052
2013-08-31 09:20:49     Dump the hashtable into file: file:/tmp/hadoop/hive_2013-08-31_21-20-37_444_1135806892100127714/-local-10003/HashTable-Stage-1/MapJoin-a-10-000001_0.hashtable
……           

這次就會看到每次建構完一個hash table(也就是所對應的對應一個bucket),會把這個hash table寫入檔案,重新建構新的hash table。這樣一來由于每個hash table的量比較小,也就不會有記憶體不足的問題,整個sql也能成功運作。不過光光是這個複制動作就要花去3分半的時間,是以如果整個job本來就花不了多少時間的,那這個時間就不可小視。

最後我們試試sort merge bucket map join(一般簡稱SMB join),在bucket map join的基礎上加上下面的設定即可:

set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;           

SMB join是不會産生hash table分發的步驟的,直接開始做實際map端join操作了,每個mapper讀取兩個表相同的bucket,不需要将小表的bucket加載到記憶體,而通過merge sort的方式把兩個bucket的資料走一遍即可。這個時候周遊資料肯定不會像正常情況下先周遊完第一個bucket,再周遊第二個bucket,那樣無法實作merge sort。SMB join最大的優勢就在于對于表的大小沒有要求的情況下能做map join。

關于join的算法雖然有這麼些選擇,但是個人覺得,對于日常使用,掌握預設的reduce join和普通的(無bucket)map join已經能解決大多數問題。如果小表不能完全放記憶體,但是小表相對大表的size量級差别也非常大的時候,或者是必須要做cross join,那也可以試試bucket map join,不過其hash table分發的過程會浪費不少時間,需要評估下是否能夠比reduce join更高效。而SMB join雖然性能不錯,也能适用于大表之間的join,但是把資料做成bucket本身也需要時間,如果隻是臨時join一次,還不如直接用reduce join。是以适用SMB join的場景相對比較少見,“使用者基本表 join 使用者擴充表”以及“使用者今天的資料快照 join 使用者昨天的資料快照”這類場景可能比較合适。

這裡順便說個題外話,在資料倉庫中,小表往往是次元表,而小表map join這件事情其實用udf代替還會更快,因為不用單獨啟動一輪job,是以這也是一種可選方案。當然前提條件是次元表是固定的自然屬性(比如日期),隻增加不修改(比如網站的頁面編号)的情況也可以考慮。如果次元有更新,要做緩慢變化維的,當然還是維表好維護。至于維表原本的一個主要用途OLAP,以Hive目前的性能是沒法實作的,也就不需要多慮了。

5.5. 資料傾斜

這裡說的所謂資料傾斜,說的是由于資料分布不均勻,個别值集中占據大部分資料量,加上hadoop的計算模式,導緻reduce階段不同的task配置設定到的不均勻引起性能下降。由于計算邏輯在不同資料上耗時不同導緻的計算傾斜或者是map端讀取資料不均勻造成的傾斜不在這裡的讨論範圍。下圖就是一個例子:

HiveQL 性能優化筆記

傾斜分成group by造成的傾斜和join造成的傾斜,需要分開看。

5.6. GroupBy資料傾斜

group by造成的傾斜相對來說比較容易解決。

hive提供兩個參數可以解決,一個是 hive.map.aggr ,預設值已經為true,他的意思是做map aggregation,也就是在mapper裡面做聚合。這個方法不同于直接寫mapreduce的時候可以實作的combiner,但是卻實作了類似combiner的效果。事實上各種基于mr的架構如pig,cascading等等用的都是map aggregation(或者叫partial aggregation)而非combiner的政策,也就是在mapper裡面直接做聚合操作而不是輸出到buffer給combiner做聚合。對于map aggregation,hive還會做檢查,如果aggregation的效果不好,那麼hive會自動放棄map aggregation。判斷效果的依據就是經過一小批資料的處理之後,檢查聚合後的資料量是否減小到一定的比例,預設是0.5,由 hive.map.aggr.hash.min.reduction 這個參數控制。是以如果确認資料裡面确實有個别取值傾斜,但是大部分值是比較稀疏的,這個時候可以把比例強制設為1,避免極端情況下map aggr失效。_hive.map.aggr_ 還有一些相關參數,比如map aggr的記憶體占用等,具體可以參考這篇文章。另一個參數是 _hive.groupby.skewindata_。這個參數的意思是做reduce操作的時候,拿到的key并不是所有相同值給同一個reduce,而是随機分發,然後reduce做聚合,做完之後再做一輪MR,拿前面聚合過的資料再算結果。是以這個參數其實跟hive.map.aggr做的是類似的事情,隻是拿到reduce端來做,而且要額外啟動一輪job,是以其實不怎麼推薦用,效果不明顯。

另外需要注意的是count distinct操作往往需要改寫SQL,可以按照下面這麼做:

/*改寫前*/
select a, count(distinct b) as c from tbl group by a;

/*改寫後*/
select a, count(*) as c from (select a, b from tbl group by a, b) group by a;           

5.7. Join資料傾斜

join造成的傾斜,常見情況是不能做map join的兩個表(能做map join的話基本上可以避免傾斜),其中一個是行為表,另一個應該是屬性表。

比如我們有三個表,一個使用者屬性表users,一個商品屬性表items,還有一個使用者對商品的操作行為表日志表logs。假設現在需要将行為表關聯使用者表:

select * from logs a join users b on a.user_id = b.user_id;           

其中logs表裡面會有一個特殊使用者user_id = 0,代表未登入使用者,假如這種使用者占了相當的比例,那麼個别reduce會收到比其他reduce多得多的資料,因為它要接收所有user_id = 0的記錄進行處理,使得其處理效果會非常差,其他reduce都跑完很久了它還在運作。

skew join

hive給出的解決方案叫skew join,其原理把這種user_id = 0的特殊值先不在reduce端計算掉,而是先寫入hdfs,然後啟動一輪map join專門做這個特殊值的計算,期望能提高計算這部分值的處理速度。當然你要告訴hive這個join是個skew join,即:

set hive.optimize.skewjoin = true;           

還有要告訴hive如何判斷特殊值,根據 hive.skewjoin.key 設定的數量hive可以知道,比如預設值是100000,那麼超過100000條記錄的值就是特殊值。總結起來,skew join的流程可以用下圖描述:

不過,這種方法還要去考慮門檻值之類的情況,其實也不夠通用。是以針對join傾斜的問題,一般都是通過改寫sql解決。對于上面這個問題,我們已經知道user_id = 0是一個特殊key,那麼可以把特殊值隔離開來單獨做join,這樣特殊值肯定會轉化成map join,非特殊值就是沒有傾斜的普通join了:

select *
from (select * from logs where user_id = 0)  a 
join (select * from users where user_id = 0) b 
on a.user_id =  b.user_id
union all
select * 
from logs a join users b
on a.user_id <> 0 and a.user_id = b.user_id;           

上面這種個别key傾斜的情況隻是一種傾斜情況。最常見的傾斜是因為資料分布本身就具有長尾性質,比如我們将日志表和商品表關聯:

select * from logs a join items b on a.item_id = b.item_id;           

這個時候,配置設定到熱門商品的reducer就會很慢,因為熱門商品的行為日志肯定是最多的,而且我們也很難像上面處理特殊user那樣去處理item。這個時候就會用到加随機數的方法,也就是在join的時候增加一個随機數,随機數的取值範圍n相當于将item給分散到n個reducer:

select a.*, b.*
from (select *, cast(rand() * 10 as int) as r_id from logs)a
join (select *, r_id from items 
lateral view explode(range_list(1,10)) rl as r_id)b
on a.item_id = b.item_id and a.r_id = b.r_id           

上面的寫法裡,對行為表的每條記錄生成一個1-10的随機整數,對于item屬性表,每個item生成10條記錄,随機key分别也是1-10,這樣就能保證行為表關聯上屬性表。其中range_list(1,10)代表用udf實作的一個傳回1-10整數序列的方法。這個做法是一個解決join傾斜比較根本性的通用思路,就是如何用随機數将key進行分散。當然,可以根據具體的業務場景做實作上的簡化或變化。

除了上面兩類情況,還有一類情況是因為業務設計導緻的問題,也就是說即使行為日志裡面join key的資料分布本身并不明顯傾斜,但是業務設計導緻其傾斜。比如對于商品item_id的編碼,除了本身的id序列,還人為的把item的類型也作為編碼放在最後兩位,這樣如果類型1(電子産品)的編碼是00,類型2(家居産品)的編碼是01,并且類型1是主要商品類,将會造成以00為結尾的商品整體傾斜。這時,如果reduce的數量恰好是100的整數倍,會造成partitioner把00結尾的item_id都hash到同一個reducer,引爆問題。這種特殊情況可以簡單的設定合适的reduce值來解決,但是這種坑對于不了解業務的情況下就會比較隐蔽。

Top N問題

有時候我們需要在一大堆資料中取top n,比如說取通路日志裡面時間最早的10條記錄。基于sql實作這個需求就是使用order by col limit n。hive預設的order by實作隻會用1個reduce做全局排序,這在資料量大的時候job運作效率非常低。hive在0.12版本引入了parallel order by,也就是通過sampling的方式實作并行(即基于_TotalOrderPartitioner_)。具體開關參數是 _hive.optimize.sampling.orderby_。但是如果使用這個參數還是很可能碰到問題的:

- 首先如果order by字段本身取值範圍過少,會造成Split points are out of order錯誤。這是因為,假設job中reduce數量為r的話,那麼TotalOrderPartitioner需要order by字段的取值至少要有r - 1個。那麼這樣一來還需要關心reduce數量,增加了開發負擔,而且如果把reduce數量設的很小,優化的效果就不太明顯了。
- 其次,設定這個參數還可能造成聚會函數出錯,這個問題隻在比較新的hive版本中解決了。
           

實際上,如果隻是取top n而非全局排序,隻需要使用_sort by col limit n_的寫法就能達到很好的效果。sort by文法本身保證每個reduce内資料有序,這樣就等于是做并行排序。而limit n則保證兩件事:一方面是使得并行排序時每個reduce的輸出記錄數隻是n,也就是先在每個reduce内部做top n(可以explain一下看看執行計劃更加清楚);另外一方面,等局部top n完成之後,再起一輪job,用1個reduce做全局top n,這個時候雖然不是并行排序,但是處理的資料量也已經大大減少,不會造成效率問題了。當然,如果自己實作mapreduce,可以在mapper任務内維護最小最大堆,直接在map端實作并行的top n,再輸出給1個reducer做全局top n,隻需要一輪job即可完成。不過如果n的大小沒有限制很可能會撐爆記憶體,而且即使沒有記憶體問題,實作也比較複雜,是以hive中沒有實作這樣的Operator,而是用上面描述的方式解決。畢竟在reduce端做top n,排序問題就已經在MR架構層面解決了,隻需要考慮limit即可。

除了對全部資料取top n,分組top n也是常見場景,比如學生成績表取每個學科前三名,使用者點選流資料取每個使用者最早的幾個點選等等。如果每個分組需要排序的資料量不大,那麼可以用視窗函數解決,或者在不支援視窗函數的比較老的hive版本自己實作udf。但是如果每個分組本身很大,還是會很慢。如果追求性能的話,同樣可以借鑒sort by limit的寫法,在分組個數不多且固定的情況下直接将分組寫死。比如“取每個性别通路次數最多的10人”類似這樣的情景,就可以拆解為“男性通路次數最多的10人 + 女性通路次數最多的10人”。

6.SQL整體優化

前面對于單個job如何做優化已經做過詳細讨論,但是hive查詢會生成多個job,針對多個job,有什麼地方需要優化?

6.1. Job間并行

首先,在hive生成的多個job中,在有些情況下job之間是可以并行的,典型的就是子查詢。當需要執行多個子查詢union all或者join操作的時候,job間并行就可以使用了。比如下面的代碼就是一個可以并行的場景示意:

select * from 
(
   select count(*) from logs 
   where log_date = 20130801 and item_id = 1
   union all 
   select count(*) from logs 
   where log_date = 20130802 and item_id = 2
   union all 
   select count(*) from logs 
   where log_date = 20130803 and item_id = 3
)t           

設定job間并行的參數是 _hive.exec.parallel_,将其設為true即可。預設的并行度為8,也就是最多允許sql中8個job并行。

set hive.exec.parallel=true           

如果想要更高的并行度,可以通過 hive.exec.parallel.thread.number 參數進行設定,但要避免設定過大而占用過多資源。

6.2. 減少Job數

另外在實際開發過程中也發現,一些實作思路會導緻生成多餘的job而顯得不夠高效。比如這個需求:查詢某網站日志中同時通路過頁面a和頁面b的使用者數量。低效的思路是面向明細的,先取出看過頁面a的使用者,再取出看過頁面b的使用者,然後取交集,代碼如下:

select count(*) 
from (select user_id from logs where page_name = 'a' group by user_id) a 
join (select user_id from logs where blog_owner = 'b' group by user_id) b 
on a.user_id = b.user_id;           

這樣一來,就要産生2個求子查詢的job,一個用于關聯的job,還有一個計數的job,一共有4個job。

但是我們直接用面向統計的方法去計算的話(也就是用group by替代join),則會更加符合M/R的模式,隻需要用兩個job就能跑完:

select count (*) from (
select user_id 
from logs group by user_id
having (count(case when page_name = 'a' then 1 end) *
        count(case when page_name = 'b' then 1 end) > 0)
)t;           

第一種查詢方法符合思考問題的直覺,是工程師和分析師在實際查資料中最先想到的寫法,但是如果在目前hive的query planner不是那麼智能的情況下,想要更加快速的跑出結果,懂一點工具的内部機理也是必須的。

當然了,也有同學有其它的思路,隻是沒有上面那麼高效:

select count(*) from
(
    select user_id,
    count(case when blog_owner = 'a' then 1 end) as visit_z,
    count(case when blog_owner = 'b' then 1 end) as visit_l
    from cnblogs_visit_20130801 group by user_id
) t
where visit_z > 0 and visit_l > 0;           

這種實作方式轉換成job就隻會有2個:内層的子查詢和外層的統計,是以對 SQL和原理都比較熟悉才能在 HIVE 中遊刃有餘