天天看點

HIVE查詢優化

所有的調優都離不開對CPU、記憶體、IO這三樣資源的權衡及調整

Hive QL的執行本質上是MR任務的運作,是以優化主要考慮到兩個方面:Mapreduce任務優化、SQL語句優化

一、Mapreduce任務優化 1、設定合理的task數量(map task、reduce task)

這裡有幾個考慮的點,一方面Hadoop MR task的啟動及初始化時間較長,如果task過多,可能會導緻任務啟動和初始化時間遠超邏輯處理時間,這種情況白白浪費了計算資源。另一方面,如果任務複雜,task過少又會導緻任務遲遲不能完成,這種情況又使計算資源沒有充分利用。

因為其讀取輸入使用Hadoop API,是以其map數量由以下參數共同決定:

minSize = Math.max(job.getLong(“mapred.min.split.size”, 1), minSplitSize);

mapred.min.split.size:設定每個map處理的最小資料量

minSplitSize:一般預設為都為1,可由子類複寫函數protected void setMinSplitSize(long minSplitSize) 重新設定。

blockSize:預設的HDFS檔案塊大小

goalSize=totalSize/numSplits 期望的每個Map處理的split大小,僅僅是期望的

numSplits :是在 job 啟動時通過JobConf.setNumMapTasks(int n) 設定的值,是給架構的map數量的提示 totalSize :整個job所有輸入的總大小

splitSize的計算方式如下:

max( minSize, min(blockSize, goalSize) )

最終map數量由以下方式計算得出:

map數量=totalSize/splitSize

可以看出在調整map數量時,可通過調整blockSize和mapred.min.split.size的方式實作,但是調整blockSize可能并不現實,是以程式執行時通過設定mapred.min.split.size參數來設定。

當然,需要特别注意的是,如果檔案特别大,需要支援分割才能進行分片,産生多個map,否則單個檔案不可分割那就一個map。如果檔案都特别小(比blockSize都小),可以使用CombineFileInputFormat将input path中的小檔案合并成再送給mapper處理。

reduce task個數确定:

1)設定set mapred.reduce.tasks=?

2)若1)沒有設定,hive通過下面兩個參數來計算

set hive.exec.reducers.maxs=? (每個job最大的task數量)

set hive.exec.reducers.bytes.per.reducer = ? (每個reduce任務處理的資料量,預設為1G)

2、對小檔案合并

過多的小檔案對于執行引擎起map reduce任務進行處理非常不利,一個是每個小檔案起一個task去處理,非常浪費資源,另一個是過多的小檔案也對NameNode造成很大壓力(每個小檔案都要記錄一個中繼資料 150 byte)。是以,減少小檔案的數量也是一個優化措施。

盡可能避免産生:

1)在執行hive查詢時合理設定reduce的數量

2)可以使用hadoop archive指令将多個小檔案進行歸檔

3)動态分區插入資料可能會産生大量小檔案,是以在使用時盡量避免動态分區

4)對map和reduce的輸出進行merge

set hive.merge.mapfiles = true //設定map端輸出進行合并,預設為true

set hive.merge.mapredfiles = true //設定reduce端輸出進行合并,預設為false

set hive.merge.size.per.task = 25610001000 //設定合并檔案的大小

set hive.merge.smallfiles.avgsize=16000000 //當輸出檔案的平均大小小于該值時,啟動一個獨立的MapReduce任務進行檔案merge

若已産生小檔案:

1)set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat 用于設定在執行map前對小檔案進行合并,以下參數設定決定了合并檔案的大小,并最終減小map任務數

set mapred.max.split.size=256000000 //每個Map最大輸入大小(這個值決定了合并後檔案的數量)

set mapred.min.split.size.per.node=100000000 //一個節點上split的最小大小(這個值決定了多個DataNode上的檔案是否需要合并)

set mapred.min.split.size.per.rack=100000000 //一個機架下split的最小大小(這個值決定了多個機架上的檔案是否需要合并)

3、避免資料傾斜

不管task數量是多少,發生資料傾斜就會大大影響查詢效率。

1)設定hive.map.aggr=true

該配置可将頂層的聚合操作放在map端進行,以減輕reduce端的壓力。

4、并行執行

Hive會将查詢轉化成一個或者多個階段,這些階段可能包括:MR階段、抽樣階段、合并階段、limit階段等。預設單次執行一個階段,可進行如下配置,使得兩個并不沖突的階段可以并行執行,提高查詢效率。

set hive.exec.parallel=true // 可以開啟并行執行

set hive.exec.parallel.thread.number=16 // 同一個sql允許最大并行度,預設為8

5、本地模式

這一情況适用于資料集小的情況,并且啟動和運作MR任務消耗的時間可能超過邏輯處理的時候,可配置hive在适當情況下自動進行此項優化

條件如下:

輸入資料量大小必須小于 hive.exec.mode.local.auto.inputbytes.max(預設128MB)

map任務數量必須小于 hive.exec.mode.local.auto.tasks.max(預設4)

需開啟設定:

set hive.exec.mode.local.auto=true // 預設為false

6、JVM重用

hadoop也通過JVM來執行map或者reduce任務,當任務非常多時,啟動并初始化任務的耗時會大大增加,有時甚至比邏輯執行時間都要長,是以可通過JVM重用使得JVM執行個體在同一個job時可以由多task重用,避免增加過多的啟動和初始化時間。可通過如下參數設定:

set mapred.job.reuse.jvm.num.tasks=10; // 10為重用個數

二、SQL語句優化

SQL優化有些原則是通用的,部分可參考前一篇總結:

這裡針對HIVE來進行一些說明:

1、列裁剪分區裁剪這些基本縮小查找範圍的自不必說。 2、join優化

1)在查詢語句中如果小表在join的左邊,那麼hive可能将小表放入分布式緩存,實作map-side join。我們不用考慮哪張表是小表而調整join順序,新版hive提供了 hive.auto.convert.join 可以來優化,預設是啟動的。

hive.mapjoin.smalltable.filesize=25000000(預設值) // 小于該值的表在join時被hive認為是小表

2)在特定情況下,大表也可以使用map-side join,需要滿足以下條件

  • 表資料必須按照ON語句中的鍵進行分桶
  • 其中一張表的分桶數必須是另一張表的分桶數的倍數

    hive這種情況下在map階段可以進行分桶連接配接,這一功能需要通過如下設定進行開啟

    set hive.enforce.sortmergebucketmapjoin=true

如果分桶表資料是按照連接配接鍵或者桶的鍵進行排序的,hive可以進行一個更快的合并-連接配接(sort-merge join),需要如下配置

hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat

hive.auto.convert.sortmerge.join=true

hive.enforce.sortmergebucketmapjoin=true

3、group by操作

可先進行map端部分聚合,然後在reduce端最終聚合,進行如下設定

set hive.map.aggr=true; // 開啟Map端聚合參數設定

set hive.grouby.mapaggr.checkinterval=100000; // 聚合的鍵對應的記錄條數超過這個值則會進行分拆

如下設定可在出現資料傾斜時分兩個MR作業來完成,而不是在一個job中完成

set hive.groupby.skewindata = true (預設為false) // 在第一步mapreduce中map的結果随機分布于到reduce,于是這一步中每個reduce隻做部分聚合,在下一個mapreduce中再進行最終聚合。

4、使用LEFT SEMI JOIN (左半連接配接)替代 IN/EXISTS 子查詢

hive中前者 LEFT SEMI JOIN 是後者 IN/EXISTS 更高效的實作

例如:

select a.id, a.name from a where a.id in (select b.id from b);

應該改為 select a.id, a.name from a left semi job b on a.id=b.id;

5、選擇合适的分區、分桶、排序方式

distribute by : 用來在map端按鍵對資料進行拆分,根據reduce的個數進行資料分發,預設是采用hash算法

cluster by :除了有distribute by的功能外,還能對查詢結果進行排序,等于distribute by + sort by

sort by :資料在進入reducer之前就排好序,根據指定值對進入到同一reducer的所有行進行排序

order by :對所有資料進行排序(全局有序),若資料量很大,可能需要很多時間

注:嚴格模式( hive.mapred.mode=strict 預設為 nonstrict ) 用于 hive 在特定情況下阻止任務的送出,針對以下三種情況

(1)對于分區表,不加分區字段過濾條件,不能執行

(2)對于order by語句,必須使用limit語句

(3)限制笛卡爾積的查詢(join的時候不使用on,而使用where的)

三、其它 1、存儲格式

優先選擇列式存儲格式如orc、parquet,這将使hive查詢時避免全部掃描,列資料存儲在一起,隻需拿相應列即可,能夠大大縮短響應時間

2、壓縮方式

在mapreduce的世界中,性能瓶頸主要來自于磁盤IO和網絡IO,CPU基本不構成壓力,根據需要選擇合适的壓縮方式,如下圖可參考

HIVE查詢優化
3、vectorized query( 向量化查詢 Hive 0.13中引入 )

通過一次性批量執行1024行而不是每次單行執行,提供掃描、聚合、篩選器和連接配接等操作的性能。通過如下方式啟用:

set hive.vectorized.execution.enabled = true;

set hive.vectorized.execution.reduce.enabled = true;

4、CBO(cost based query optimization)基于成本的優化

hive 0.14.0 加入,hive1.1.0 後預設開啟,hive底層自動優化多個join的順序并選擇合适的join算法,優化每個查詢的執行邏輯和實體執行計劃。使用時需進行如下設定

set hive.cbo.enable = true;

set hive.compute.query.using.stats = true;

set hive.stats.fetch.column.stats = true;

set hive.stats.fetch.partition.stats = true;

5、推測執行

偵測執行緩慢的任務(通常由于負載不均衡或者資源分布原因導緻),通過執行其備份任務對相同資料進行重算,加速擷取單個task的結果,以此來提高整體的執行效率,可由如下參數控制:

set mapred.map.tasks.speculative.execution=true

set mapred.reduce.tasks.speculative.execution=true

以上内容,如有錯誤,請看到的小夥伴幫忙指正,多謝。

繼續閱讀