天天看點

【硬剛大資料】Hive計算引擎大PK,萬字長文解析MapRuce、Tez、Spark三大引擎

本文是對​​《【硬剛大資料之學習路線篇】2021年從零到大資料專家的學習指南(全面更新版)》​​的面試部分補充。

前言

hive從2008年始于facebook工程師之手,經過10幾年的發展至今保持強大的生命力。截止目前hive已經更新至3.1.x版本,hive從最開始的為人诟病的速度慢迅速發展,開始支援更多的計算引擎,計算速度大大提升。

本文我們将從原理、應用、調優分别講解hive所支援的mapreduce、tez、spark引擎。

我們在之前的文章中:

《硬剛hive|4萬字基礎調優面試小總結》

《當我們在學習hive的時候在學習什麼?「硬剛hive續集」》

對hive的mapreduce引擎已經做過非常詳細的講解了。

本文首發自公衆号:《import_bigdata》,大資料技術與架構。

在hive2.x版本中,hivesql會被轉化為mr任務,這也是我們經常說的hivesql的執行原理。

我們先來看下 hive 的底層執行架構圖, hive 的主要元件與 hadoop 互動的過程:

【硬剛大資料】Hive計算引擎大PK,萬字長文解析MapRuce、Tez、Spark三大引擎

hive底層執行架構

在 hive 這一側,總共有五個元件:

ui:使用者界面。可看作我們送出sql語句的指令行界面。

driver:驅動程式。接收查詢的元件。該元件實作了會話句柄的概念。

compiler:編譯器。負責将 sql 轉化為平台可執行的執行計劃。對不同的查詢塊和查詢表達式進行語義分析,并最終借助表和從 metastore 查找的分區中繼資料來生成執行計劃。

metastore:中繼資料庫。存儲 hive 中各種表和分區的所有結構資訊。

execution engine:執行引擎。負責送出 compiler 階段編譯好的執行計劃到不同的平台上。

上圖的基本流程是:

步驟1:ui 調用 driver 的接口;

步驟2:driver 為查詢建立會話句柄,并将查詢發送到 compiler(編譯器)生成執行計劃;

步驟3和4:編譯器從中繼資料存儲中擷取本次查詢所需要的中繼資料,該中繼資料用于對查詢樹中的表達式進行類型檢查,以及基于查詢謂詞修建分區;

步驟5:編譯器生成的計劃是分階段的dag,每個階段要麼是 map/reduce 作業,要麼是一個中繼資料或者hdfs上的操作。将生成的計劃發給 driver。

如果是 map/reduce 作業,該計劃包括 map operator trees 和一個 reduce operator tree,執行引擎将會把這些作業發送給 mapreduce :

步驟6、6.1、6.2和6.3:執行引擎将這些階段送出給适當的元件。在每個 task(mapper/reducer) 中,從hdfs檔案中讀取與表或中間輸出相關聯的資料,并通過相關算子樹傳遞這些資料。最終這些資料通過序列化器寫入到一個臨時hdfs檔案中(如果不需要 reduce 階段,則在 map 中操作)。臨時檔案用于向計劃中後面的 map/reduce 階段提供資料。

步驟7、8和9:最終的臨時檔案将移動到表的位置,確定不讀取髒資料(檔案重命名在hdfs中是原子操作)。對于使用者的查詢,臨時檔案的内容由執行引擎直接從hdfs讀取,然後通過driver發送到ui。

hive sql 編譯成 mapreduce 過程

美團部落格中有一篇非常詳細的部落格講解《hive sql的編譯過程》。

你可以參考: https://tech.meituan.com/2014/02/12/hive-sql-to-mapreduce.html

編譯 sql 的任務是在上節中介紹的 compiler(編譯器元件)中完成的。hive将sql轉化為mapreduce任務,整個編譯過程分為六個階段:

【硬剛大資料】Hive計算引擎大PK,萬字長文解析MapRuce、Tez、Spark三大引擎

hive sql編譯過程

<code>詞法、文法解析</code>: antlr 定義 sql 的文法規則,完成 sql 詞法,文法解析,将 sql 轉化為抽象文法樹 ast tree;

antlr是一種語言識别的工具,可以用來構造領域語言。使用antlr構造特定的語言隻需要編寫一個文法檔案,定義詞法和文法替換規則即可,antlr完成了詞法分析、文法分析、語義分析、中間代碼生成的過程。

語義解析: 周遊 ast tree,抽象出查詢的基本組成單元 queryblock;

生成邏輯執行計劃: 周遊 queryblock,翻譯為執行操作樹 operatortree;

優化邏輯執行計劃: 邏輯層優化器進行 operatortree 變換,合并 operator,達到減少 mapreduce job,減少資料傳輸及 shuffle 資料量;

生成實體執行計劃: 周遊 operatortree,翻譯為 mapreduce 任務;

優化實體執行計劃: 實體層優化器進行 mapreduce 任務的變換,生成最終的執行計劃。

下面對這六個階段詳細解析:

為便于了解,我們拿一個簡單的查詢語句進行展示,對5月23号的地區維表進行查詢:

階段一:詞法、文法解析

根據antlr定義的sql文法規則,将相關sql進行詞法、文法解析,轉化為抽象文法樹ast tree:

階段二:語義解析

周遊ast tree,抽象出查詢的基本組成單元queryblock:

ast tree生成後由于其複雜度依舊較高,不便于翻譯為mapreduce程式,需要進行進一步抽象和結構化,形成queryblock。

queryblock是一條sql最基本的組成單元,包括三個部分:輸入源,計算過程,輸出。簡單來講一個queryblock就是一個子查詢。

queryblock的生成過程為一個遞歸過程,先序周遊 ast tree ,遇到不同的 token 節點(了解為特殊标記),儲存到相應的屬性中。

階段三:生成邏輯執行計劃

周遊queryblock,翻譯為執行操作樹operatortree:

hive最終生成的mapreduce任務,map階段和reduce階段均由operatortree組成。

基本的操作符包括:

operator在map reduce階段之間的資料傳遞都是一個流式的過程。每一個operator對一行資料完成操作後之後将資料傳遞給childoperator計算。

由于join/groupby/orderby均需要在reduce階段完成,是以在生成相應操作的operator之前都會先生成一個reducesinkoperator,将字段組合并序列化為reduce key/value, partition key。

階段四:優化邏輯執行計劃

hive中的邏輯查詢優化可以大緻分為以下幾類:

投影修剪

推導傳遞謂詞

謂詞下推

将select-select,filter-filter合并為單個操作

多路 join

查詢重寫以适應某些列值的join傾斜

階段五:生成實體執行計劃

生成實體執行計劃即是将邏輯執行計劃生成的operatortree轉化為mapreduce job的過程,主要分為下面幾個階段:

對輸出表生成movetask

從operatortree的其中一個根節點向下深度優先周遊

reducesinkoperator标示map/reduce的界限,多個job間的界限

周遊其他根節點,遇過碰到joinoperator合并mapreducetask

生成stattask更新中繼資料

剪斷map與reduce間的operator的關系

階段六:優化實體執行計劃

hive中的實體優化可以大緻分為以下幾類:

分區修剪(partition pruning)

基于分區和桶的掃描修剪(scan pruning)

如果查詢基于抽樣,則掃描修剪

在某些情況下,在 map 端應用 group by

在 mapper 上執行 join

優化 union,使union隻在 map 端執行

在多路 join 中,根據使用者提示決定最後流哪個表

删除不必要的 reducesinkoperators

對于帶有limit子句的查詢,減少需要為該表掃描的檔案數

對于帶有limit子句的查詢,通過限制 reducesinkoperator 生成的内容來限制來自 mapper 的輸出

減少使用者送出的sql查詢所需的tez作業數量

如果是簡單的提取查詢,避免使用mapreduce作業

對于帶有聚合的簡單擷取查詢,執行不帶 mapreduce 任務的聚合

重寫 group by 查詢使用索引表代替原來的表

當表掃描之上的謂詞是相等謂詞且謂詞中的列具有索引時,使用索引掃描

經過以上六個階段,sql 就被解析映射成了叢集上的 mapreduce 任務。

hive explain 語句類似mysql 的explain 語句,提供了對應查詢的執行計劃,對于我們在了解hive底層邏輯、hive調優、hive sql書寫等方面提供了一個參照,在我們的生産工作了是一個很有意義的工具。

hive explain文法

explain [extended|cbo|ast|dependency|authorization|locks|vectorization|analyze] query

hive explain的文法規則如上,後面将按照對應的子句進行探讨。

extended 語句會在執行計劃中産生關于算子(operator)的額外資訊,這些資訊都是典型的實體資訊,如檔案名稱等。

在執行explain query 之後,一個查詢會被轉化為包含多個stage的語句(看起來更像一個dag)。這些stages要麼是map/reduce stage,要麼是做些中繼資料或檔案系統操作的stage (如 move 、rename等)。explain的輸出包含2個部分:

執行計劃不同stage之間的以來關系(dependency)

每個stage的執行描述資訊(description)

以下将通過一個簡單的例子進行解釋。

執行explain 語句

explain輸出結果解析

依賴圖

一個hive查詢被轉換為一個由一個或多個stage組成的序列(有向無環圖dag)。這些stage可以是mapreduce stage,也可以是負責中繼資料存儲的stage,也可以是負責檔案系統的操作(比如移動和重命名)的stage。

我們将上述結果拆分看,先從最外層開始,包含兩個大的部分:

stage dependencies:各個stage之間的依賴性

stage plan:各個stage的執行計劃

先看第一部分 stage dependencies ,包含兩個 stage,stage-1 是根stage,說明這是開始的stage,stage-0 依賴 stage-1,stage-1執行完成後執行stage-0。

再看第二部分 stage plan,裡面有一個 map reduce,一個mr的執行計劃分為兩個部分

map operator tree:map端的執行計劃樹

reduce operator tree:reduce端的執行計劃樹

這兩個執行計劃樹裡面包含這條sql語句的 operator

tablescan:表掃描操作,map端第一個操作肯定是加載表,是以就是表掃描操作,常見的屬性:

select operator:選取操作,常見的屬性 :

group by operator:分組聚合操作,常見的屬性:

reduce output operator:輸出到reduce操作,常見屬性:

filter operator:過濾操作,常見的屬性:

map join operator:join 操作,常見的屬性:

file output operator:檔案輸出操作,常見的屬性:

fetch operator 用戶端擷取資料操作,常見的屬性:

explain使用場景

那麼explain能夠為我們在生産實踐中帶來哪些便利及解決我們哪些迷惑呢?

join 語句會過濾 null 的值嗎?

現在,我們在hive cli 輸入以下查詢計劃語句

然後執行:

我們來看結果:

從上述結果可以看到 predicate: id is not null 這樣一行,說明 join 時會自動過濾掉關聯字段為 null 值的情況,但 left join 或 full join 是不會自動過濾null值的,大家可以自行嘗試下。

group by 分組語句會進行排序嗎?

直接來看 explain 之後結果:

我們看 group by operator,裡面有 keys: id (type: int) 說明按照 id 進行分組的,再往下看還有 sort order: + ,說明是按照 id 字段進行正序排序的。

哪條sql執行效率高

觀察如下兩條sql:

這兩條sql語句輸出的結果是一樣的,但是哪條sql執行效率高呢?

有人說第一條sql執行效率高,因為第二條sql有子查詢,子查詢會影響性能; 有人說第二條sql執行效率高,因為先過濾之後,在進行join時的條數減少了,是以執行效率就高了。 到底哪條sql效率高呢,我們直接在sql語句前面加上 explain,看下執行計劃不就知道了嘛!

在第一條sql語句前加上 explain,得到如下結果:

在第二條sql語句前加上 explain,得到如下結果:

大家有什麼發現,除了表别名不一樣,其他的執行計劃完全一樣,都是先進行 where 條件過濾,在進行 join 條件關聯。說明 hive 底層會自動幫我們進行優化,是以這兩條sql語句執行效率是一樣的。

以上僅列舉了3個我們生産中既熟悉又有點迷糊的例子,explain 還有很多其他的用途,如檢視stage的依賴情況、排查資料傾斜、hive 調優等,小夥伴們可以自行嘗試。

explain dependency的用法

explain dependency用于描述一段sql需要的資料來源,輸出是一個json格式的資料,裡面包含以下兩個部分的内容:

input_partitions:描述一段sql依賴的資料來源表分區,裡面存儲的是分區名的清單,如果整段sql包含的所有表都是非分區表,則顯示為空。

input_tables:描述一段sql依賴的資料來源表,裡面存儲的是hive表名的清單。

使用explain dependency檢視sql查詢非分區普通表,在 hive cli 中輸入以下指令:

得到如下結果:

使用explain dependency檢視sql查詢分區表,在 hive cli 中輸入以下指令:

得到結果:

explain dependency的使用場景有兩個:

場景一:快速排除。快速排除因為讀取不到相應分區的資料而導緻任務資料輸出異常。例如,在一個以天分區的任務中,上遊任務因為生産過程不可控因素出現異常或者空跑,導緻下遊任務引發異常。通過這種方式,可以快速檢視sql讀取的分區是否出現異常。

場景二:理清表的輸入,幫助了解程式的運作,特别是有助于了解有多重子查詢,多表連接配接的依賴輸入。

下面通過兩個案例來看explain dependency的實際運用:

識别看似等價的代碼

有如下兩條看似相等的sql:

代碼一:

代碼二:

我們看下上述兩段代碼explain dependency的輸出結果:

代碼1的explain dependency結果:

代碼2的explain dependency結果:

通過上面的輸出結果可以看到,其實上述的兩個sql并不等價,代碼1在内連接配接(inner join)中的連接配接條件(on)中加入非等值的過濾條件後,并沒有将内連接配接的左右兩個表按照過濾條件進行過濾,内連接配接在執行時會多讀取part=0的分區資料。而在代碼2中,會過濾掉不符合條件的分區。

識别sql讀取資料範圍的差别

有如下兩段代碼:

以上兩個代碼的資料讀取範圍是一樣的嗎?答案是不一樣,我們通過explain dependency來看下:

可以看到,對左外連接配接在連接配接條件中加入非等值過濾的條件,如果過濾條件是作用于右表(b表)有起到過濾的效果,則右表隻要掃描兩個分區即可,但是左表(a表)會進行全表掃描。如果過濾條件是針對左表,則完全沒有起到過濾的作用,那麼兩個表将進行全表掃描。這時的情況就如同全外連接配接一樣都需要對兩個資料進行全表掃描。

在使用過程中,容易認為代碼片段2可以像代碼片段1一樣進行資料過濾,通過檢視explain dependency的輸出結果,可以知道不是如此。

explain authorization 的用法

通過explain authorization可以知道目前sql通路的資料來源(inputs) 和資料輸出(outputs),以及目前hive的通路使用者 (current_user)和操作(operation)。

在 hive cli 中輸入以下指令:

結果如下:

從上面的資訊可知:

上面案例的資料來源是defalut資料庫中的 student_tb_orc表;

資料的輸出路徑是hdfs://node01:8020/tmp/hive/hdfs/cbf182a5-8258-4157-9194-90f1475a3ed5/-mr-10000;

目前的操作使用者是hdfs,操作是查詢;

觀察上面的資訊我們還會看到authorization_failures資訊,提示對目前的輸入沒有查詢權限,但如果運作上面的sql的話也能夠正常運作。為什麼會出現這種情況?hive在預設不配置權限管理的情況下不進行權限驗證,所有的使用者在hive裡面都是超級管理者,即使不對特定的使用者進行賦權,也能夠正常查詢。

tez是apache開源的支援dag作業的計算架構,是支援hadoop2.x的重要引擎。它源于mapreduce架構,核心思想是将map和reduce兩個操作進一步拆分,分解後的元操作可以任意靈活組合,産生新的操作,這些操作經過一些控制程式組裝後,可形成一個大的dag作業。

【硬剛大資料】Hive計算引擎大PK,萬字長文解析MapRuce、Tez、Spark三大引擎

tez将map task和reduce task進一步拆分為如下圖所示:

【硬剛大資料】Hive計算引擎大PK,萬字長文解析MapRuce、Tez、Spark三大引擎

tez的task由input、processor、output階段組成,可以表達所有複雜的map、reduce操作,如下圖:

【硬剛大資料】Hive計算引擎大PK,萬字長文解析MapRuce、Tez、Spark三大引擎

tez的實作

tez對外提供了6種可程式設計元件,分别是:

1)input:對輸入資料源的抽象,它解析輸入資料格式,并吐出一個個key/value

2)output:對輸出資料源的抽象,它将使用者程式産生的key/value寫入檔案系統

3)paritioner:對資料進行分片,類似于mr中的partitioner

4)processor:對計算的抽象,它從一個input中擷取資料,經處理後,通過output輸出

5)task:對任務的抽象,每個task由一個input、ouput和processor組成

6)maser:管理各個task的依賴關系,并按順依賴關系執行他們

除了以上6種元件,tez還提供了兩種算子,分别是sort(排序)和shuffle(混洗),為了使用者使用友善,它還提供了多種input、output、task和sort的實作,具體如下:

1)input實作:localmergedinput(檔案本地合并後作為輸入),shuffledmergedinput(遠端拷貝資料且合并後作為輸入)

2)output實作:inmemorysortedoutput(記憶體排序後輸出),localonfilesorteroutput(本地磁盤排序後輸出),onfilesortedoutput(磁盤排序後輸出)

3)task實作:runtimetask(非常簡單的task,基本沒做什麼事)

4)sort實作:defaultsorter(本地資料排序),inmemoryshufflesorter(遠端拷貝資料并排序)

為了展示tez的使用方法和驗證tez架構的可用性,apache在yarn mrappmaster基礎上使用tez程式設計接口重新設計了mapreduce架構,使之可運作在yarn中。為此,tez提供了以下幾個元件:

1)input:simpleinput(直接使用mr inputformat擷取資料)

2)output:simpleoutput(直接使用mr outputformat擷取資料)

3)partition:mrpartitioner(直接使用mr partitioner擷取資料)

4)processor:mapprocessor(執行map task),reduceprocessor(執行reduce task)

5)task:finaltask,initialtask,initialtaskwithinmemsort,initialtaskwithlocalsort ,intermediatetask,localfinaltask,maponlytask。

對于mapreduce作業而言,如果隻有map task,則使用maponlytask,否則,map task使用initialtaskwithinmemsort而reduce task用finaltask。當然,如果你想編寫其他類型的作業,可使用以上任何幾種task進行組合,比如”initialtaskwithinmemsort –&gt; finaltask”是mapreduce作業。

為了減少tez開發工作量,并讓tez能夠運作在yarn之上,tez重用了大部分yarn中mrappmater的代碼,包括用戶端、資源申請、任務推測執行、任務啟動等。

tez和mapreduce作業的比較:

tez繞過了mapreduce很多不必要的中間的資料存儲和讀取的過程,直接在一個作業中表達了mapreduce需要多個作業共同協作才能完成的事情。

tez和mapreduce一樣都運作使用yarn作為資源排程和管理。但與mapreduce on yarn不同,tez on yarn并不是将作業送出到resourcemanager,而是送出到ampoolserver的服務上,ampoolserver存放着若幹已經預先啟動applicationmaster的服務。

當使用者送出一個作業上來後,ampoolserver從中選擇一個applicationmaster用于管理使用者送出上來的作業,這樣既可以節省resourcemanager建立applicationmaster的時間,而又能夠重用每個applicationmaster的資源,節省了資源釋放和建立時間。

tez相比于mapreduce有幾點重大改進:

當查詢需要有多個reduce邏輯時,hive的mapreduce引擎會将計劃分解,每個redcue送出一個mr作業。這個鍊中的所有mr作業都需要逐個排程,每個作業都必須從hdfs中重新讀取上一個作業的輸出并重新洗牌。而在tez中,幾個reduce接收器可以直接連接配接,資料可以流水線傳輸,而不需要臨時hdfs檔案,這種模式稱為mrr(map-reduce-reduce*)。

tez還允許一次發送整個查詢計劃,實作應用程式動态規劃,進而使架構能夠更智能地配置設定資源,并通過各個階段流水線傳輸資料。對于更複雜的查詢來說,這是一個巨大的改進,因為它消除了io/sync障礙和各個階段之間的排程開銷。

在mapreduce計算引擎中,無論資料大小,在shuffle階段都以相同的方式執行,将資料序列化到磁盤,再由下遊的程式去拉取,并反序列化。tez可以允許小資料集完全在記憶體中處理,而mapreduce中沒有這樣的優化。倉庫查詢經常需要在處理完大量的資料後對小型資料集進行排序或聚合,tez的優化也能極大地提升效率。

給 hive 換上 tez 非常簡單,隻需給 hive-site.xml 中設定:

設定hive.execution.engine為 tez 後進入到 hive 執行 sql:

可以看到,我的 userinfo 中有 100w 條記錄,執行一遍 count 需要 6.19s。 現在把 engine 換為 mr

再次執行 count userinfo:

可以看到,使用 tez 效率比 mapreduce 有近3倍的提升。而且,hive 在使用 tez 引擎執行時,有 ==&gt;&gt; 動态的進度訓示。而在使用 mr 時,隻有日志輸出 map and reduce 的進度百分比。使用 tez,輸出的日志也清爽很多。

在我測試的很多複雜的 sql,tez 的都比 mapreduce 快很多,快慢取決于 sql 的複雜度。執行簡單的 select 等并不能展現 tez 的優勢。tez 内部翻譯 sql 能任意的 map,reduce,reduce 組合,而 mr 隻能 map-&gt;reduce-&gt;map-&gt;reduce,是以在執行複雜 sql 時, tez 的優勢明顯。

tez 參數優化

優化參參數(在同樣條件下,使用了tez從300s+降到200s+)

tez記憶體優化

1. am、container大小設定

參數說明:set tez.am.resource.memory.mb tobe the same as yarn.scheduler.minimum-allocation-mb the yarnminimum container size.
參數說明:set hive.tez.container.size to be the same as or a small multiple(1 or 2 times that) of yarn container size yarn.scheduler.minimum-allocation-mb but never more than yarn.scheduler.maximum-allocation-mb.

2. am、container jvm參數設定

預設值:80% * tez.am.resource.memory.mb,一般不需要調整
預設值:80% * hive.tez.container.size 參數說明:hortonworks建議"–server –djava.net.preferipv4stack=true–xx:newratio=8 –xx:+usenuma –xx:useg1g"
預設值:0.8,參數說明:task/am占用jvm xmx的比例,該參數建議調整,需根據具體業務情況修改;

3. hive記憶體map join參數設定

預設值:100,參數說明:輸出排序需要的記憶體大小。建議值:40% * hive.tez.container.size,一般不超過2g.
預設值:true,參數說明:是否将多個mapjoin合并為一個,使用預設值
預設值為10mb,參數說明:多個mapjoin轉換為1個時,所有小表的檔案大小總和的最大值,這個值隻是限制輸入的表檔案的大小,并不代表實際mapjoin時hashtable的大小。 建議值:1/3 * hive.tez.container.size
預設值:100,參數說明:size of the buffer to use if not writing directly to disk。建議值: 10% * hive.tez.container.size.

4. container重用設定

預設值:true,參數說明:container重用開關

hive社群于2014年推出了hive on spark項目(hive-7292),将spark作為繼mapreduce和tez之後hive的第三個計算引擎。該項目由cloudera、intel和mapr等幾家公司共同開發,并受到了來自hive和spark兩個社群的共同關注。通過該項目,可以提高hive查詢的性能,同時為已經部署了hive或者spark的使用者提供了更加靈活的選擇,進而進一步提高hive和spark的普及率。

【硬剛大資料】Hive計算引擎大PK,萬字長文解析MapRuce、Tez、Spark三大引擎

總體設計

hive on spark總體的設計思路是,盡可能重用hive邏輯層面的功能;從生成實體計劃開始,提供一整套針對spark的實作,比如 sparkcompiler、sparktask等,這樣hive的查詢就可以作為spark的任務來執行了。以下是幾點主要的設計原則。

盡可能減少對hive原有代碼的修改。這是和之前的shark設計思路最大的不同。shark對hive的改動太大以至于無法被hive社群接受,hive on spark盡可能少改動hive的代碼,進而不影響hive目前對mapreduce和tez的支援。同時,hive on spark保證對現有的mapreduce和tez模式在功能和性能方面不會有任何影響。

對于選擇spark的使用者,應使其能夠自動的擷取hive現有的和未來新增的功能。

盡可能降低維護成本,保持對spark依賴的松耦合。

基于以上思路和原則,具體的一些設計架構如下。

hive 的使用者可以通過hive.execution.engine來設定計算引擎,目前該參數可選的值為mr和tez。為了實作hive on spark,我們将spark作為該參數的第三個選項。要開啟hive on spark模式,使用者僅需将這個參數設定為spark即可。

在hive中使用以下語句開啟:

spark 以分布式可靠資料集(resilient distributed dataset,rdd)作為其資料抽象,是以我們需要将hive的表轉化為rdd以便spark處理。本質上,hive的表和spark的 hadooprdd都是hdfs上的一組檔案,通過inputformat和recordreader讀取其中的資料,是以這個轉化是自然而然的。

spark為rdd提供了一系列的轉換(transformation),其中有些轉換也是面向sql 的,如groupbykey、join等。但如果使用這些轉換(就如shark所做的那樣),就意味着我們要重新實作一些hive已有的功能;而且當 hive增加新的功能時,我們需要相應地修改hive on spark模式。有鑒于此,我們選擇将hive的操作符包裝為function,然後應用到rdd上。這樣,我們隻需要依賴較少的幾種rdd的轉換,而主要的計算邏輯仍由hive提供。

由于使用了hive的原語,是以我們需要顯式地調用一些transformation來實作shuffle的功能。下表中列舉了hive on spark使用的所有轉換。

【硬剛大資料】Hive計算引擎大PK,萬字長文解析MapRuce、Tez、Spark三大引擎

hive on spark

對repartitionandsortwithinpartitions 簡單說明一下,這個功能由spark-2978引入,目的是提供一種mapreduce風格的shuffle。雖然sortbykey也提供了排序的功 能,但某些情況下我們并不需要全局有序,另外其使用的range partitioner對于某些hive的查詢并不适用。

實體執行計劃

通過sparkcompiler将operator tree轉換為task tree,其中需要送出給spark執行的任務即為sparktask。不同于mapreduce中map+reduce的兩階段執行模式,spark采用dag執行模式,是以一個sparktask包含了一個表示rdd轉換的dag,我們将這個dag包裝為sparkwork。執行sparktask 時,就根據sparkwork所表示的dag計算出最終的rdd,然後通過rdd的foreachasync來觸發運算。使用foreachasync是因為我們使用了hive原語,是以不需要rdd傳回結果;此外foreachasync異步送出任務便于我們對任務進行監控。

sparkcontext生命周期

sparkcontext 是使用者與spark叢集進行互動的接口,hive on spark應該為每個使用者的會話建立一個sparkcontext。但是spark目前的使用方式假設sparkcontext的生命周期是spark應 用級别的,而且目前在同一個jvm中不能建立多個sparkcontext。這明顯無法滿足hiveserver2的應用場景,因為多個用戶端需要通過同一個hiveserver2來提供服務。鑒于此,我們需要在單獨的jvm中啟動sparkcontext,并通過rpc與遠端的sparkcontext進行通信。

任務監控與統計資訊收集

spark提供了sparklistener接口來監聽任務執行期間的各種事件,是以我們可以實作一個listener來監控任務執行進度以及收集任務級别的統計信 息(目前任務級别的統計由sparklistener采集,任務進度則由spark提供的專門的api來監控)。另外hive還提供了operator級 别的統計資料資訊,比如讀取的行數等。在mapreduce模式下,這些資訊通過hadoop counter收集。我們可以使用spark提供的accumulator來實作該功能。

細節實作

hive on spark解析sql的過程

【硬剛大資料】Hive計算引擎大PK,萬字長文解析MapRuce、Tez、Spark三大引擎

sql語句在分析執行過程中會經曆下圖所示的幾個步驟

文法解析

操作綁定

優化執行政策

傳遞執行

文法解析之後,會形成一棵文法樹,如下圖所示。樹中的每個節點是執行的rule,整棵樹稱之為執行政策。

【硬剛大資料】Hive計算引擎大PK,萬字長文解析MapRuce、Tez、Spark三大引擎

政策優化

形成上述的執行政策樹還隻是第一步,因為這個執行政策可以進行優化,所謂的優化就是對樹中節點進行合并或是進行順序上的調整。

以大家熟悉的join操作為例,下圖給出一個join優化的示例。a join b等同于b join a,但是順序的調整可能給執行的性能帶來極大的影響,下圖就是調整前後的對比圖。

【硬剛大資料】Hive計算引擎大PK,萬字長文解析MapRuce、Tez、Spark三大引擎

在hash join中,首先被通路的表稱之為“内部建構表”,第二個表為“探針輸入”。建立内部表時,會将資料移動到資料倉庫指向的路徑;建立外部表,僅記錄資料所在的路徑。

再舉一例,一般來說盡可能的先實施聚合操作(aggregate)然後再join

【硬剛大資料】Hive計算引擎大PK,萬字長文解析MapRuce、Tez、Spark三大引擎

這種優化自動完成,在調優時不需要考慮。

sql到spark作業的轉換過程

native command的執行流程

由于native command是一些非耗時的操作,直接使用hive中原有的exeucte engine來執行即可。這些command的執行示意圖如下:

【硬剛大資料】Hive計算引擎大PK,萬字長文解析MapRuce、Tez、Spark三大引擎

sparktask的生成和執行

我們通過一個例子來看一下一個簡單的兩表join查詢如何被轉換為sparktask并被執行。下圖左半部分展示了這個查詢的operator tree,以及該operator tree如何被轉化成sparktask;右半部分展示了該sparktask執行時如何得到最終的rdd并通過foreachasync送出spark任務。

【硬剛大資料】Hive計算引擎大PK,萬字長文解析MapRuce、Tez、Spark三大引擎

sparkcompiler周遊operator tree,将其劃分為不同的mapwork和reducework。

mapwork為根節點,總是由tablescanoperator(hive中對表進行掃描的操作符)開始;後續的work均為reducework。reducesinkoperator(hive中進行shuffle輸出的操作符)用來标記兩個work之間的界線,出現reducesinkoperator表示目前work到下一個work之間的資料需要進行shuffle。是以,當我們發現reducesinkoperator時,就會建立一個新的reducework并作為目前work的子節點。包含了filesinkoperator(hive中将結果輸出到檔案的操作符)的work為葉子節點。

與mapreduce最大的不同在于,我們并不要求reducework一定是葉子節點,即reducework之後可以連結更多的reducework,并在同一個sparktask中執行。

從該圖可以看出,這個查詢的operator tree被轉化成了兩個mapwork和一個reducework。

執行sparktask步驟:

根據mapwork來生成最底層的hadooprdd,

将各個mapwork和reducework包裝成function應用到rdd上。

在有依賴的work之間,需要顯式地調用shuffle轉換,具體選用哪種shuffle則要根據查詢的類型來确定。另外,由于這個例子涉及多表查詢,是以在shuffle之前還要對rdd進行union。

經過這一系列轉換後,得到最終的rdd,并通過foreachasync送出到spark叢集上進行計算。

在logicalplan到physicalplan的轉換過程中,tordd最關鍵的元素

我們通過一個例子來看一下一個簡單的兩表join查詢如何被轉換為sparktask并被執行。下圖左半部分展示了這個查詢的operator tree,以及該operator tree如何被轉化成sparktask;右半部分展示了該sparktask執行時如何得到最終的rdd并通過foreachasync送出spark任務。

【硬剛大資料】Hive計算引擎大PK,萬字長文解析MapRuce、Tez、Spark三大引擎

sparkcompiler周遊operator tree,将其劃分為不同的mapwork和reducework。mapwork為根節點,總是由tablescanoperator(hive中對表 進行掃描的操作符)開始;後續的work均為reducework。reducesinkoperator(hive中進行shuffle輸出的操作符) 用來标記兩個work之間的界線,出現reducesinkoperator表示目前work到下一個work之間的資料需要進行shuffle。是以, 當我們發現reducesinkoperator時,就會建立一個新的reducework并作為目前work的子節點。包含了 filesinkoperator(hive中将結果輸出到檔案的操作符)的work為葉子節點。與mapreduce最大的不同在于,我們并不要求 reducework一定是葉子節點,即reducework之後可以連結更多的reducework,并在同一個sparktask中執行。

從該圖可以看出,這個查詢的operator tree被轉化成了兩個mapwork和一個reducework。在執行sparktask時,首先根據mapwork來生成最底層的 hadooprdd,然後将各個mapwork和reducework包裝成function應用到rdd上。在有依賴的work之間,需要顯式地調用 shuffle轉換,具體選用哪種shuffle則要根據查詢的類型來确定。另外,由于這個例子涉及多表查詢,是以在shuffle之前還要對rdd進行 union。經過這一系列轉換後,得到最終的rdd,并通過foreachasync送出到spark叢集上進行計算。

運作模式

hive on spark支援兩種運作模式:本地和遠端。當使用者把spark master url設定為local時,采用本地模式;其餘情況則采用遠端模式。本地模式下,sparkcontext與用戶端運作在同一個jvm中;遠端模式 下,sparkcontext運作在一個獨立的jvm中。提供本地模式主要是為了友善調試,一般使用者不應選擇該模式。是以我們這裡也主要介紹遠端模式 (remote sparkcontext,rsc)。下圖展示了rsc的工作原理。

【硬剛大資料】Hive計算引擎大PK,萬字長文解析MapRuce、Tez、Spark三大引擎

使用者的每個session會建立一個sparkclient,sparkclient會啟動remotedriver程序,并由remotedriver創 建sparkcontext。sparktask執行時,通過session送出任務,任務的主體就是對應的sparkwork。sparkclient 将任務送出給remotedriver,并傳回一個sparkjobref,通過該sparkjobref,用戶端可以監控任務執行進度,進行錯誤處理, 以及采集統計資訊等。由于最終的rdd計算沒有傳回結果,是以用戶端隻需要監控執行進度而不需要處理傳回值。remotedriver通過 sparklistener收集任務級别的統計資料,通過accumulator收集operator級别的統計資料(accumulator被包裝為 sparkcounter),并在任務結束時傳回給sparkclient。

sparkclient 與remotedriver之間通過基于netty的rpc進行通信。除了送出任務,sparkclient還提供了諸如添加jar包、擷取叢集資訊等接 口。如果用戶端需要使用更一般的sparkcontext的功能,可以自定義一個任務并通過sparkclient發送到remotedriver上執 行。

理論上來說,hive on spark對于spark叢集的部署方式沒有特别的要求,除了local以外,remotedriver可以連接配接到任意的spark叢集來執行任務。在我 們的測試中,hive on spark在standalone和spark on yarn的叢集上都能正常工作(需要動态添加jar包的查詢在yarn-cluster模式下還不能運作,請參考hive-9425)。

優化

yarn的配置

​<code>​yarn.nodemanager.resource.cpu-vcores​</code>​和​<code>​yarn.nodemanager.resource.memory-mb​</code>​,這兩個參數決定這叢集資料總管能夠有多少資源用于運作yarn上的任務。 這兩個參數的值是由機器的配置及同時在機器上運作的其它程序共同決定。本文假設僅有hdfs的datanode和yarn的nodemanager運作于該節點。

配置cores

基本配置是datanode和nodemanager各一個核,作業系統兩個核,然後剩下28核配置作為yarn資源。也即是yarn.nodemanager.resource.cpu-vcores=28

配置記憶體

對于記憶體,預留20gb給作業系統,datanode,nodemanager,剩餘100gb作為yarn資源。也即是 yarn.nodemanager.resource.memory-mb=100*1024

spark配置

假設yarn節點機器配置,假設有32核,120gb記憶體。

給yarn配置設定資源以後,那就要想着spark如何使用這些資源了,主要配置對象:

execurtor 和driver記憶體,executro配額,并行度。

executor記憶體

設定executor記憶體需要考慮如下因素:

executor記憶體越多,越能為更多的查詢提供map join的優化。由于垃圾回收的壓力會導緻開銷增加。

某些情況下hdfs的用戶端不能很好的處理并發寫入,是以過多的核心可能會導緻競争。

為了最大化使用core,建議将core設定為4,5,6(多核心會導緻并發問題,是以寫代碼的時候尤其是靜态的連結等要考慮并發問題)具體配置設定核心數要結合yarn所提供的核心數。 由于本文中涉及到的node節點是28核,那麼很明顯配置設定為4的化可以被整除,spark.executor.cores設定為4 不會有多餘的核剩下,設定為5,6都會有core剩餘。 spark.executor.cores=4,由于總共有28個核,那麼最大可以申請的executor數是7。總記憶體處以7,也即是 100/7,可以得到每個executor約14gb記憶體。

要知道 ​<code>​spark.executor.memory​</code>​ 和​<code>​spark.executor.memoryoverhead​</code>​共同決定着executor記憶體。建議​<code>​spark.executor.memoryoverhead​</code>​站總記憶體的 15%-20%。 那麼最終​<code>​spark.executor.memoryoverhead=2g​</code>​和​<code>​spark.executor.memory=12g​</code>​.

根據上面的配置的化,每個主機就可以申請7個executor,每個executor可以運作4個任務,每個core一個task。那麼每個task的平均記憶體是 14/4 = 3.5gb。在executor運作的task共享記憶體。 其實,executor内部是用newcachedthreadpool運作task的。

確定​<code>​spark.executor.memoryoverhead​</code>​和​<code>​spark.executor.memory​</code>​的和不超過​<code>​yarn.scheduler.maximum-allocation-mb​</code>​。

driver記憶體

對于drvier的記憶體配置,當然也包括兩個參數。

spark.driver.memoryoverhead 每個driver能從yarn申請的堆外記憶體的大小。

spark.driver.memory 當運作hive on spark的時候,每個spark driver能申請的最大jvm 堆記憶體。該參數結合 spark.driver.memoryoverhead共同決定着driver的記憶體大小。

driver的記憶體大小并不直接影響性能,但是也不要job的運作受限于driver的記憶體. 這裡給出spark driver記憶體申請的方案,假設yarn.nodemanager.resource.memory-mb是 x。

driver記憶體申請12gb,假設 x &gt; 50gb

driver記憶體申請 4gb,假設 12gb &lt; x &lt;50gb

driver記憶體申請1gb,假設 1gb &lt; x &lt; 12 gb

driver記憶體申請256mb,假設 x &lt; 1gb

這些數值是​<code>​spark.driver.memory​</code>​和 ​<code>​spark.driver.memoryoverhead​</code>​記憶體的總和。對外記憶體站總記憶體的10%-15%。 假設 ​<code>​yarn.nodemanager.resource.memory-mb=100*1024mb​</code>​,那麼driver記憶體設定為12gb,此時 ​<code>​spark.driver.memory=10.5gb​</code>​和​<code>​spark.driver.memoryoverhead=1.5gb​</code>​

注意,資源多少直接對應的是資料量的大小。是以要結合資源和資料量進行适當縮減和增加。

executor數

executor的數目是由每個節點運作的executor數目和叢集的節點數共同決定。如果你有四十個節點,那麼hive可以使用的最大executor數就是 280(40*7). 最大數目可能比這個小點,因為driver也會消耗1core和12gb。

目前假設是沒有yarn應用在跑。

hive性能與用于運作查詢的executor數量直接相關。 但是,不通查詢還是不通。 通常,性能與executor的數量成比例。 例如,查詢使用四個executor大約需要使用兩個executor的一半時間。 但是,性能在一定數量的executor中達到峰值,高于此值時,增加數量不會改善性能并且可能産生不利影響。

在大多數情況下,使用一半的叢集容量(executor數量的一半)可以提供良好的性能。 為了獲得最佳性能,最好使用所有可用的executor。 例如,設定spark.executor.instances = 280。 對于基準測試和性能測量,強烈建議這樣做。

動态executor申請

雖然将spark.executor.instances設定為最大值通常可以最大限度地提高性能,但不建議在多個使用者運作hive查詢的生産環境中這樣做。 避免為使用者會話配置設定固定數量的executor,因為如果executor空閑,executor不能被其他使用者查詢使用。 在生産環境中,應該好好計劃executor配置設定,以允許更多的資源共享。

spark允許您根據工作負載動态擴充配置設定給spark應用程式的叢集資源集。 要啟用動态配置設定,請按照動态配置設定中的步驟進行操作。 除了在某些情況下,強烈建議啟用動态配置設定。

并行度

要使可用的executor得到充分利用,必須同時運作足夠的任務(并行)。在大多數情況下,hive會自動确定并行度,但也可以在調優并發度方面有一些控制權。 在輸入端,map任務的數量等于輸入格式生成的split數。對于hive on spark,輸入格式為combinehiveinputformat,它可以根據需要對基礎輸入格式生成的split進行分組。 可以更好地控制stage邊界的并行度。調整hive.exec.reducers.bytes.per.reducer以控制每個reducer處理的資料量,hive根據可用的executor,執行程式記憶體,以及其他因素來确定最佳分區數。 實驗表明,隻要生成足夠的任務來保持所有可用的executor繁忙,spark就比mapreduce對hive.exec.reducers.bytes.per.reducer指定的值敏感度低。 為獲得最佳性能,請為該屬性選擇一個值,以便hive生成足夠的任務以完全使用所有可用的executor。

hive配置

hive on spark 共享了很多hive性能相關的配置。可以像調優hive on mapreduce一樣調優hive on spark。 然而,hive.auto.convert.join.noconditionaltask.size是基于統計資訊将基礎join轉化為map join的門檻值,可能會對性能産生重大影響。 盡管該配置可以用hive on mr和hive on spark,但是兩者的解釋不同。

資料的大小有兩個統計名額:

totalsize- 資料在磁盤上的近似大小

rawdatasize- 資料在記憶體中的近似大小

hive on mr用的是totalsize。hive on spark使用的是rawdatasize。由于可能存在壓縮和序列化,這兩個值會有較大的差别。 對于hive on spark 需要将 ​<code>​hive.auto.convert.join.noconditionaltask.size​</code>​指定為更大的值,才能将與hive on mr相同的join轉化為map join。

可以增加此參數的值,以使地圖連接配接轉換更具兇猛。 将common join 轉換為 map join 可以提高性能。 如果此值設定得太大,則來自小表的資料将使用過多記憶體,任務可能會因記憶體不足而失敗。 根據群集環境調整此值。

通過參數 ​<code>​hive.stats.collect.rawdatasize​</code>​ 可以控制是否收集 rawdatasize 統計資訊。

對于hiveserver2,建議再配置兩個額外的參數: ​<code>​hive.stats.fetch.column.stats=true​</code>​ 和 ​<code>​hive.optimize.index.filter=true​</code>​.

hive性能調優通常建議使用以下屬性:

預啟動yarn容器

在開始新會話後送出第一個查詢時,在檢視查詢開始之前可能會遇到稍長的延遲。還會注意到,如果再次運作相同的查詢,它的完成速度比第一個快得多。

spark執行程式需要額外的時間來啟動和初始化yarn上的spark,這會導緻較長的延遲。此外,spark不會等待所有executor在啟動作業之前全部啟動完成,是以在将作業送出到群集後,某些executor可能仍在啟動。 但是,對于在spark上運作的作業,作業送出時可用executor的數量部分決定了reducer的數量。當就緒executor的數量未達到最大值時,作業可能沒有最大并行度。這可能會進一步影響第一個查詢的性能。

在使用者較長期會話中,這個額外時間不會導緻任何問題,因為它隻在第一次查詢執行時發生。然而,諸如oozie發起的hive工作之類的短期繪畫可能無法實作最佳性能。

為減少啟動時間,可以在作業開始前啟用容器預熱。隻有在請求的executor準備就緒時,作業才會開始運作。這樣,在reduce那一側不會減少短會話的并行性。

要啟用預熱功能,請在發出查詢之前将hive.prewarm.enabled設定為true。還可以通過設定hive.prewarm.numcontainers來設定容器數量。預設值為10。

預熱的executor的實際數量受spark.executor.instances(靜态配置設定)或spark.dynamicallocation.maxexecutors(動态配置設定)的值限制。 hive.prewarm.numcontainers的值不應超過配置設定給使用者會話的值。

注意:預熱需要幾秒鐘,對于短會話來說是一個很好的做法,特别是如果查詢涉及reduce階段。 但是,如果hive.prewarm.numcontainers的值高于群集中可用的值,則該過程最多可能需要30秒。請謹慎使用預熱。

繼續閱讀