天天看點

3萬字史詩級Hive性能調優

前言

        Hive 作為大資料領域常用的資料倉庫元件,在平時設計和查詢的時候要特别注意效率 。影響 Hive 效率的幾乎從不是資料量過大,而是資料傾斜、資料備援、Job或I/O過多、MapReduce 配置設定不合理等等。 <font color='blue'>對Hive 的調優既包含 Hive 的建表設計方面,對 HiveHQL 語句本身的優化,也包含 Hive 配置參數 和 底層引擎 MapReduce 方面的調整 。</font>

        本次分享,将帶着大家主要從以下四個方面展開。

3萬字史詩級Hive性能調優

        内容較多,建議轉發收藏,分享給更多的朋友。

        為了不盲目地學習,我們需要先知道 Hive 調優的重要性:在保證業務結果不變的前提下,降低資源的使用量,減少任務的執行時間。

調優須知

        在開始之前,需要對下面的“ 注意事項” 有個大緻的印象 。

  1. 于大資料計算引擎來說:資料量大不是問題,資料傾斜是個問題。
  1. ive的複雜HQL底層會轉換成多個MapReduce Job并行或者串行執行,Job數比較多的作業運作效率相對比較低,比如即使隻有幾百行資料的表,如果多次關聯多次彙總,産生十幾個Job,耗時很長。原因是 MapReduce 作業初始化的時間是比較長的 。
  2. 進行Hive大資料分析時,常見的聚合操作比如 sum,count,max,min,UDAF等 ,不怕資料傾斜問題,MapReduce 在 Mappe階段 的預聚合操作,使資料傾斜不成問題 。
  3. 的建表設計,模型設計事半功倍。
  4. 置合理的 MapReduce 的 Task 并行度,能有效提升性能。(比如,10w+資料量 級别的計算,用 100 個 reduceTask,那是相當的浪費,1個足夠,但是如果是 億級别的資料量,那麼1個Task又顯得捉襟見肘)
  5. 解資料分布,自己動手解決資料傾斜問題是個不錯的選擇。這是通用的算法優化,但算法優化有時不能适應特定業務背景,開發人員了解業務,了解資料,可以通過業務邏輯精确有效的解決資料傾斜問題。
  6. 據量較大的情況下,慎用 count(distinct),group by 容易産生傾斜問題。
  7. 小檔案進行合并,是行之有效的提高排程效率的方法,假如所有的作業設定合理的檔案數,對任務的整體排程效率也會産生積極的正向影響 。
  8. 化時把握整體,單個作業最優不如整體最優。

調優具體細節

        好了, 下面正式開始談論調優過程中的細節。

Hive建表設計層面

        Hive的建表設計層面調優,主要講的怎麼樣合理的組織資料,友善後續的高效計算。比如建表的類型,檔案存儲格式,是否壓縮等等。

利用分區表優化

        先來回顧一下 hive 的表類型有哪些?

1、分區表

2、分桶表

        分區表 是在某一個或者幾個次元上對資料進行分類存儲,一個分區對應一個目錄。如果篩選條件裡有分區字段,那麼 Hive 隻需要周遊對應分區目錄下的檔案即可,不需要周遊全局資料,使得處理的資料量大大減少,進而提高查詢效率 。

        你也可以這樣了解:當一個 Hive 表的查詢大多數情況下,會根據某一個字段進行篩選時,那麼非常适合建立為分區表,該字段即為分區字段。

        舉個例子:

select1: select .... where country = "china" 

select2: select .... where country = "china" 

select3: select .... where country = "china" 

select4: select .... where country = "china"      

        這就像是分門别類:這個city字段的每個值,就單獨形成為一個分區。其實每個分區就對應着 HDFS的一個目錄 。在建立表時通過啟用 partitioned by 實作,用來 partition 的次元并不是實際資料的某一列,具體分區的标志是由插入内容時給定的。當要查詢某一分區的内容時可以采用 where 語句,形似 ​

​where tablename.partition_column = a​

​ 來實作 。

        接下來,請嘗試操作一下:

1、建立含分區的表:
CREATE TABLE page_view
             (
                          viewTime INT
                        , userid   BIGINT
                        , page_url STRING
                        , referrer_url STRING
                        , ip STRING COMMENT 'IP Address of the User'
             )
             PARTITIONED BY
             (
                          date STRING
                        , country STRING
             )
             ROW FORMAT DELIMITED FIELDS TERMINATED BY '1' STORED AS TEXTFILE
;      
2、載入内容,并指定分區标志:
load data local inpath '/home/bigdata/pv_2018-07-08_us.txt' into table page_view partition(date='2018-07-08', country='US');      
3、查詢指定标志的分區内容:
SELECT
       page_views.*
FROM
       page_views
WHERE
       page_views.date               >= '2008-03-01'
       AND page_views.date           <= '2008-03-31'
       AND page_views.referrer_url like '%xyz.com'
;      

        讓我們來簡單總結一下:

1、當你意識到一個字段經常用來做where,建分區表,使用這個字段當做分區字段

2、在查詢的時候,使用分區字段來過濾,就可以避免全表掃描。隻需要掃描這張表的一個分區的資料即可

利用分桶表優化

        分桶跟分區的概念很相似,都是把資料分成多個不同的類别,差別就是規則不一樣!

1、分區:按照字段值來進行:一個分區,就隻是包含這個這一個值的所有記錄 不是目前分區的資料一定不在目前分區 目前分區也隻會包含目前這個分區值的資料

2、分桶:預設規則:Hash散列 一個分桶中會有多個不同的值 如果一個分桶中,包含了某個值,這個值的所有記錄,必然都在這個分桶

        Hive Bucket,分桶,是指将資料以指定列的值為 key 進行 hash,hash 到指定數目的桶中,這樣做的目的和分區表類似,使得篩選時不用全局周遊所有的資料,隻需要周遊所在桶就可以了。這樣也可以支援高效采樣 。

        分桶表的主要應用場景有:

1、采樣

2、join

        如下例就是以 userid 這一列為 bucket 的依據,共設定 32 個 buckets 。

CREATE TABLE page_view
             (
                          viewTime INT
                        , userid   BIGINT
                        , page_url STRING
                        , referrer_url STRING
                        , ip STRING COMMENT 'IP Address of the User'
             )
             COMMENT 'This is the page view table' PARTITIONED BY
             (
                          dt STRING
                        , country STRING
             )
             CLUSTERED BY
             (
                          userid
             )
             SORTED BY
             (
                          viewTime
             )
INTO
             32 BUCKETS ROW FORMAT DELIMITED FIELDS TERMINATED BY '1' COLLECTION ITEMS TERMINATED BY '2' MAP KEYS TERMINATED BY '3' STORED AS SEQUENCEFILE
;      

        分桶的文法也很簡單:

CLUSTERED BY(userid) SORTED BY(viewTime) INTO 32 BUCKETS

CLUSTERED BY(userid) 表示按照 userid 來分桶

SORTED BY(viewTime) 按照 viewtime 來進行桶内排序

INTO 32 BUCKETS 分成多少個桶

        通常情況下,抽樣會在全體資料上進行采樣,這樣效率自然就低,它要去通路所有資料。而如果一個表已經對某一列制作了 bucket,就可以采樣所有桶中指定序号的某個桶,這就減少了通路量 。

        如下例所示就是采樣了 page_view 中 32 個桶中的第三個桶的全部資料:

SELECT *
FROM
       page_view TABLESAMPLE(BUCKET 3 OUT OF 32)
;      

        如下例所示就是采樣了 page_view 中 32 個桶中的第三個桶的一半資料:

SELECT *
FROM
       page_view TABLESAMPLE(BUCKET 3 OUT OF 64)
;      

        總結一下常見的三種采樣方式:

分桶抽樣: 
select * from student tablesample(bucket 3 out of 32); 

随機采樣:rand() 函數 
select * from student order by rand() limit 100; // 效率低 
select * from student distribute by rand() sort by rand() limit 100; // 推薦使用這種 

資料塊抽樣:tablesample()函數 
select * from student tablesample(10 percent); # 百分比 
select * from student tablesample(5 rows); # 行數 
select * from student tablesample(5 M); # 大小      

選擇合适的檔案存儲格式

        在 HiveSQL 的 ​

​create table​

​ 語句中,可以使用 ​

​stored as ...​

​ 指定表的存儲格式。Apache Hive 支援 Apache Hadoop 中使用的幾種熟悉的檔案格式,比如 ​

​TextFile​

​、​

​SequenceFile​

​、​

​RCFile​

​、​

​Avro​

​、​

​ORC​

​、​

​ParquetFile​

​ 等 。

        存儲格式一般需要根據業務進行選擇,在我們的實操中,絕大多數表都采用​

​TextFile​

​與​

​Parquet​

​兩種存儲格式之一。 ​

​TextFile​

​是最簡單的存儲格式,它是純文字記錄,也是Hive的預設格式。雖然它的磁盤開銷比較大,查詢效率也低,但它更多地是作為跳闆來使用。​

​RCFile​

​、​

​ORC​

​、​

​Parquet​

​等格式的表都不能由檔案直接導入資料,必須由​

​TextFile​

​來做中轉。 ​

​Parquet​

​和​

​ORC​

​都是 Apache 旗下的開源列式存儲格式。列式存儲比起傳統的行式存儲更适合批量OLAP查詢,并且也支援更好的壓縮和編碼。

        建立表時,特别是寬表,盡量使用 ​

​ORC​

​、​

​ParquetFile​

​這些列式存儲格式,因為列式存儲的表,每一列的資料在實體上是存儲在一起的,Hive查詢時會隻周遊需要列資料,大大減少處理的資料量。

1、TextFile
  1. 儲方式:行存儲。預設格式,如果建表時不指定預設為此格式。
  2. 一行都是一條記錄,每行都以換行符"\n"結尾。資料不做壓縮時,磁盤會開銷比較大,資料解析開銷也 比較大。
  3. 結合​

    ​Gzip​

    ​、​

    ​Bzip2​

    ​等壓縮方式一起使用(系統會自動檢查,查詢時會自動解壓), 推薦選用可切分的壓縮算法。
2、Sequence File
  1. 種 Hadoop API提供的二進制檔案,使用友善、可分割壓縮的特點。
  2. 持三種壓縮選擇:​

    ​NONE​

    ​、​

    ​RECORD​

    ​、​

    ​BLOCK​

    ​。RECORD壓縮率低,一般建議使用​

    ​BLOCK​

    ​壓縮 。
3、RC File
  1. 儲方式:資料按行分塊,每塊按照列存儲 。

A、首先,将資料按行分塊,保證同一個 record 在一個塊上,避免讀一個記錄需要讀取多個 block。

B、其次,塊資料列式存儲,有利于資料壓縮和快速的列存取。

  1. 對來說,RCFile對于提升任務執行性能提升不大,但是能節省一些存儲空間。可以使用更新版的ORC格式。
4、ORC File
  1. 儲方式:資料按行分塊,每塊按照列存儲
  2. ive提供的新格式,屬于RCFile的更新版,性能有大幅度提升,而且資料可以壓縮存儲,壓縮快,快速 列存取。
  3. RC File會基于列建立索引,當查詢的時候會很快。
5、Parquet File
  1. 儲方式:列式存儲。
  2. arquet 對于大型查詢的類型是高效的。對于掃描特定表格中的特定列查詢,Parquet 特别有用。 Parquet 一般使用Snappy、Gzip壓縮。預設Snappy。
  3. arquet 支援 Impala 查詢引擎。
  4. 的檔案存儲格式盡量采用​

    ​Parquet​

    ​或​

    ​ORC​

    ​,不僅降低存儲量,還優化了查詢,壓縮,表關聯等性能。

選擇合适的壓縮格式

        Hive 語句最終是轉化為 MapReduce 程式來執行的,而 MapReduce 的性能瓶頸在與 網絡IO 和 磁盤 IO,要解決性能瓶頸,最主要的是 減少資料量,對資料進行壓縮是個好方式。壓縮雖然是減少了資料量,但是壓縮過程要消耗 CPU,但是在 Hadoop 中,往往性能瓶頸不在于 CPU,CPU 壓力并不大,是以壓縮充分利用了比較空閑的 CPU。

        常用的壓縮方法對比

3萬字史詩級Hive性能調優

        如何選擇壓縮方式

1、壓縮比率

2、壓縮解壓速度

3、是否支援split

        支援分割的檔案可以并行的有多個 mapper 程式處理大資料檔案,大多數檔案不支援可分割是因為這些檔案隻能從頭開始讀。

        是否壓縮

1、計算密集型,不壓縮,否則進一步增加了CPU的負擔

2、網絡密集型,推薦壓縮,減小網絡資料傳輸

        各個壓縮方式所對應的Class類

3萬字史詩級Hive性能調優

        壓縮使用:

        Job 輸出檔案按照 Block 以 GZip 的方式進行壓縮:

## 預設值是false 
set mapreduce.output.fileoutputformat.compress=true; 

## 預設值是Record 
set mapreduce.output.fileoutputformat.compress.type=BLOCK 

## 預設值是org.apache.hadoop.io.compress.DefaultCodec 
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.G zipCodec      

        Map 輸出結果也以 Gzip 進行壓縮:

## 啟用map端輸出壓縮 
set mapred.map.output.compress=true

## 預設值是org.apache.hadoop.io.compress.DefaultCodec 
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.GzipCod      

        對 Hive 輸出結果和中間都進行壓縮:

## 預設值是false,不壓縮 
set hive.exec.compress.output=true 

## 預設值是false,為true時MR設定的壓縮才 啟用
set hive.exec.compress.intermediate=true       

HQL文法和運作參數層面

        為了寫出高效的SQL,我們有必要知道HQL的執行文法,以及通過一些控制參數來調整 HQL 的執行。

1、檢視Hive執行計劃

        Hive 的 SQL 語句在執行之前需要将 SQL 語句轉換成 MapReduce 任務,是以需要了解具體的轉換過程,可以在 SQL 語句中輸入如下指令檢視具體的執行計劃 。

## 檢視執行計劃,添加extended關鍵字可以檢視更加詳細的執行計劃 
explain [extended] query      

2、列裁剪

列裁剪就是在查詢時隻讀取需要的列,分區裁剪就是隻讀取需要的分區。當列很多或者資料量很大時,如果 select * 或者不指定分區,全列掃描和全表掃描效率都很低。

        Hive 在讀資料的時候,可以隻讀取查詢中所需要用到的列,而忽略其他的列。這樣做可以節省讀取開銷:中間表存儲開銷和資料整合開銷。

## 列裁剪,取數隻取查詢中需要用到的列,預設是true
set hive.optimize.cp = true;       

3、謂詞下推

        将 SQL 語句中的 where 謂詞邏輯都盡可能提前執行,減少下遊處理的資料量。對應邏輯優化器是 ​

​PredicatePushDown​

​ 。

## 預設是true
set hive.optimize.ppd=true;      

        示例程式:

## 優化之前
SELECT
    a.*,
    b.*
FROM
    a
    JOIN b ON a.id = b.id
WHERE
    b.age > 20;

## 優化之後
SELECT
    a.*,
    c.*
FROM
    a
    JOIN (
        SELECT
            *
        FROM
            b
        WHERE
            age > 20
    ) c ON a.id = c.id;      

4、分區裁剪

        列裁剪就是在查詢時隻讀取需要的列,分區裁剪就是隻讀取需要的分區 。當列很多或者資料量很大時,如果 select * 或者不指定分區,全列掃描和全表掃描效率都很低 。

        在查詢的過程中隻選擇需要的分區,可以減少讀入的分區數目,減少讀入的資料量 。

        Hive 中與分區裁剪優化相關的則是:

## 預設是true
set hive.optimize.pruner=true;       

        在 HiveQL 解析階段對應的則是 ColumnPruner 邏輯優化器 。

SELECT
    *
FROM
    student
WHERE
    department = "AAAA";      

5、合并小檔案

        Map 輸入合并

        在執行 MapReduce 程式的時候,一般情況是一個檔案的一個資料分塊需要一個 mapTask 來處理。但是如果資料源是大量的小檔案,這樣就會啟動大量的 mapTask 任務,這樣會浪費大量資源。可以将輸入的小檔案進行合并,進而減少 mapTask 任務數量 。

## Map端輸入、合并檔案之後按照block的大小分割(預設) 
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; 

## Map端輸入,不合并 
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;      

        Map/Reduce輸出合并

        大量的小檔案會給 HDFS 帶來壓力,影響處理效率。可以通過合并 Map 和 Reduce 的結果檔案來消除影響 。

## 是否合并Map輸出檔案, 預設值為true 
set hive.merge.mapfiles=true; 

## 是否合并Reduce端輸出檔案,預設值為false 
set hive.merge.mapredfiles=true; 

## 合并檔案的大小,預設值為256000000 
set hive.merge.size.per.task=256000000; 

## 每個Map 最大分割大小 
set mapred.max.split.size=256000000; 

## 一個節點上split的最少值 
set mapred.min.split.size.per.node=1;  // 伺服器節點 

## 一個機架上split的最少值 
set mapred.min.split.size.per.rack=1;  // 伺服器機架      

        ​

​hive.merge.size.per.task​

​ 和 ​

​mapred.min.split.size.per.node​

​ 聯合起來:

1、預設情況先把這個節點上的所有資料進行合并,如果合并的那個檔案的大小超過了256M就開啟另外一個檔案繼續合并

2、如果目前這個節點上的資料不足256M,那麼就都合并成一個邏輯切片。

6、合理設定MapTask并行度

第一:MapReduce中的MapTask的并行度機制

        Map數過大:當輸入檔案特别大,MapTask 特别多,每個計算節點配置設定執行的 MapTask 都很多,這時候可以考慮減少 MapTask 的數量 ,增大每個 MapTask 處理的資料量。如果 MapTask 過多,最終生成的結果檔案數會太多 。

原因:

1、Map階段輸出檔案太小,産生大量小檔案

2、初始化和建立Map的開銷很大

        Map數太小:當輸入檔案都很大,任務邏輯複雜,MapTask 執行非常慢的時候,可以考慮增加

MapTask 數,來使得每個 MapTask 處理的資料量減少,進而提高任務的執行效率 。

原因:

1、檔案處理或查詢并發度小,Job執行時間過長

2、大量作業時,容易堵塞叢集

        在 MapReduce 的程式設計案例中,我們得知,一個 MapReduce Job 的 MapTask 數量是由輸入分片 InputSplit 決定的。而輸入分片是由 ​

​FileInputFormat.getSplit()​

​ 決定的。一個輸入分片對應一個 MapTask,而輸入分片是由三個參數決定的:

3萬字史詩級Hive性能調優

        輸入分片大小的計算是這麼計算出來的:

long splitSize = Math.max(minSize, Math.min(maxSize, blockSize))      

        預設情況下,輸入分片大小和 HDFS 叢集預設資料塊大小一緻,也就是預設一個資料塊,啟用一個 MapTask 進行處理,這樣做的好處是避免了伺服器節點之間的資料傳輸,提高 job 處理效率 。

        兩種經典的控制 MapTask 的個數方案:減少 MapTask 數 或者 增加 MapTask 數:

1、減少 MapTask 數是通過合并小檔案來實作,這一點主要是針對資料源

2、增加 MapTask 數可以通過控制上一個 job 的 reduceTask 個數

重點注意:不推薦把這個值進行随意設定! 推薦的方式:使用預設的切塊大小即可。如果非要調整,最好是切塊的N倍數

第二:合理控制 MapTask 數量
  1. 少 MapTask 數可以通過合并小檔案來實作
  2. 加 MapTask 數可以通過控制上一個 ReduceTask 預設的 MapTask 個數

計算方式

輸入檔案總大小:total_size

HDFS 設定的資料塊大小:dfs_block_size default_mapper_num = total_size / dfs_block_size

        MapReduce 中提供了如下參數來控制 map 任務個數,從字面上看,貌似是可以直接設定 MapTask 個數的樣子,但是很遺憾不行,這個參數設定隻有在大于 ​

​default_mapper_num​

​ 的時候,才會生效 。

## 預設值是2
set mapred.map.tasks=10;       

        那如果我們需要減少 MapTask 數量,但是檔案大小是固定的,那該怎麼辦呢?

        可以通過 ​

​mapred.min.split.size​

​ 設定每個任務處理的檔案的大小,這個大小隻有在大于 ​

​dfs_block_size​

​ 的時候才會生效

split_size = max(mapred.min.split.size, dfs_block_size)

split_num = total_size / split_size

compute_map_num = Math.min(split_num, Math.max(default_mapper_num, mapred.map.tasks))

        這樣就可以減少 MapTask 數量了 。

        總結一下控制 mapper 個數的方法:

1、如果想增加 MapTask 個數,可以設定 mapred.map.tasks 為一個較大的值

2、如果想減少 MapTask 個數,可以設定 maperd.min.split.size 為一個較大的值

3、如果輸入是大量小檔案,想減少 mapper 個數,可以通過設定 hive.input.format 合并小檔案

        如果想要調整 mapper 個數,在調整之前,需要确定處理的檔案大概大小以及檔案的存在形式(是大量小檔案,還是單個大檔案),然後再設定合适的參數。不能盲目進行暴力設定,不然适得其反。

        MapTask 數量與輸入檔案的 split 數息息相關,在 Hadoop 源碼​

​org.apache.hadoop.mapreduce.lib.input.FileInputFormat​

​ 類中可以看到 split 劃分的具體邏輯。可以直接通過參數 ​

​mapred.map.tasks​

​ (預設值2)來設定 MapTask 數的期望值,但它不一定會生效 。

7、合理設定ReduceTask并行度

        如果 ReduceTask 數量過多,一個 ReduceTask 會産生一個結果檔案,這樣就會生成很多小檔案,那麼如果這些結果檔案會作為下一個 Job 的輸入,則會出現小檔案需要進行合并的問題,而且啟動和初始化ReduceTask 需要耗費資源 。

        如果 ReduceTask 數量過少,這樣一個 ReduceTask 就需要處理大量的資料,容易拖慢運作時間或者造成 OOM,可能會出現資料傾斜的問題,使得整個查詢耗時長。 預設情況下,Hive 配置設定的 reducer 個數由下列參數決定:

參數1:hive.exec.reducers.bytes.per.reducer (預設256M)

參數2:hive.exec.reducers.max (預設為1009)

參數3:mapreduce.job.reduces (預設值為-1,表示沒有設定,那麼就按照以上兩個參數 進行設定)

        ReduceTask 的計算公式為:

N = Math.min(參數2,總輸入資料大小 / 參數1)

        可以通過改變上述兩個參數的值來控制 ReduceTask 的數量。 也可以通過

set mapred.map.tasks=10; 
set mapreduce.job.reduces=10;      

        通常情況下,有必要手動指定 ReduceTask 個數。考慮到 Mapper 階段的輸出資料量通常會比輸入有大幅減少,是以即使不設定 ReduceTask 個數,重設 參數2 還是必要的 。

        依據經驗,可以将 參數2 設定為 M * (0.95 * N) (N為叢集中 NodeManager 個數)。一般來說,NodeManage 和 DataNode 的個數是一樣的。

8、 Join優化

Join優化整體原則:

1、優先過濾後再進行Join操作,最大限度的減少參與join的資料量

2、小表join大表,最好啟動mapjoin,hive自動啟用mapjoin, 小表不能超過25M,可以更改

3、Join on的條件相同的話,最好放入同一個job,并且join表的排列順序從小到大:select a.*, b.*, c.* from a join b on a.id = b.id join c on a.id = c.i

4、如果多張表做join, 如果多個連結條件都相同,會轉換成一個JOb

優先過濾資料

盡量減少每個階段的資料量,對于分區表能用上分區字段的盡量使用,同時隻選擇後面需要使用到的列,最大 限度的減少參與 Join 的資料量。

小表 join 大表原則

小表 join 大表的時應遵守小表 join 大表原則,原因是 join 操作的 reduce 階段,位于 join 左邊 的表内容會被加載進記憶體,将條目少的表放在左邊,可以有效減少發生記憶體溢出的幾率。join 中執行順序是 從左到右生成 Job,應該保證連續查詢中的表的大小從左到右是依次增加的。

使用相同的連接配接鍵

在 hive 中,當對 3 個或更多張表進行 join 時,如果 on 條件使用相同字段,那麼它們會合并為一個 MapReduce Job,利用這種特性,可以将相同的 join on 放入一個 job 來節省執行時間 。

盡量原子操作

盡量避免一個SQL包含複雜的邏輯,可以使用中間表來完成複雜的邏輯。

大表Join大表

1、空key過濾:有時join逾時是因為某些key對應的資料太多,而相同key對應的資料都會發送到相同的 reducer上,進而導緻記憶體不夠。此時我們應該仔細分析這些異常的key,很多情況下,這些key對應的資料 是異常資料,我們需要在SQL語句中進行過濾。

2、空key轉換:有時雖然某個key為空對應的資料很多,但是相應的資料不是異常資料,必須要包含在join 的結果中,此時我們可以表a中key為空的字段賦一個随機的值,使得資料随機均勻地分不到不同的reducer 上 。

9、 啟用 MapJoin

這個優化措施,但凡能用就用! 大表 join 小表 小表滿足需求: 小表資料小于控制條件時 。

        MapJoin 是将 join 雙方比較小的表直接分發到各個 map 程序的記憶體中,在 map 程序中進行 join 操作,這樣就不用進行 reduce 步驟,進而提高了速度。隻有 join 操作才能啟用 MapJoin 。

## 是否根據輸入小表的大小,自動将reduce端的common join 轉化為map join,将小表刷入記憶體中。 
## 對應邏輯優化器是MapJoinProcessor 
set hive.auto.convert.join = true; 

## 刷入記憶體表的大小(位元組) 
set hive.mapjoin.smalltable.filesize = 25000000; 

## hive會基于表的size自動的将普通join轉換成mapjoin 
set hive.auto.convert.join.noconditionaltask=true; 

## 多大的表可以自動觸發放到内層 LocalTask 中,預設大小10M 
set hive.auto.convert.join.noconditionaltask.size=10000000;      

        Hive 可以進行多表 Join。Join 操作尤其是 Join 大表的時候代價是非常大的。MapJoin 特别适合大小表 join 的情況。在Hive join場景中,一般總有一張相對小的表和一張相對大的表,小表叫 build table,大表叫 probe table。Hive 在解析帶 join 的 SQL 語句時,會預設将最後一個表作為 probe table,将前面的表作為 build table 并試圖将它們讀進記憶體。如果表順序寫反,probe table 在前面,引發 OOM 的風險就高了。在次元模組化資料倉庫中,事實表就是 probe table,次元表就是 build table。這種 Join 方式在 map 端直接完成 join 過程,消滅了 reduce,效率很高。而且 MapJoin 還支援非等值連接配接 。

        當 Hive 執行 Join 時,需要選擇哪個表被流式傳輸(stream),哪個表被緩存(cache)。Hive 将JOIN 語句中的最後一個表用于流式傳輸,是以我們需要確定這個流表在兩者之間是最大的。如果要在不同的 key 上 join 更多的表,那麼對于每個 join 集,隻需在 ON 條件右側指定較大的表 。

        也可以手動開啟mapjoin:

-- SQL方式,在SQL語句中添加MapJoin标記(mapjoin hint) 
-- 将小表放到記憶體中,省去shffle操作 

// 在沒有開啟mapjoin的情況下,執行的是reduceJoin 
SELECT /*+ MAPJOIN(smallTable) */  smallTable.key, bigTable.value FROM smallTable JOIN bigTable ON smallTable.key = bigTable.key;

 /*+mapjoin(smalltable)*/      

Sort-Merge-Bucket(SMB) Map Join

        它是另一種Hive Join優化技術,使用這個技術的前提是所有的表都必須是分桶表(bucket)和分桶排序的(sort)。分桶表的優化!

具體實作:

1、針對參與join的這兩張做相同的hash散列,每個桶裡面的資料還要排序

2、這兩張表的分桶個數要成倍數。

3、開啟 SMB join 的開關!

一些常見參數設定:

## 當使用者執行bucket map join的時候,發現不能執行時,禁止查詢;
set hive.enforce.sortmergebucketmapjoin=false; 

## 如果join的表通過sort merge join的條件,join是否會自動轉換為sort merge join; 
set hive.auto.convert.sortmerge.join=true; 

## 當兩個分桶表 join 時,如果 join on的是分桶字段,小表的分桶數是大表的倍數時,可以啟用 mapjoin 來提高效率。 # bucket map join優化,預設值是 false 
set hive.optimize.bucketmapjoin=false; 

## bucket map join 優化,預設值是 false; 
set hive.optimize.bucketmapjoin.sortedmerge=false;      

10、Join資料傾斜優化

在編寫 Join 查詢語句時,如果确定是由于 join 出現的資料傾斜,那麼請做如下設定:

# join的鍵對應的記錄條數超過這個值則會進行分拆,值根據具體資料量設定 
set hive.skewjoin.key=100000; 

# 如果是join過程出現傾斜應該設定為true 
set hive.optimize.skewjoin=false;      

        如果開啟了,在 Join 過程中 Hive 會将計數超過門檻值 hive.skewjoin.key(預設100000)的傾斜 key 對應的行臨時寫進檔案中,然後再啟動另一個 job 做 map join 生成結果。

        通過 ​

​hive.skewjoin.mapjoin.map.tasks​

​ 參數還可以控制第二個 job 的 mapper 數量,預設10000 。

set hive.skewjoin.mapjoin.map.tasks=10000;

11、CBO優化

        join的時候表的順序的關系:前面的表都會被加載到記憶體中。後面的表進行磁盤掃描 。

select a.*, b.*, c.* from a join b on a.id = b.id join c on a.id = c.id ;

        Hive 自 0.14.0 開始,加入了一項 ​

​Cost based Optimizer​

​ 來對 HQL 執行計劃進行優化,這個功能通過 ​

​hive.cbo.enable​

​ 來開啟。在 Hive 1.1.0 之後,這個 feature 是預設開啟的,它可以 自動優化 HQL 中多個 Join 的順序,并選擇合适的 Join 算法 。

        CBO,成本優化器,代價最小的執行計劃就是最好的執行計劃。傳統的資料庫,成本優化器做出最優化的執行計劃是依據統計資訊來計算的。Hive 的成本優化器也一樣。

        Hive 在提供最終執行前,優化每個查詢的執行邏輯和實體執行計劃。這些優化工作是交給底層來完成的。根據查詢成本執行進一步的優化,進而産生潛在的不同決策:如何排序連接配接,執行哪種類型的連接配接,并行度等等。

        要使用基于成本的優化(也稱為CBO),請在查詢開始設定以下參數:

set hive.cbo.enable=true;

set hive.compute.query.using.stats=true;

set hive.stats.fetch.column.stats=true;

set hive.stats.fetch.partition.stats=true;

12、怎樣做笛卡爾積

        當 Hive 設定為嚴格模式(hive.mapred.mode=strict)時,不允許在 HQL 語句中出現笛卡爾積,這實

際說明了 Hive 對笛卡爾積支援較弱。因為找不到 Join key,Hive 隻能使用 1 個 reducer 來完成笛卡爾積 。

        當然也可以使用 limit 的辦法來減少某個表參與 join 的資料量,但對于需要笛卡爾積語義的需求來說,經常是一個大表和一個小表的 Join 操作,結果仍然很大(以至于無法用單機處理),這時 MapJoin 才是最好的解決辦法。MapJoin,顧名思義,會在 Map 端完成 Join 操作。這需要将 Join 操作的一個或多個表完全讀入記憶體。

        PS:MapJoin 在子查詢中可能出現未知 BUG。在大表和小表做笛卡爾積時,規避笛卡爾積的方法是,

給 Join 添加一個 Join key,原理很簡單:将小表擴充一列 join key,并将小表的條目複制數倍,join

key 各不相同;将大表擴充一列 join key 為随機數。

        精髓就在于複制幾倍,最後就有幾個 reduce 來做,而且大表的資料是前面小表擴張 key 值範圍裡面随機出來的,是以複制了幾倍 n,就相當于這個随機範圍就有多大 n,那麼相應的,大表的資料就被随機的分為了 n 份。并且最後處理所用的 reduce 數量也是 n,而且也不會出現資料傾斜 。

####13、Group By優化

        預設情況下,Map 階段同一個 Key 的資料會分發到一個 Reduce 上,當一個 Key 的資料過大時會産生資料傾斜。進行 group by 操作時可以從以下兩個方面進行優化:

1. Map端部分聚合

        事實上并不是所有的聚合操作都需要在 Reduce 部分進行,很多聚合操作都可以先在 Map 端進行部分聚合,然後在 Reduce 端的得出最終結果 。

## 開啟Map端聚合參數設定 
set hive.map.aggr=true; 

## 設定map端預聚合的行數門檻值,超過該值就會分拆job,預設值100000 
set hive.groupby.mapaggr.checkinterval=100000      

2. 有資料傾斜時進行負載均衡

        當 HQL 語句使用 group by 時資料出現傾斜時,如果該變量設定為 true,那麼 Hive 會自動進行負載均衡。政策就是把 MapReduce 任務拆分成兩個:第一個先做預彙總,第二個再做最終彙總 。

# 自動優化,有資料傾斜的時候進行負載均衡(預設是false)
 set hive.groupby.skewindata=false;      

        當選項設定為 true 時,生成的查詢計劃有兩個 MapReduce 任務 。

1、在第一個 MapReduce 任務中,map 的輸出結果會随機分布到 reduce 中,每個 reduce 做部分聚合操作,并輸出結果,這樣處理的結果是相同的​

​group by key​

​有可能分發到不同的 reduce 中,進而達到負載均衡的目的;

2、第二個 MapReduce 任務再根據預處理的資料結果按照 group by key 分布到各個 reduce 中,最 後完成最終的聚合操作。

        Map 端部分聚合:并不是所有的聚合操作都需要在 Reduce 端完成,很多聚合操作都可以先在 Map 端進行部分聚合,最後在 Reduce 端得出最終結果,對應的優化器為 GroupByOptimizer。

        那麼如何用 group by 方式同時統計多個列?

SELECT
    t.a,
    SUM(t.b),
    COUNT(t.c),
    COUNT(t.d)
FROM
    some_table t
GROUP BY
    t.a;      

        下面是解決方法:

select t.a, sum(t.b), count(t.c), count(t.d) from ( 
  select a,b,null c,null d from some_table 
  union all 
  select a,0 b,c,null d from some_table group by a,c 
  union all 
  select a,0 b,null c,d from some_table group by a,d 
) t;      

14、Order By優化

        ​

​order by​

​ 隻能是在一個 reduce 程序中進行,是以如果對一個大資料集進行 order by ,會導緻一個 reduce 程序中處理的資料相當大,造成查詢執行緩慢 。

1、在最終結果上進行order by,不要在中間的大資料集上進行排序。如果最終結果較少,可以在一個 reduce上進行排序時,那麼就在最後的結果集上進行order by。

2、如果是取排序後的前N條資料,可以使用distribute by和sort by在各個reduce上進行排序後前N 條,然後再對各個reduce的結果集合合并後在一個reduce中全局排序,再取前N條,因為參與全局排序的 order by的資料量最多是reduce個數 * N,是以執行效率會有很大提升。

        在Hive中,關于資料排序,提供了四種文法,一定要區分這四種排序的使用方式和适用場景。

1、​

​order by​

​:全局排序,缺陷是隻能使用一個reduce

2、​

​sort by​

​:單機排序,單個reduce結果有序

3、​

​cluster by​

​:對同一字段分桶并排序,不能和sort by連用

4、​

​distribute by​

​+​

​sort by​

​:分桶,保證同一字段值隻存在一個結果檔案當中,結合sort by保證每 個reduceTask結果有序

        Hive HQL 中的 order by 與其他 SQL 方言中的功能一樣,就是将結果按某字段全局排序,這會導緻所有 map 端資料都進入一個 reducer 中,在資料量大時可能會長時間計算不完 。

        如果使用 sort by,那麼還是會視情況啟動多個 reducer 進行排序,并且保證每個 reducer 内局部有序。為了控制 map 端資料配置設定到 reducer 的 key,往往還要配合 distribute by 一同使用。如果不加 ​

​distribute by​

​ 的話,map 端資料就會随機配置設定到 reducer。

        提供一種方式實作全局排序:兩種方式:

1、建表導入資料準備

CREATE TABLE if NOT EXISTS student(
    id INT,
    name string,
    sex string,
    age INT,
    department string
) ROW format delimited fields terminated BY ",";

load data LOCAL inpath "/home/bigdata/students.txt" INTO TABLE student;      

2、第一種方式

-- 直接使用order by來做。如果結果資料量很大,這個任務的執行效率會非常低;
SELECT
    id,
    name,
    age
FROM
    student
ORDER BY
    age desc
LIMIT
    3;      

3、第二種方式

-- 使用distribute by + sort by 多個reduceTask,每個reduceTask分别有序
SET mapreduce.job.reduces = 3;
DROP TABLE student_orderby_result;

-- 範圍分桶 0 < 18 < 1 < 20 < 2
CREATE TABLE student_orderby_result AS
SELECT
    *
FROM
    student distribute BY (
        CASE
            WHEN age > 20 THEN 0
            WHEN age < 18 THEN 2
            ELSE 1
        END
    ) sort BY (age desc);      

        關于分界值的确定,使用采樣的方式,來估計資料分布規律 。

15、Count Distinct優化

        當要統計某一列去重數時,如果資料量很大,count(distinct) 就會非常慢,原因與 order by 類似,count(distinct) 邏輯隻會有很少的 reducer 來處理。這時可以用 group by 來改寫:

-- 先 group by 再 count
SELECT
    COUNT(1)
FROM
    (
        SELECT
            age
        FROM
            student
        WHERE
            department >= "MA"
        GROUP BY
            age
    ) t;      

再來一個例子:

        優化前 ,一個普通的隻使用一個reduceTask來進行count(distinct) 操作

-- 優化前(隻有一個reduce,先去重再count負擔比較大):
SELECT
    COUNT(DISTINCT id)
FROM
    tablename;      

        優化後 ,但是這樣寫會啟動兩個MR job(單純 distinct 隻會啟動一個),是以要確定資料量大到啟動 job 的 overhead 遠小于計算耗時,才考慮這種方法。當資料集很小或者 key 的傾斜比較明顯時,group by 還可能會比 distinct 慢。

-- 優化後(啟動兩個job,一個job負責子查詢(可以有多個reduce),另一個job負責count(1)):
SELECT
    COUNT(1)
FROM
    (
        SELECT
            DISTINCT id
        FROM
            tablename
    ) tmp;

SELECT
    COUNT(1)
FROM
    (
        SELECT
            id
        FROM
            tablename
        GROUP BY
            id
    ) tmp;
/ / 推薦使用這種      

16、怎樣寫in/exists語句

        在Hive的早期版本中,​

​in/exists​

​文法是不被支援的,但是從 ​

​hive-0.8x​

​ 以後就開始支援這個文法。但是不推薦使用這個文法。雖然經過測驗,Hive-2.3.6 也支援 in/exists 操作,但還是推薦使用 Hive 的一個高效替代方案:​

​left semi join​

        比如說:

-- in / exists 實作
SELECT
    a.id,
    a.name
FROM
    a
WHERE
    a.id IN (
        SELECT
            b.id
        FROM
            b
    );

SELECT
    a.id,
    a.name
FROM
    a
WHERE
    EXISTS (
        SELECT
            id
        FROM
            b
        WHERE
            a.id = b.id
    );      

        可以使用join來改寫:

SELECT
    a.id,
    a.namr
FROM
    a
    JOIN b ON a.id = b.id;      

        應該轉換成:

-- left semi join 實作
SELECT
    a.id,
    a.name
FROM
    a LEFT semi
    JOIN b ON a.id = b.id;      

17、使用 vectorization 技術

        在計算類似 scan, filter, aggregation 的時候, vectorization 技術以設定批處理的增量大小為 1024 行單次來達到比單條記錄單次獲得更高的效率。

set hive.vectorized.execution.enabled=true ;

set hive.vectorized.execution.reduce.enabled=true;

18、多重模式

        如果你碰到一堆SQL,并且這一堆SQL的模式還一樣。都是從同一個表進行掃描,做不同的邏輯。 有可優化的地方:如果有n條SQL,每個SQL執行都會掃描一次這張表 。

        如果一個 HQL 底層要執行 10 個 Job,那麼能優化成 8 個一般來說,肯定能有所提高,多重插入就是一

個非常實用的技能。一次讀取,多次插入,有些場景是從一張表讀取資料後,要多次利用,這時可以使用 ​

​multi insert ​

​文法:

FROM
    sale_detail INSERT overwrite TABLE sale_detail_multi PARTITION (sale_date = '2019', region = 'china')
SELECT
    shop_name,
    customer_id,
    total_price
WHERE
.....insert overwrite TABLE sale_detail_multi PARTITION (sale_date = '2020', region = 'china')
SELECT
    shop_name,
    customer_id,
    total_price
WHERE
.....;      

        需要的是,​

​multi insert​

​ 文法有一些限制

1、一般情況下,單個SQL中最多可以寫128路輸出,超過128路,則報文法錯誤。

2、在一個multi insert中: 對于分區表,同一個目标分區不允許出現多次。 對于未分區表,該表不能出現多次。

3、對于同一張分區表的不同分區,不能同時有insert overwrite和insert into操作,否則報錯傳回

        ​

​Multi-Group by​

​ 是 Hive 的一個非常好的特性,它使得 Hive 中利用中間結果變得非常友善。例如:

FROM
    (
        SELECT
            a.status,
            b.school,
            b.gender
        FROM
            status_updates a
            JOIN profiles b ON (
                a.userid = b.userid
                AND a.ds = '2019-03-20'
            )
    ) subq1 INSERT OVERWRITE TABLE gender_summary PARTITION(ds = '2019-03-20')
SELECT
    subq1.gender,
    COUNT(1)
GROUP BY
    subq1.gender INSERT OVERWRITE TABLE school_summary PARTITION(ds = '2019-03-20')
SELECT
    subq1.school,
    COUNT(1)
GROUP BY
    subq1.school;      

        上述查詢語句使用了 ​

​Multi-Group by​

​ 特性連續 group by 了 2 次資料,使用不同的 ​

​Multi-Group by​

​。這一特性可以減少一次 MapReduce 操作。

19、啟動中間結果壓縮

map 輸出壓縮

set mapreduce.map.output.compress=true;

set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;

中間資料壓縮

        中間資料壓縮就是對 hive 查詢的多個 Job 之間的資料進行壓縮。最好是選擇一個節省CPU耗時的壓縮方式。可以采用 snappy 壓縮算法,該算法的壓縮和解壓效率都非常高。

set hive.exec.compress.intermediate=true;

set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;

set hive.intermediate.compression.type=BLOCK;

結果資料壓縮

        最終的結果資料(Reducer輸出資料)也是可以進行壓縮的,可以選擇一個壓縮效果比較好的,可以減少資料的大小和資料的磁盤讀寫時間 。

        需要注意:常用的 gzip,snappy 壓縮算法是不支援并行處理的,如果資料源是 gzip/snappy壓縮檔案大檔案,這樣隻會有有個 mapper 來處理這個檔案,會嚴重影響查詢效率。 是以如果結果資料需要作為其他查詢任務的資料源,可以選擇支援 splitable 的 LZO 算法,這樣既能對結果檔案進行壓縮,還可以并行的處理,這樣就可以大大的提高 job 執行的速度了。

set hive.exec.compress.output=true;

set mapreduce.output.fileoutputformat.compress=true;

set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.G zipCodec;

set mapreduce.output.fileoutputformat.compress.type=BLOCK;

        Hadoop叢集支援的壓縮算法:

org.apache.hadoop.io.compress.DefaultCodec org.apache.hadoop.io.compress.GzipCodec 
org.apache.hadoop.io.compress.BZip2Codec org.apache.hadoop.io.compress.DeflateCodec 
org.apache.hadoop.io.compress.SnappyCodec org.apache.hadoop.io.compress.Lz4Codec 
com.hadoop.compression.lzo.LzoCodec com.hadoop.compression.lzo.LzopCodec      

Hive 架構層面

1、啟用本地抓取

        Hive 的某些 SQL 語句需要轉換成 MapReduce 的操作,某些 SQL 語句就不需要轉換成 MapReduce 操作,但是同學們需要注意,理論上來說,所有的 SQL 語句都需要轉換成 MapReduce 操作,隻不過 Hive 在轉換 SQL 語句的過程中會做部分優化,使某些簡單的操作不再需要轉換成 MapReduce,例如:

1、隻是 select * 的時候

2、where 條件針對分區字段進行篩選過濾時

3、帶有 limit 分支語句時

        Hive 從 HDFS 中讀取資料,有兩種方式:啟用MapReduce讀取 和 直接抓取 。

        直接抓取資料比 MapReduce 方式讀取資料要快的多,但是隻有少數操作可以使用直接抓取方式 。

        可以通過 ​

​hive.fetch.task.conversion​

​ 參數來配置在什麼情況下采用直接抓取方式:

minimal:隻有 select * 、在分區字段上 where 過濾、有 limit 這三種場景下才啟用直接抓取方式。

more:在 select、where 篩選、limit 時,都啟用直接抓取方式 。

        檢視 Hive 的抓取政策:

> ## 檢視 
> set hive.fetch.task.conversion;      

        設定Hive的抓取政策:

## 預設more 
set hive.fetch.task.conversion=more;      

        如果有疑惑,請看hive-default.xml中關于這個參數的解釋:

<property>
    <name>hive.fetch.task.conversion</name>
    <value>more</value>
    <description>
Expects one of [none, mi nimal, more].
Some select queri es can be converted to single FETCH task minimizing latency.
Currently the query should be si ngle sourced not havi ng any subquery and should not have
any aggregations or di sti ncts (whi ch i ncurs RS), lateral vi ews and
joi ns.
0. none : di sable hive.fetch.task.conversion
1.minimal : select star, filter on partition columns, limit only
2.more : SELECT, FILTER, LIMIT only (support TABLESAMPLE and vi rtual
columns)

    </descri ption>
</property>
<property>
    <name>hive.fetch.task.conversion.threshold</name>
    <value>1073741824</value>
    <descri pti on>
input threshold for applying hive.fetch.task.conversion, if target table is native, input 1ength
is calculated by summation of file 1engths. if it's not native, storage handler for the table
can optionally implement
org.apache, hadoop. hive, ql. metadata. inputEstimator iinterface.
</descri ption>
</property>      

2、本地執行優化

        Hive在叢集上查詢時,預設是在叢集上多台機器上運作,需要多個機器進行協調運作,這種方式很好 的解決了大資料量的查詢問題。但是在Hive查詢處理的瓣量比較小的時候,其實沒有必要啟動分布 式模式去執行,因為以分布式方式執行設計到跨網絡傳輸、多節點協調等,并且消耗資源。對于小資料 集,可以通過本地模式,在單台機器上處理所有任務,執行時間明顯被縮短 。

        啟動本地模式涉及到三個參數:

##打開hive自動判斷是否啟動本地模式的開關
set hive.exec.mode.local.auto=true;

## map任務晝專大值,*啟用本地模式的task最大臯數
set hive.exec.mode.1ocal.auto.input.files.max=4;

## map輸入檔案最大大小,不啟動本地模式的最大輸入檔案大小
set hive.exec.mode.1ocal.auto.inputbytes.max=134217728;      

3、JVM 重用

        Hive語句最終會轉換為一系的MapReduce任務,每一個MapReduce任務是由一系的MapTask 和ReduceTask組成的,預設情況下,MapReduce中一個MapTask或者ReduceTask就會啟動一個 JVM程序,一個Task執行完畢後,JVM程序就會退出。這樣如果任務花費時間很短,又要多次啟動 JVM的情況下,JVM的啟動時間會變成一個比較大的消耗,這時,可以通過重用JVM來解決 。

set mapred.job.reuse.jvm.num.tasks=5;      

        JVM也是有缺點的,開啟JVM重用會一直占用使用到的task的插槽,以便進行重用,直到任務完成後才 會釋放。如果某個不平衡的job中有幾個reduce task執行的時間要比其他的reduce task消耗的時間 要多得多的話,那麼保留的插槽就會一直空閑卻無法被其他的job使用,直到所有的task都結束了才 會釋放。

        根據經驗,一般來說可以使用一個cpu core啟動一個JVM,假如伺服器有16個cpu core,但是這個 節點,可能會啟動32個 mapTask ,完全可以考慮:啟動一個JVM,執行兩個Task 。

4、并行執行

        有的查詢語句,Hive會将其轉化為一個或多個階段,包括:MapReduce階段、抽樣階段、合并階段、 limit階段等。預設情況下,一次隻執行一個階段。但是,如果某些階段不是互相依賴,是可以并行執行的。多階段并行是比較耗系統資源的 。

        一個 Hive SQL語句可能會轉為多個MapReduce Job,每一個 job 就是一個 stage , 這些Job順序執行,這個在 client 的運作日志中也可以看到。但是有時候這些任務之間并不是互相依賴的,如果叢集資源允許的話,可以讓多個并不互相依賴 stage 并發執行,這樣就節約了時間,提高了執行速度,但是如 果叢集資源匮乏時,啟用并行化反倒是會導緻各個 Job 互相搶占資源而導緻整體執行性能的下降。啟用 并行化:

##可以開啟并發執行。
set hive.exec.parallei=true;

##同一個sql允許最大并行度,預設為8。
set hive.exec.paral1 el.thread.number=16;      

5、推測執行

        在分布式叢集環境下,因為程式Bug(包括Hadoop本身的bug),負載不均衡或者資源分布不均等原因,會造成同一個作業的多個任務之間運作速度不一緻,有些任務的運作速度可能明顯慢于其他任務(比如一個作業的某個任務進度隻有50%,而其他所有任務已經運作完畢),則這些任務會拖慢作業的整體執行進度。為了避免這種情況發生,Hadoop采用了推測執行(Speculative Execution)機制,它根據一定的法則推測出“拖後腿”的任務,并為這樣的任務啟動一個備份任務,讓該任務與原始任務同時處理同一份資料,并最終選用最先成功運作完成任務的計算結果作為最終結果 。

# 啟動mapper階段的推測執行機制 
set mapreduce.map.speculative=true; 

# 啟動reducer階段的推測執行機制 
set mapreduce.reduce.speculative=true;      

設定開啟推測執行參數:Hadoop 的 mapred-site.xml 檔案中進行配置:

<property>
    <name>mapreduce.map.speculative</name>
    <value>true</value>
    <description>lf true, then multiple i nstances of some map tasks may be executed i n parallel.</description>
</property>
<property>
    <name>mapreduce.reduce.speculati ve</name>
    <value>true</value>
    <descri pti on>lf true, then multi ple i nstances of some reduce tasks may be executed in parallel.
    </description>
</property>      

Hive本身也提供了配置項來控制reduce-side的推測執行

<property>
    <name>hive.mapped.reduce.tasks.speculative.executi on</name>
    <value>true</value>
    <description>whether speculative execution for reducers should be turned on. </description>
</property>      

建議:

如果使用者對于運作時的偏差非常敏感的話,那麼可以将這些功能關閉掉。如果使用者因為輸入資料量很大而需要 執行長時間的MapTask或者ReduceTask的話,那麼啟動推測執行造成的浪費是非常巨大大。

6、Hive嚴格模式

        所謂嚴格模式,就是強制不允許使用者執行有風險的 HiveQL 語句,一旦執行會直接失敗。但是Hive中為了提高SQL語句的執行效率,可以設定嚴格模式,充分利用 Hive 的某些特點 。

## 設定Hive的嚴格模式 
set hive.mapred.mode=strict; 
set hive.exec.dynamic.partition.mode=nostrict;      

        注意:當設定嚴格模式之後,會有如下限制:

1、對于分區表,必須添加where對于分區字段的條件過濾 
select * from student_ptn where age > 25 

2、order by語句必須包含limit輸出限制 
select * from student order by age limit 100; 

3、限制執行笛卡爾積的查詢 
select a.*, b.* from a, b; 

4、在hive的動态分區模式下,如果為嚴格模式,則必須需要一個分區列是靜态分區      

資料傾斜

網上關于如何定位并解決資料傾斜的教程很多,但是大多隻是點到為止,浮于表面 。這裡我們直接引用了《Hive性能調優實戰》中資料傾斜部分的内容,讓大家能夠體系化學習,徹底掌握 。

        資料傾斜,即單個節點任務所處理的資料量遠大于同類型任務所處理的 資料量,導緻該節點成為整個作業的瓶頸,這是分布式系統不可能避免的問 題。從本質來說,導緻資料傾斜有兩種原因,一是任務讀取大檔案,二是任務需要處理大量相同鍵的資料 。

        任務讀取大檔案,最常見的就是讀取壓縮的不可分割的大檔案,具體在 10.4.1節會介紹。任務需要處理大量相同鍵的資料,這種情況有以下4種表現形式:

  • 資料含有大量無意義的資料,例如空值(NULL)、空字元串等
  • 含有傾斜資料在進行聚合計算時無法聚合中間結果,大量資料都需要 經過Shuffle階段的處理,引起資料傾斜
  • 資料在計算時做多元資料集合,導緻次元膨脹引起的資料傾斜
  • 兩表進行Join,都含有大量相同的傾斜資料鍵

1、不可拆分大檔案引發的資料傾斜

        當叢集的資料量增長到一定規模,有些資料需要歸檔或者轉儲,這時候往往會對資料進行壓縮;當對檔案使用​

​GZIP​

​壓縮等不支援檔案分割操作的壓縮方式,在日後有作業涉及讀取壓縮後的檔案時,該壓縮檔案隻會被一個任務所讀取。如果該壓縮檔案很大,則處理該檔案的Map需要花費的時間會 遠多于讀取普通檔案的Map時間,該Map任務會成為作業運作的瓶頸。這種情況也就是Map讀取檔案的資料傾斜。例如存在這樣一張表​

​t_des_info​

​ 。

3萬字史詩級Hive性能調優

        t_des_info表由3個GZIP壓縮後的檔案組成 。

3萬字史詩級Hive性能調優

        其中,​

​large_file.gz​

​檔案約200MB,在計算引擎在運作時,預先設定每 個Map處理的資料量為128MB,但是計算引擎無法切分​

​large_file.gz​

​檔案,所 以該檔案不會交給兩個Map任務去讀取,而是有且僅有一個任務在操作 。

        t_des_info表有3個gz檔案,任何涉及處理該表的資料都隻會使用3個 Map。

3萬字史詩級Hive性能調優

        為避免因不可拆分大檔案而引發資料讀取的傾斜,在資料壓縮的時 候可以采用​

​bzip2​

​和​

​Zip​

​等支援檔案分割的壓縮算法。

2、業務無關的資料引發的資料傾斜

        實際業務中有些大量的NULL值或者一些無意義的資料參與到計算作業 中,這些資料可能來自業務為上報或因資料規範将某類資料進行歸一化變成 空值或空字元串等形式。這些與業務無關的資料引入導緻在進行分組聚合或者在執行表連接配接時發生資料傾斜。對于這類問題引發的資料傾斜,在計算過 程中排除含有這類“異常”資料即可 。

3、 多元聚合計算資料膨脹引起的資料傾斜

        在多元聚合計算時存在這樣的場景:​

​select a,b,c,count(1)from T group by a,b,c with rollup​

​。對于上述的SQL,可以拆解成4種類型的鍵進行分組聚合,它們分别是​

​(a,b,c)​

​、​

​(a,b,null)​

​、​

​(a,null,null)​

​ 和​

​(null,null,null)​

​。

        如果T表的資料量很大,并且Map端的聚合不能很好地起到資料壓縮的 情況下,會導緻Map端産出的資料急速膨脹,這種情況容易導緻作業記憶體溢 出的異常。如果T表含有資料傾斜鍵,會加劇Shuffle過程的資料傾斜 。

        對上述的情況我們會很自然地想到拆解上面的SQL語句,将rollup拆解 成如下多個普通類型分組聚合的組合。

select a, b, c, count(1) from T group by a, b, c; 

select a, b, null, count(1) from T group by a, b; 

select a, null, null, count(1) from T group by a; 

select null, null, null, count(1) from T;      

        這是很笨拙的方法,如果分組聚合的列遠不止3個列,那麼需要拆解的 SQL語句會更多。在Hive中可以通過參數 ​

​(hive.new.job.grouping.set.cardinality)​

​配置的方式自動控制作業的拆解,該 參數預設值是30。該參數表示針對​

​grouping sets/rollups/cubes​

​這類多元聚合的 操作,如果最後拆解的鍵組合(上面例子的組合是4)大于該值,會啟用新的任務去處理大于該值之外的組合。如果在處理資料時,某個分組聚合的列 有較大的傾斜,可以适當調小該值 。

4、無法削減中間結果的資料量引發的資料傾斜

        在一些操作中無法削減中間結果,例如使用​

​collect_list​

​聚合函數,存在如下SQL:

SELECT
    s_age,
    collect_list(s_score) list_score
FROM
    student_tb_txt
GROUP BY
    s_age      

        在​

​student_tb_txt​

​表中,s_age有資料傾斜,但如果資料量大到一定的數 量,會導緻處理傾斜的Reduce任務産生記憶體溢出的異常。針對這種場景,即 使開啟​

​hive.groupby.skewindata​

​配置參數,也不會起到優化的作業,反而會拖累整個作業的運作。

        啟用該配置參數會将作業拆解成兩個作業,第一個作業會盡可能将 Map 的資料平均配置設定到Reduce階段,并在這個階段實作資料的預聚合,以減少第二個作業處理的資料量;第二個作業在第一個作業處理的資料基礎上進行結果的聚合。

        ​

​hive.groupby.skewindata​

​的核心作用在于生成的第一個作業能夠有效減少數量。但是對于​

​collect_list​

​這類要求全量操作所有資料的中間結果的函數來說,明顯起不到作用,反而因為引入新的作業增加了磁盤和網絡I/O的負擔,而導緻性能變得更為低下 。

5、兩個Hive資料表連接配接時引發的資料傾斜

繼續閱讀