天天看點

大資料篇--Hive調優

文章目錄

      • 一、表設計層面
        • 1.關閉動态分區:
        • 2.開啟分桶:
        • 3.采用合适的存儲格式:
      • 二、參數調優
        • 1.嚴格模式:
        • 2.Fetch Task功能:
        • 3.reduce個數控制:
        • 4.map join:
        • 5.skewjoin方案:
        • 6.group by導緻的資料傾斜:
        • 7.調整切片數(Map任務數):
        • 8.本地模式:
      • 三、文法層面調優
        • 1.order by和sort by:
        • 2.cluster by和distribute by:
        • 3.執行計劃Explain:
        • 4.where條件優化:
        • 5.union優化:
        • 6.count distinct優化:
      • 四、資料傾斜
        • 1.表現:
        • 2.一些常見的:
        • 3.空值産生的資料傾斜:
        • 4.不同資料類型關聯産生資料傾斜:

一、表設計層面

1.關閉動态分區:

  動态分區插入資料,會産生大量的小檔案,map資料會增加,同時namenode也需要存儲更多中繼資料資訊,檢索更多的小檔案。還有一個更加隐秘的問題,從A表導入資料到B表,AB倆表的分區列一樣,如果這時候偷懶,插入B表開動态分區,hadoop會生成假的reduce個數,真實的reduce個數,也就是處理資料reduce節點和分區數一緻,其他的reduce都是空跑。如果導入資料極大,redue個數很少,會産生嚴重的資料傾斜。解決辦法:使用distribute by+靜态分區。

  靜态分區是插入時對分區字段指定值,動态分區是插入時對分區字段不指定值;動态分區在資料量大于0時才會建立分區,靜态分區的資料量為0時也會建立分區。動态分區可以通過下面的設定來打開:

set hive.exec.dynamic.partition=true; -- 是開啟動态分區
set hive.exec.dynamic.partition.mode=nonstrict; -- 這個屬性預設值是strict,就是要求分區字段必須有一個是靜态的分區值,目前設定為nonstrict,那麼可以全部動态分區
           

2.開啟分桶:

分桶:按照使用者建立表時指定的分桶字段進行hash散列多個檔案。單個分區或者表中的資料量越來越大,當分區不能更細粒的劃分資料時,是以會采用分桶技術将資料更細粒度的劃分和管理。

  獲得更高的查詢處理效率:桶為表加上了額外的結構,Hive 在處理有些查詢時能利用這個結構。具體而言,連接配接兩個在(包含連接配接列的)相同列上劃分了桶的表,可以使用 Map 端連接配接 (Map-side join)高效的實作。比如JOIN操作。對于JOIN操作兩個表有一個相同的列,如果對這兩個表都進行了桶操作。那麼将儲存相同列值的桶進行JOIN操作就可以,可以大大較少JOIN的資料量。

set hive.enforce.bucketing=true;
set hive.enforce.sorting=true;
           

3.采用合适的存儲格式:

  在HiveSQL的create table語句中,可以使用

stored as ...

指定表的存儲格式。Hive表支援的存儲格式有TextFile、SequenceFile、RCFile、Avro、ORC、Parquet等。

  存儲格式一般需要根據業務進行選擇,在我們的實操中,絕大多數表都采用TextFile與Parquet兩種存儲格式之一。

  TextFile是最簡單的存儲格式,它是純文字記錄,也是Hive的預設格式。雖然它的磁盤開銷比較大,查詢效率也低,但它更多地是作為跳闆來使用。RCFile、ORC、Parquet等格式的表都不能由檔案直接導入資料,必須由TextFile來做中轉。

  Parquet和ORC都是Apache旗下的開源列式存儲格式。列式存儲比起傳統的行式存儲更适合批量OLAP查詢,并且也支援更好的壓縮和編碼。我們選擇Parquet的原因主要是它支援Impala查詢引擎,并且我們對update、delete和事務性操作需求很低。

  這裡就不展開講它們的細節,可以參考各自的官網:

https://parquet.apache.org/

https://orc.apache.org/

二、參數調優

1.嚴格模式:

  在hive裡面可以通過嚴格模式防止使用者執行那些可能産生意想不到的查詢,進而保護hive的叢集。在嚴格模式下,使用者在運作如下query的時候會報錯:

  • 分區表的查詢沒有使用分區字段來限制
  • 使用了order by 但沒有使用limit語句。(如果不使用limit,會對查詢結果進行全局排序,消耗時間長)
  • 當使用者寫代碼将表的别名寫錯的時候會引起笛卡爾積,例如

    select * from origindb.promotion__campaign c JOIN origindb.promotion__campaignex ce ON c.id = c.id limit 1000;

-- 預設是非嚴格模式(nonstrict)
hive> set hive.mapred.mode;
hive.mapred.mode=nonstrict
-- 設定成嚴格模式後一定要加limit,否則會報錯
hive> set hive.mapred.mode=strict;
hive> select * from emp order by empno desc;
FAILED: SemanticException 1:27 In strict mode, if ORDER BY is specified, LIMIT must also be specified. Error encountered near token 'empno'
           

2.Fetch Task功能:

  某些 SELECT 查詢可以轉換為一個 FETCH 任務,進而最大限度地可以減少互動的延遲。在目前情況下,查詢隻能是單一資料源,不能有任何的子查詢,不能有任何的聚合,去重,Lateral views 以及 Join。Fetch 任務是 Hive 中執行效率比較高的任務之一。直接周遊檔案并輸出結果,而不是啟動 MapReduce 作業進行查詢。對于簡單的查詢,如帶有 LIMIT 語句的 SELECT * 查詢,這會非常快(機關數秒級)。在這種情況下,Hive 可以通過執行 HDFS 操作來傳回結果。

在hive-site.xml中有三個fetch task相關的值:

hive.fetch.task.conversion

hive.fetch.task.conversion.threshold

hive.fetch.task.aggr

hive.fetch.task.conversion.threshold:在輸入大小為多少以内的時候fetch task生效,從 Hive 0.13.0 版本到 Hive 0.13.1 版本起,預設值為-1(表示沒有任何的限制),Hive 0.14.0 版本以及更高版本預設值改為 1073741824 byte(1G)。

<property>
  <name>hive.fetch.task.conversion.threshold</name>
  <value>1073741824</value>
  <description>
    Input threshold for applying hive.fetch.task.conversion. If target table is native, input length
    is calculated by summation of file lengths. If it's not native, storage handler for the table
    can optionally implement org.apache.hadoop.hive.ql.metadata.InputEstimator interface.
  </description>
</property>
           

hive.fetch.task.aggr:對于沒有group by的聚合查詢,比如select count(*) from src,這種最終都會在一個reduce中執行,像這種查詢,可以把這個置為true将将其轉換為fetch task,這可能會節約一些時間。

<property>
  <name>hive.fetch.task.aggr</name>
  <value>true</value>
  <description>
    Aggregation queries with no group-by clause (for example, select count(*) from src) execute
    final aggregations in single reduce task. If this is set true, Hive delegates final aggregation
    stage to fetch task, possibly decreasing the query time.
  </description>
</property>
           

hive.fetch.task.conversion:

(1) 直接在指令行中使用set指令進行設定:

(2) 使用hiveconf進行設定:

bin/hive --hiveconf hive.fetch.task.conversion=more
           

(3) 上面的兩種方法都可以開啟了Fetch Task,但是都是臨時起作用的;如果你想一直啟用這個功能,可以在${HIVE_HOME}/conf/hive-site.xml裡面修改配置:

<property>
  <name>hive.fetch.task.conversion</name>
  <value>more</value>
  <description>
    Expects one of [none, minimal, more].
    Some select queries can be converted to single FETCH task minimizing latency.
    Currently the query should be single sourced not having any subquery and should not have
    any aggregations or distincts (which incurs RS), lateral views and joins.
    0. none : disable hive.fetch.task.conversion
    1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only
    2. more    : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns)
  </description>
</property>
           

可支援的選項有 none,minimal 和 more,從Hive 0.10.0 版本到 Hive 0.13.1 版本起,預設值為 minimal,Hive 0.14.0版本以及更高版本預設值改為 more:

  • none:禁用fetch task優化(在Hive 0.14.0版本中引入)
  • minimal:隻在select *、使用分區列過濾、帶有limit的語句上進行優化
  • more:在minimal的基礎上更加強大了,select不僅僅可以是*,還可以單獨選擇幾列,并且filter也不再局限于分區字段,同時支援虛拟列(别名)

3.reduce個數控制:

  為什麼要設定rereduce的個數?

  • reduce數量很大就可能生成很多小檔案。
  • reduce數量很少就可能導緻作業耗費時間長,最終有可能任務跑不出來。
    大資料篇--Hive調優
    參考官網:https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties
hive> set hive.exec.reducers.bytes.per.reducer;
hive.exec.reducers.bytes.per.reducer=256000000
hive> set hive.exec.reducers.max;
hive.exec.reducers.max=1009; -- 預設是它自己根據相應公式算的,具體可翻閱源碼
hive> set mapred.reduce.tasks = 3; -- 設定reduce數量
           

在Spark中相當于是要設定partition的數量:

  Partition數量太少:太少的影響顯而易見,就是資源不能充分利用,例如local模式下,有16core,但是Partition數量僅為8的話,有一半的core沒利用到。

  Partition數量太多:太多,資源利用沒什麼問題,但是導緻task過多,task的序列化和傳輸的時間開銷增大。

  那麼多少的partition數是合适的呢,這裡我們參考spark doc給出的建議,Typically you want 2-4 partitions for each CPU in your cluster。(Spark 官網建議的 Task 的設定原則是:設定 Task 數目為

num-executors * executor-cores

的2~3倍較為合适。)

  也可以參考:如何管理Spark的分區

4.map join:

參考官網:

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Joins

https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties中的

hive.auto.convert.join

參數

Map Join:

優點: 沒有shuffle/Reduce過程,效率提高

缺點 :由于小表都加載到記憶體當中,讀記憶體的要求提高了

原理執行流程可參考:

Hive Map Join 原理

http://lxw1234.com/archives/2015/06/313.htm

  Hive0.11之前,預設的join方式是reduce端join,即Common Join=Shuffle join=Reduce join(hive.auto.convert.join預設為false),其原理是map的輸出資料通過hash進行partition,然後shuffle至對應的reduce端執行join。如果join key分布不均勻,則會造成一定的資料傾斜,比較明顯的現象就是某一個reduce會一直運作在99%,在join運作完畢後,可以通過job的counter看到,reduce處理的資料量相差很大。

  join中還有一個方式是map join,即在map端進行join,其原理是broadcast join,即把比較小的表直接放到記憶體中去,然後再對比較大的表進行Map操作,join就會在map操作的時候,每當掃描一個大的table中的資料,就要去檢視小表的資料,哪條與之相符,繼而進行連接配接。這種方式比較适合表中有一個小表的情況,hive是rbo的方法來執行操作的,是以需要把小表放在前面,不過也可以手動指定hint,比如

  Hive 0.11之後,在表的大小符合設定時(

hive.auto.convert.join.noconditionaltask=true

hive.auto.convert.join.noconditionaltask.size=10000

hive.mapjoin.smalltable.filesize=25000000

,即25M),預設會把join轉換為map join(讓hive.ignore.mapjoin.hint為true,hive.auto.convert.join為true),不過hive0.11的map join bug比較多,可以通過預設關閉map join convert,在需要時再設定hint:

hive.auto.convert.join=false

hive.ignore.mapjoin.hint=false

  如果不指定MapJoin或者不符合MapJoin的條件,那麼Hive解析器會将Join操作轉換成Common Join,即在Reduce階段完成join,整個過程包含Map、Shuffle、Reduce階段。

  • Map階段:讀取源表的資料,Map輸出時候以Join on條件中的列為key,如果Join有多個關聯鍵,則以這些關聯鍵的組合作為key;Map輸出的value為join之後所關心的(select或者where中需要用到的)列;同時在value中還會包含表的Tag資訊,用于标明此value對應哪個表;按照key進行排序。
  • Shuffle階段:根據key的值進行hash,并将key/value按照hash值推送至不同的reduce中,這樣確定兩個表中相同的key位于同一個reduce中。
  • Reduce階段:根據key的值完成join操作,期間通過Tag來識别不同表中的資料。

擴充:小表不小不大,怎麼用 map join 解決傾斜問題。

  使用 map join 解決小表(記錄數少)關聯大表的資料傾斜問題,這個方法使用的頻率非常高,但如果小表很大,大到map join會出現bug或異常,這時就需要特别的處理。 以下例子:

select 
	* 
from 
	log a
left outer join 
	users b
on 
	a.user_id = b.user_id;
           

  users 表有 600w+ 的記錄,把 users 分發到所有的 map 上也是個不小的開銷,而且 map join 不支援這麼大的小表。如果用普通的 join,又會碰到資料傾斜的問題。解決方法:

select /*+mapjoin(x)*/* from log a
  left outer join (
    select  /*+mapjoin(c)*/d.*
      from ( select distinct user_id from log ) c
      join users d
      on c.user_id = d.user_id
    ) x
  on a.user_id = b.user_id;
           

  假如,log裡user_id有上百萬個,這就又回到原來map join問題。所幸,每日的會員uv不會太多,有交易的會員不會太多,有點選的會員不會太多,有傭金的會員不會太多等等。是以這個方法能解決很多場景下的資料傾斜問題。

5.skewjoin方案:

參考:

https://www.cda.cn/discuss/post/details/5ef4b5dae76c715bf35703e4

https://www.cnblogs.com/aukle/p/3233704.html

  在Hive的資料處理過程中,由于join造成的傾斜,常見情況是不能做map join的兩個表(能做map join的話基本上可以避免傾斜),其中一個是行為表,另一個應該是屬性表。比如我們有三個表,一個使用者屬性表users,一個商品屬性表items,還有一個使用者對商品的操作行為表日志表logs。假設現在需要将行為表關聯使用者表:

select * from logs l join users u on l.user_id = u.user_id;

  其中logs表裡面會有一個特殊使用者user_id = 0,代表未登入使用者,假如這種使用者占了相當的比例,那麼個别reduce會收到比其他reduce多得多的資料,因為它要接收所有user_id = 0的記錄進行處理,使得其處理效果會非常差,其他reduce都跑完很久了它還在運作。

  hive給出的解決方案叫skew join,其原理把這種user_id = 0的特殊值先不在reduce端計算掉,而是先寫入hdfs,然後啟動一輪map join專門做這個特殊值的計算,期望能提高計算這部分值的處理速度。當然你要告訴hive這個join是個skew join,即:

set hive.optimize.skewjoin = true;

  還有要告訴hive如何判斷特殊值,根據

set hive.skewjoin.key = skew_key_threshold (default = 100000)

設定的數量hive可以知道,比如預設值是100000,那麼超過100000條記錄的值就是特殊值。是以使用這個參數控制傾斜的門檻值,如果超過這個值,新的值會發送給那些還沒有達到的reduce, 一般可以設定成你(處理的總記錄數/reduce個數)的2-4倍都可以接受。傾斜是經常會存在的,一般select 的層數超過2層,翻譯成執行計劃多于3個以上的mapreduce job 都很容易産生傾斜,建議每次運作比較複雜的sql 之前都可以設一下這個參數. 如果你不知道設定多少,可以就按官方預設的1個reduce 隻處理1G 的算法,那麼 skew_key_threshold = 1G/平均行長. 或者預設直接設成250000000 (差不多算平均行長4個位元組)。

  其他相關參數:

hive.skewjoin.mapjoin.map.tasks = <用于處理skew join的map join 的最大數量> (defaul : 10000)

hive.skewjoin.mapjoin.min.split=33554432

(通過指定最小split的大小,執行細粒度的控制)

  skew join的流程可以用下圖描述:

大資料篇--Hive調優

6.group by導緻的資料傾斜:

原因:group by 次元過小,某值的數量過多。處理某值的reduce灰常耗時。

set hive.map.aggr=true;

:Map 端部分聚合,相當于Combiner

set hive.groupby.skewindata=true;

:有資料傾斜的時候進行負載均衡,當選項設定為 true,生成的查詢計劃會有兩個 MR Job。第一個 MR Job 中,Map 的輸出結果集合會随機分布到 Reduce 中,每個 Reduce 做部分聚合操作,并輸出結果,這樣處理的結果是相同的 Group By Key 有可能被分發到不同的 Reduce 中,進而達到負載均衡的目的;第二個 MR Job 再根據預處理的資料結果按照 Group By Key 分布到 Reduce 中(這個過程可以保證相同的 Group By Key 被分布到同一個 Reduce 中),最後完成最終的聚合操作。

7.調整切片數(Map任務數):

  Hive底層自動對小檔案做了優化,用了CombineTextInputFormat,将多個小檔案切片合成一個切片。合成完之後的切片大小,如果大于mapred.max.split.size 的大小,就會生成一個新的切片。

mapred.max.split.size

預設是128MB,

set mapred.max.split.size=134217728

(128MB),對于切片數(MapTask)數量的調整,要根據實際業務來定,比如一個100MB的檔案,假設有1千萬條資料,此時可以調成10個MapTask,則每個MapTask處理1百萬條資料。

8.本地模式:

  Hive也可以不将任務送出到叢集進行運算,而是直接在一台節點上處理。因為消除了送出到叢集的overhead,是以比較适合資料量很小,且邏輯不複雜的任務。設定

hive.exec.mode.local.auto

為true可以開啟本地模式。但任務的輸入資料總量必須小于

hive.exec.mode.local.auto.inputbytes.max

(預設值128MB),且mapper數必須小于

hive.exec.mode.local.auto.tasks.max

(預設值4),reducer數必須為0或1,才會真正用本地模式執行。

三、文法層面調優

1.order by和sort by:

參考hive官網:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+SortBy

大資料篇--Hive調優

Order by隻會産生一個reducer,全局排序。

Sort by隻能保證每個reduce内部是有序的,并不能保證全局有序。

大資料篇--Hive調優
hive> set mapred.reduce.tasks = 3;
hive> insert overwrite local directory '/home/hadoop/hive_tmp/sort' select * from emp sort by empno desc;
           
大資料篇--Hive調優

2.cluster by和distribute by:

大資料篇--Hive調優

distribute by:按照指定的字段把資料分散到不同的reduce裡面去。

cluster by:如果sort by和distribute by中所有的列相同,可以縮寫為cluster by以便同時指定兩者所用的列。

大資料篇--Hive調優

3.執行計劃Explain:

  我們都知道,hive在執行的時候會把所對應的SQL語句都會轉換成mapreduce代碼執行,但是具體的MR執行資訊我們怎樣才能看出來呢?這裡就用到了explain的關鍵字,他可詳細的表示出在執行所對應的語句所對應的MR代碼。extended關鍵字可以更加詳細的列舉出代碼的執行過程。 explain會把查詢語句轉化成stage組成的序列,主要由三方面組成:

  • 查詢的抽象文法樹
  • plane中各個stage的依賴情況
  • 每個階段的具體描述:描述具體來說就是顯示出對應的操作算子和與之操作的對應的資料,例如查詢算子,filter算子,fetch算子等等。

參考官網:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Explain

大資料篇--Hive調優

檢視下面這條語句的執行計劃:

hive (default)> explain select * from emp;

大資料篇--Hive調優

hive (default)> explain select deptno, avg(sal) avg_sal from emp group by deptno;

大資料篇--Hive調優

4.where條件優化:

優化前:

select m.cid,u.id from order m join customer u on m.cid =u.id where m.dt='20180808';

優化後(where條件在map端執行而不是在reduce端執行):

select m.cid,u.id from (select * from order where dt='20180818') m join customer u on m.cid=u.id;

注意:hive在做join時小表寫在前面。

5.union優化:

  盡量不要使用union(union會去掉重複的記錄)而是使用 union all 然後再用 group by 去重。

6.count distinct優化:

  資料量小的時候無所謂,資料量大的情況下,由于COUNT DISTINCT操作需要用一個Reduce Task來完成,這一個Reduce需要處理的資料量太大,就會導緻整個Job很難完成,一般COUNT DISTINCT使用先GROUP BY再COUNT的方式替換:

SELECT day,
COUNT(DISTINCT id) AS uv
FROM users;

可以轉換成:

SELECT day,
COUNT(id) AS uv
FROM (SELECT day,id FROM users GROUP BY day,id) a;
           

注意:count操作是全局計數,在底層轉換MRJob時用于計數的分區(reduce Task)隻有一個。

-- 優化前
Select a,sum(b),count(distinct c),count(distinct d) from test group by a
-- 優化後的語句
Select a ,sum(b),count(c),count(d) from (
				Select a,b,null c,null d from test
				Union all
				Select a,0 b,c,null d from test group by a,c
				Union all
				Select a,0,null c ,d from group by a,d
)
           

四、資料傾斜

1.表現:

  任務進度長時間維持在99%(或100%),檢視任務監控頁面,發現隻有少量(1個或幾個)reduce子任務未完成。因為其處理的資料量和其他reduce差異過大。單一reduce的記錄數與平均記錄數差異過大,通常可能達到3倍甚至更多。 最長時長遠大于平均時長。

2.一些常見的:

關鍵詞 情形 後果 解決
大小表Join 其中一個表較小,但是key集中 分發到某一個或幾個Reduce上的資料遠高于平均值 參考上面第二大節的第4小節
大表Join大表 兩個表都較大,不能支援map join,其中一個表中資料量某一類值特别多 配置設定到該值的reducer,耗時較長 參考上面第二大節的第5小節
group by group by 次元過小,某值的數量過多 處理某值的reduce非常耗時 參考上面第二大節的第6小節
count(distinct) 資料量大的情況下 用一個Reduce Task來完成,就會導緻整個Job很難完成 參考上面第三大節的第6小節

3.空值産生的資料傾斜:

場景:如日志中,常會有資訊丢失的問題,比如日志中的 user_id,如果取其中的 user_id 和 使用者表中的user_id 關聯,會碰到資料傾斜的問題。

解決方法1: user_id為空的不參與關聯

select * from log a
  join users b
  on a.user_id is not null
  and a.user_id = b.user_id
union all
select * from log a
  where a.user_id is null;
           

解決方法2 :賦與空值分新的key值

select *
  from log a
  left outer join users b
  on case when a.user_id is null then concat(‘hive’,rand() ) else a.user_id end = b.user_id;
           

結論:方法2比方法1效率更好,不但io少了,而且作業數也少了。解決方法1中 log讀取兩次,jobs是2。解決方法2 job數是1 。這個優化适合無效 id (比如 -99 , ’’, null 等) 産生的傾斜問題。把空值的 key 變成一個字元串加上随機數,就能把傾斜的資料分到不同的reduce上 ,解決資料傾斜問題。

4.不同資料類型關聯産生資料傾斜:

場景:使用者表中user_id字段為int,log表中user_id字段既有string類型也有int類型。當按照user_id進行兩個表的Join操作時,預設的Hash操作會按int型的id來進行配置設定,這樣會導緻所有string類型id的記錄都配置設定到一個Reducer中。

解決方法:把數字類型轉換成字元串類型

select * from users a
  left outer join logs b
  on a.usr_id = cast(b.user_id as string)
           

參考:https://my.oschina.net/osenlin/blog/1603056

繼續閱讀