天天看點

大資料篇--SQL on Hadoop

文章目錄

      • 一、業界常用架構
        • 前言:
        • 1.Hive SQL:
        • 2.Spark SQL:
        • 3.Impala:
        • 4.Presto:
        • 5.Drill:
        • 6.Phoenix:
      • 二、SQL on Hadoop調優政策
        • 1.架構層面調優:
          • (1)分表:
          • (2)分區表:
          • (3)壓縮(compressed):
            • a.為什麼要使用壓縮:
            • b.MapReduce處理過程中哪些地方可以使用到壓縮(Spark同理):
            • c.大資料裡常用的壓縮:
            • d.整合Hadoop的使用:
            • e.整合Hive的使用:
            • f.整合Spark的使用:
        • 2.執行層面調優:
          • (1)推測執行:
          • (2)并行執行:
          • (3)JVM重用:

一、業界常用架構

有幾個概念需要先區分一下:

  Hive on Spark:這個是Hive社群的,它底層支援MR、Spark和Tez。它的意思是說可以直接運作Spark之上。

  Spark SQL:Spark社群裡的。而Spark on Hive的說法是錯誤的。

它們兩支援的力度現在也不一樣,目前Hive on Spark支援的肯定會多一些,因為Hive這個東西發展到現在已經很多年了,是以說它的技術積累是很多的,而Spark SQL是一個新興的架構,它裡面所支援的文法等要稍微差一些。其實作在在Spark 2.2和2.3以後他們基本是都是類似的,能相容和覆寫的面基本上都已經差不多了。對于以前的版本,可能Hive on Spark能運作,但是Spark SQL不一定能運作得通,因為它很多文法并沒有做一個很好的支援。

前言:

  SQL on Hadoop中的Hadoop指的是廣義上面的Hadoop,而不是針對于Hadoop一個架構,指的是大資料生态圈裡面廣義Hadoop的概念。意思就是在大資料架構中我們使用SQL語言就可以搞定,這就是所謂的一個SQL on Hadoop。

  下面架構它們底層MetaStore都是相通的,MetaStore指的是存儲中繼資料資訊的,比如表的名字叫什麼,表裡面有哪些字段,它的資料類型是什麼等,因為隻有把中繼資料資訊有了以後你才可以再把這個中繼資料和檔案系統上面的檔案作用上你才可以提供一個SQL的查詢,否則檔案系統上面的檔案就是一個普通的文本檔案而已,它根本就不知道你的一個中繼資料資訊,是以有了這個東西以後這些架構之間是共享中繼資料資訊的,意思就是你在hive裡面建立的一張表你在Impala、Presto等裡也可以做相應的一個查詢,同理impala裡建立的表在hive裡也可以查詢,是以說MetaStore這個共享中繼資料是非常重要的。否則hive裡面的東西隻能再hive裡面用,這種局限性就太強了,不友善以後架構的一個遷移。

1.Hive SQL:

  我們最常用的肯定是Hive,它是SQL on Hadoop最經典的一個解決方案。它的功能就是把我們的SQL轉換成對應執行引擎的作業,SQL層面你是不用感覺到底運作在哪個執行架構上面的,因為它底層是通過一個參數就可以直接設定執行架構的。執行引擎有好幾種:對于Hive 1.x來說預設的是MapReduce;Hive 2.x以後預設的就是基于Spark了,把MapReduce辨別為一種過時的;還有基于Tez的;Hive的文法和SQL是類似的,但是它們之間并沒有什麼聯系,隻不過說它們的文法長的比較像而已。

2.Spark SQL:

  它提供的并不僅僅是SQL的功能,提供的功能要比SQL功能多很多。通過這個名字很多人都會以為它僅僅是使用SQL,但是這個了解很定是不對的。可以去Spark官網上看一下Spark SQL是如何定義的,這個定義是非常重要的,就決定了你對它的了解是否正确以及影響到你以後的技術選型是否正确。

3.Impala:

  整體的性能是要比hive快的,但是有一點要注意那就是需要提供足夠的記憶體,這個東西簡直就是一個吃記憶體的老虎,如果沒有足夠的記憶體肯定是沒法使用的。

4.Presto:

  在國内京東用的非常多。

5.Drill:

  能做到跨資料源的一個查詢,比如你的文本檔案在HDFS上,它有中繼資料以後可以直接關聯關系型資料庫。

6.Phoenix:

  HBASE查詢性能如何取決于你Rowkey的一個設計,是以說你的Rowkey肯定要基于查詢條件來進行設計的。Hbase官方提供api和指令行的操作而并沒有提供一個SQL的查詢。是以社群上面就出來一個Phoenix,它可以直接使用SQL查詢HBASE中的東西,而且它還可以提供二級索引的功能。

二、SQL on Hadoop調優政策

先介紹兩個概念:行式存儲和列式存儲

  很多人剛開始學習資料庫的時候可能接觸到的都是關系型資料庫RDBMS,它是基于行式存儲的思想來進行資料存儲的;但是資料庫存儲領域還有一個不容小觑的力量,非關系型資料庫NoSQL,其中一部分NoSQL資料庫是采用了列式存儲的思想。

  一般認為原因是Column-Store在存儲格式有優勢,分析類查詢往往隻查詢一個表裡面很少的幾個字段,Column-Store隻需要從磁盤讀取使用者查詢的Column,而Row-Store讀取每一條記錄的時候你會把所有Column的資料讀出來,在IO上Column-Store比Row-Store效率高很多,是以性能更好。如果想深入研究原因可參考:Column-Stores vs. Row-Stores

  對此,行式資料庫給出的優化方案是加“索引”,在OLTP類型的應用中通過索引機制或給表分區等手段可以簡化查詢操作步驟,并提升查詢效率但針對海量資料背景的OLAP應用(例如分布式資料庫、資料倉庫等等)行式存儲的資料庫就有些“力不從心”了。當然,跟行資料庫一樣列式存儲也有不太适用的場景:不适合掃描小量資料;不适合随機的更新;不适合做含有删除和更新的實時操作。随着列式資料庫的發展,傳統的行式資料庫加入了列式存儲的支援,形成具有兩種存儲方式的資料庫系統。例如,随着Oracle 12c推出了in memory元件,使得Oracle資料庫具有了雙模式資料存放方式,進而能夠實作對混合類型應用的支援,當然列式資料庫也有對行式存儲的支援比如HP Vertica。參考:https://stor.51cto.com/art/201809/583648.htm

調優:在資源不變的前提下,讓作業的執行性能有提升。

1.架構層面調優:

(1)分表:

  将一張大資料表分成多個小表。

(2)分區表:

  将同一張表分别儲存到不同的分區

個人感覺:抛開分區表的一些限制和缺陷來說,可以做分區表的情況下一般不拆成實體小表,因為通常需要改程式,而分區表不用。而且分區切換速度非常快,我個人偏向與使用分區表。好像在分區表出現之前,當資料量很大時,一種方法就是将大表分成幾個小表;後來,資料庫自己實作這種功能,就是分區表。整體來看分區表比較好,除了一些特殊情況不能使用分區表外,最好還是使用分區表。

(3)壓縮(compressed):

a.為什麼要使用壓縮:

  壓縮分為無損壓縮(Lossless Compression)和有損壓縮(Lossy Compression)。無損壓縮一般适用于使用者行為資料這類不允許資料丢失的業務場景;有損壓縮一般适用于大檔案的壓縮,例如圖檔、視訊的處理,優點是壓縮率和壓縮比都比較高,可以節省更多的空間。

  随着資料量越來越大,對資料如何處理使得我們提高資料處理效率,如何選擇和使用壓縮就顯得尤為重要。缺點是由于使用資料時需要先解壓,就會加重CPU的負荷。但壓縮的優點:

  • 減少檔案大小(reduce file size)
  • 節省磁盤空間(svae disk space)
  • 增加網絡傳輸速度及效率(Increase tansfer speed at a given data rate)

b.MapReduce處理過程中哪些地方可以使用到壓縮(Spark同理):

大資料篇--SQL on Hadoop

  1描述的是使用壓縮過的資料作為Map的輸入;2描述的是壓縮的中間資料,因為Map的輸出到Reduce的輸入中間是有一個過程;3描述的是Reduce處理完的結果可以進行壓縮。

  對于這一份資料來說假設沒有經過壓縮體積肯定是要大一些的,按照預設的Input Split大小是block size大小,不壓縮Input Split後給Map處理的數量肯定要多一些。如果使用壓縮的話這這分資料體積減小,經過Input Split後的數量也會少一些。當Maps讀的時候會有一個解壓縮,這塊分布式的架構比如MapReduce、Spark等引擎預設内置的,是以對資料到底壓縮不壓縮就不用在你的編碼層面指定,通過配置就可以搞定。Maps進來以後按照業務邏輯比如統計WordCount,他會把每一行資料按照分隔符進行拆分,拆分出來以後把每個單詞賦上一個1。Map輸出會先把資料寫到環形緩存區中,當填滿的時候會刷到磁盤上面,是以刷到磁盤上面的資料又可以進行壓縮。然後Reduce會根據一定規則去Map的輸出裡面拉取資料,并再次解壓,這就是一個中間的過程。到Reduce之後會有一個真正作業結果的輸出,這個輸出以後也可以進行壓縮,這個壓縮以後可以節省空間。

  不同的場景選擇不同的壓縮方式,肯定沒有一個一勞永逸的方法,如果選擇高壓縮比,那麼對于cpu的性能要求要高,同時壓縮、解壓時間耗費也多;選擇壓縮比低的,對于磁盤io、網絡io的時間要多,空間占據要多;對于支援分割的,可以實作并行處理。

  那麼這幾種壓縮該如何做一個選擇呢?需要根據壓縮比、壓縮/解壓速度這兩個角度來考慮。根據上面這個圖我們可以想到中間2這個過程應該選擇速度非常快的,因為中間這個過程無所謂壓縮比大還是小。後面3輸出的話肯定優先選擇壓縮比高的,因為這樣可以節省磁盤空間。對于一開始的1就應該考慮壓縮能否支援分片,如果壓縮不支援分片,即使你使用了壓縮,Map處理的時候也隻有一個Task進行處理就肯定會比較慢的。是以對于這個3個場景要使用不同的壓縮技術。

  為什麼map端用snappy壓縮格式;而reduce用gzip或者bzip2的壓縮格式呢?為什麼每個reduce端壓縮後的資料不要超過一個block的大小呢?

答:1.Map壓縮主要是增加mr運作的效率,我們就需要找壓縮效率最高的壓縮格式,snappy的壓縮時間最快。2.Reduce壓縮就是輸出檔案壓縮 ,故考慮占用磁盤空間的大小;選擇高壓縮比gzip或者bzip2;而考慮到會用reduce結果做二次運算;則對于選用不支援分割gzip或者bzip2原因有兩個:(1)是這兩個壓縮格式高(2)對于不可分割我們采用每個reduce端壓縮後的資料不要超過一個block的大小的方法;則對于後續的map清洗也就不會出現分割問題。

c.大資料裡常用的壓縮:

壓縮格式 壓縮工具 算法 檔案擴充名 Hadoop類 hadoop自帶 是否支援分片
gzip gzip deffault .gz org.apache.hadoop.io.compress.GzipCodec No
bzip2 bzip2 bzip2 .bz2 org.apache.hadoop.io.compress.BZip2Codec Yes
LZO LZO LZO .lzo com.hadoop.compression.lzo.LzoCodec Yes(建索引)
LZ4 LZ4 LZ4 .lz4 org.apache.hadoop.io.compress.Lz4Codec No
Snappy N/A Snappy .snappy org.apache.hadoop.io.compress.SnappyCodec No
大資料篇--SQL on Hadoop

  從上圖可以看出,壓縮比越高,壓縮速率越慢,壓縮時間越長,壓縮比:Snappy<LZ4<LZO<GZIP<BZIP2(注意,壓縮比的大小不是通過數字的大小來看的,是數字越小,壓縮比越大,是以snappy的壓縮比是22.2%,資料最大,但是壓縮比是最小的)。

gzip:優點:壓縮比在四種壓縮方式中較高;hadoop本身支援,在應用中處理gzip格式的檔案就和直接處理文本一樣;有hadoop native庫;大部分linux系統都自帶gzip指令,使用友善。缺點:不支援split。

lzo:優點:壓縮/解壓速度也比較快,合理的壓縮率;支援split,是hadoop中最流行的壓縮格式;支援hadoop native庫;需要在linux系統下自行安裝lzop指令,使用友善。缺點:壓縮率比gzip要低;hadoop本身不支援,需要安裝;lzo雖然支援split,但需要對lzo檔案建索引,否則hadoop也是會把lzo檔案看成一個普通檔案(為了支援split需要建索引,需要指定inputformat為lzo格式)。

snappy:優點:壓縮速度快;支援hadoop native庫;缺點:不支援split;壓縮比低;hadoop本身不支援,需要安裝;linux系統下沒有對應的指令。

bzip2:優點:支援split;具有很高的壓縮率,比gzip壓縮率都高;hadoop本身支援,但不支援native;在linux系統下自帶bzip2指令,使用友善。缺點:壓縮/解壓速度慢;不支援native。

d.整合Hadoop的使用:

檢查Hadoop的壓縮格式是否可用,你可能都是false,這就需要你編譯一下,把壓縮的東西整進來。

大資料篇--SQL on Hadoop

壓縮配置在core-site.xml:

<property>
<name>io.compression.codecs</name>
	<value>
		org.apache.hadoop.io.compress.GzipCodec,
		org.apache.hadoop.io.compress.DefaultCodec,
		org.apache.hadoop.io.compress.BZip2Codec
	</value>
</property>
           

這裡在mapred-site.xml裡配置一下reduce和map輸出,中間輸出并沒有配:

<property>
	<name>mapreduce.output.fileoutputformat.compress</name>
	<value>true</value>
	<description>Reduce是否啟用輸出壓縮</description>
</property>
<property>
	<name>mapreduce.output.fileoutputformat.compress.codec</name>
	<value>org.apache.hadoop.io.compress.GzipCodec</value>
	<description>Reduce輸出壓縮算法:Gzip</description>
</property>
<property>
	<name>mapreduce.map.output.compress</name> 
	<value>true</value>
	<description>Map是否開啟輸出壓縮</description>
</property>	
<property>
	<name>mapreduce.map.output.compress.codec</name>
	<value>org.apache.hadoop.io.compress.SnappyCodec</value>
	<description>Map輸出壓縮算法:Snappy</description>
</property>
           
大資料篇--SQL on Hadoop

e.整合Hive的使用:

測試BZIp2壓縮:

建立一張表:
hive> create table page_views(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

hive> load data local inpath '/home/hadoop/data/page_views.dat' overwrite into table page_views;

查大小:
[[email protected] data]$ hadoop fs -du -s hdfs://hadoop000:8020/user/hive/warehouse/interview.db/page_views
18.1 M   18.1 M hdfs://hadoop000:8020/user/hive/warehouse/interview.db/page_views

hive設定壓縮:
hive> set hive.exec.compress.output=true;
hive> set hive.exec.compress.output;
hive.exec.compress.output=true
hive> set mapreduce.output.fileoutputformat.compress.codec;
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DefaultCodec; -- 預設是DefaultCodec
hive> set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.BZIp2Codec; -- 我們改為BZIp2Codec

建立另一張表:會運作一個MR作業
hive> create table page_views_bzip2    
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
as select * from page_views;

再查這張表的大小:
[[email protected] data]$ hadoop fs -du -s hdfs://hadoop000:8020/user/hive/warehouse/interview.db/page_views_bzip2
3.6 M   3.6 M hdfs://hadoop000:8020/user/hive/warehouse/interview.db/page_views_bzip2
           

也可以在建表的時候進行設定,一般選擇orcfile/parquet + snappy 的方式比較好:

create table tablename (
 xxx,string
 xxx, bigint
)
ROW FORMAT DELTMITED FIELDS TERMINATED BY '\t'
STORED AS orc tblproperties("orc.compress" = "SNAPPY")
           

f.整合Spark的使用:

參考spark2.4.1版本來講解:spark 壓縮分為3塊,rdd壓縮、broadcast壓縮和spark sql的壓縮

rdd 壓縮:spark.rdd.compress。

用來壓縮rdd的分區,預設值為false。用來一想也是,rdd的分區緩存在記憶體中,我們本身就是想使用記憶體來加速,壓縮和相應的解壓縮反而浪費了時間。預設的壓縮格式是用的spark.io.compression.codec配置的值。

RDD輸出壓縮檔案:

import org.apache.hadoop.io.compress.BZip2Codec
// bzip2 壓縮率最高,壓縮解壓速度較慢,支援split。
rdd.saveAsTextFile("codec/bzip2",classOf[BZip2Codec])

import org.apache.hadoop.io.compress.SnappyCodec
//snappy json文本壓縮率 38.2%,壓縮和解壓縮時間短。
rdd.saveAsTextFile("codec/snappy",classOf[SnappyCodec])

import org.apache.hadoop.io.compress.GzipCodec
//gzip 壓縮率高,壓縮和解壓速度較快,不支援split,如果不對檔案大小進行控制,下次分析可能可能會造成效率低下的問題。
// rdd..saveAsTextFile(path,GzipCodec.class); 
rdd.saveAsTextFile("codec/gzip",classOf[GzipCodec])
           

broadcast壓縮:spark.broadcast.compress

壓縮發送的廣播變量,預設壓縮。壓縮格式使用spark.io.compression.codec 配置的值。想想也合理,壓縮廣播變量,可以大大的節省帶寬和IO,節省效率。

spark sql中的壓縮:spark.sql.parquet.compression.codec和spark.sql.orc.compression.codec snappy

上面兩個配置說的就是寫parquet 和orc 檔案預設都是使用snappy格式壓縮的。

spark sql中的壓縮指的比如spark sql建表 spark sql寫檔案等操作。這塊的配置在官方文檔沒有,這邊順帶說聲spark sql 想檢視相關配置應該怎麼做:打開spark shell 使用

spark.sql(“SET -v”).write.saveAsTable(“demo”);

把這些spark sql的配置直接存成一張表也友善檢視,不然80多個有點多。

// spark sql 輸出壓縮檔案:parquet檔案壓縮
// parquet為檔案提供了列式存儲,查詢時隻會取出需要的字段和分區,對IO性能的提升非常大,同時占用空間較小,即使是parquet的uncompressed存儲方式也比普通的文本要小的多。
// 預設值snappy.Acceptable values include: none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd.

sparkConf.set("spark.sql.parquet.compression.codec","gzip")
dataset.write().parquet("path");
           
val PARQUET_COMPRESSION = buildConf("spark.sql.parquet.compression.codec")
    .doc("Sets the compression codec used when writing Parquet files. If either `compression` or " +
      "`parquet.compression` is specified in the table-specific options/properties, the " +
      "precedence would be `compression`, `parquet.compression`, " +
      "`spark.sql.parquet.compression.codec`. Acceptable values include: none, uncompressed, " +
      "snappy, gzip, lzo, brotli, lz4, zstd.")
    .stringConf
    .transform(_.toLowerCase(Locale.ROOT))
    .checkValues(Set("none", "uncompressed", "snappy", "gzip", "lzo", "lz4", "brotli", "zstd"))
    .createWithDefault("snappy");
           
// orc:預設值snappy.Acceptable values include: none, uncompressed, snappy, zlib, lzo

val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec")
    .doc("Sets the compression codec used when writing ORC files. If either `compression` or " +
      "`orc.compress` is specified in the table-specific options/properties, the precedence " +
      "would be `compression`, `orc.compress`, `spark.sql.orc.compression.codec`." +
      "Acceptable values include: none, uncompressed, snappy, zlib, lzo.")
    .stringConf
    .transform(_.toLowerCase(Locale.ROOT))
    .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
    .createWithDefault("snappy");
           

參考:

SparkSQL的幾種輸出格式及壓縮方式

https://www.cnblogs.com/yyy-blog/p/12747133.html

spark的壓縮使用和簡單介紹

在Spark程式中使用壓縮

2.執行層面調優:

(1)推測執行:

參考官網:https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties

大資料篇--SQL on Hadoop

  在分布式叢集環境下,因為程式Bug(包括Hadoop本身的bug),負載不均衡(比如作業在一個負載比較高的機器上運作會比較慢一些)或者資源分布不均(比如作業在一個低配的機器上運作就會比較慢一些)等原因,會造成同一個作業的多個任務之間運作速度不一緻,有些任務的運作速度可能明顯慢于其他任務(比如一個作業的某個任務進度隻有50%,而其他所有任務已經運作完畢),則這些任務會拖慢作業的整體執行進度。

  為了避免這種情況發生,Hadoop采用了推測執行(Speculative Execution)機制,它根據一定的法則推測出“拖後腿”的任務,并為這樣的任務啟動一個備份任務,讓該任務與原始任務同時處理同一份資料,并最終選用最先成功運作完成任務的計算結果作為最終結果,原有任務和新任務哪個先執行完就把另外一個kill掉。

設定開啟推測執行參數:Hadoop的mapred-site.xml檔案中進行配置,預設是true:

<property>
    <name>mapreduce.map.speculative</name>
    <value>true</value>
    <description>If true, then multiple instances of some map tasks 
    may be executed in parallel.
    </description>
</property>

<property>
    <name>mapreduce.reduce.speculative</name>
    <value>true</value>
    <description>If true, then multiple instances of some reduce tasks 
    may be executed in parallel.
    </description>
</property>
           

hive本身也提供了配置項來控制reduce-side的推測執行,預設是true:

<property>
    <name>hive.mapred.reduce.tasks.speculative.execution</name>
    <value>true</value>
    <description>Whether speculative execution for reducers 
    should be turned on.     
    </description>
</property>
           

  在測試環境下我們都把應用程式測試OK了,如果還加上推測執行,如果有一個資料分片本來就會發生資料傾斜,執行時間就是比其他的時間長,那麼hive就會把這個執行時間長的job當作運作失敗,繼而又産生一個相同的job去運作,後果可想而知,可通過如下設定關閉推測執行:

set mapreduce.map.speculative=false
set mapreduce.reduce.speculative=false
set hive.mapred.reduce.tasks.speculative.execution=false
           
(2)并行執行:
大資料篇--SQL on Hadoop

  當一個sql中有多個job時候,且這多個job之間沒有依賴,則可以讓順序執行變為并行執行(一般為用到union all );

// 開啟任務并行執行
set hive.exec.parallel=true;
// 同一個sql允許并行任務的最大線程數 
set hive.exec.parallel.thread.number=8;
           
(3)JVM重用:

  JVM重用是Hadoop調優參數的内容,其對Hive的性能具有非常大的影響,特别是對于很難避免小檔案的場景或task特别多的場景,這類場景大多數執行時間都很短。

  Hadoop的預設配置通常是使用派生JVM來執行map和Reduce任務的。這時JVM的啟動過程可能會造成相當大的開銷,尤其是執行的job包含有成百上千task任務的情況。JVM重用可以使得JVM執行個體在同一個job中重新使用N次。N的值可以在Hadoop的mapred-site.xml檔案中進行配置。通常在10-20之間,具體多少需要根據具體業務場景測試得出。

<property>
  <name>mapreduce.job.jvm.numtasks</name>
  <value>10</value>
  <description>How many tasks to run per jvm. If set to -1, there is
  no limit. 
  </description>
</property>
           

  我們也可以在hive當中通過:

set mapred.job.reuse.jvm.num.tasks=10;

設定我們的jvm重用。

  當然,這個功能也是有它的缺點的。開啟JVM重用将一直占用使用到的task插槽,以便進行重用,直到任務完成後才能釋放。如果某個“不平衡的”job中有某幾個reduce task執行的時間要比其他Reduce task消耗的時間多的多的話,那麼保留的插槽就會一直空閑着卻無法被其他的job使用,直到所有的task都結束了才會釋放。

繼續閱讀