Join 背景介紹
Join 是資料庫查詢永遠繞不開的話題,傳統查詢 SQL 技術總體可以分為簡單操作(過濾操作、排序操作 等),聚合操作-groupby 以及 Join 操作等。其中 Join 操作是最複雜、代價最大的操作類型,也是 OLAP 場景中使用相對較多的操作。
另外,從業務層面來講,使用者在數倉建設的時候也會涉及 Join 使用的問題。通常情況下,資料倉庫中的表一般會分為“低層次表”和“高層次表”。
- 所謂“低層次表”:就是資料源導入數倉之後直接生成的表,單表列值較少,一般可以明顯歸為次元表或事實表,表和表之間大多存在外健依賴,是以查詢起來會遇到大量 Join 運算,查詢效率很差。
- “高層次表”:是在“低層次表”的基礎上加工轉換而來,通常做法是使用 SQL 語句将需要 Join 的表預先進行合并形成“寬表”,在寬表上的查詢不需要執行大量 Join,效率很高。但寬表缺點是資料會有大量備援,且相對生成較滞後,查詢結果可能并不及時。
為了獲得時效性更高的查詢結果,大多數場景都需要進行複雜的 Join 操作。Join 操作之是以複雜,主要是通常情況下其時間空間複雜度高,且有很多算法,在不同場景下需要選擇特定算法才能獲得最好的優化效果。
本文将介紹 SparkSQL 所支援的幾種常見的 Join 算法及其适用場景。
基本實作機制
shuffle hash join、broadcast hash join 兩者歸根到底都屬于 hash join,隻不過在 hash join 之前需要先 shuffle 還是先 broadcast。其實,hash join 算法來自于傳統資料庫,而 shuffle 和 broadcast 是大資料的皮(分布式),兩者一結合就成了大資料的算法了。是以可以說,大資料的根就是傳統資料庫。既然 hash join 是“核心”,那就刨出來看看,看完把“皮”再分析一下。
1、hash join
先來看看這樣一條 SQL 語句:
- select * from order,item where item.id = order.i_id
很簡單一個 Join 節點,參與 join 的兩張表是 item 和 order,join key 分别是 item.id 以及 order.i_id。現在假設這個 Join 采用的是 hash join 算法,整個過程會經曆三步:
-
join分析:shuffle hash join、broadcast hash join
分步解釋:
- 1. 确定 Build Table 以及 Probe Table:這個概念比較重要,Build Table 使用 join key 建構 Hash Table,而 Probe Table 使用 join key 進行探測,探測成功就可以 join 在一起。通常情況下,小表會作為 Build Table,大表作為 Probe Table。此事例中 item 為 Build Table,order 為 Probe Table。
- 2. 建構 Hash Table:依次讀取 Build Table(item)的資料,對于每一行資料根據 join key(item.id)進行 hash,hash 到對應的 Bucket,生成 hash table 中的一條記錄。資料緩存在記憶體中,如果記憶體放不下需要 dump 到外存。
- 3. 探測:再依次掃描 Probe Table(order)的資料,使用相同的 hash 函數映射 Hash Table 中的記錄,映射成功之後再檢查 join 條件(item.id = order.i_id),如果比對成功就可以将兩者 join 在一起。
基本流程可以參考上圖,這裡有兩個小問題需要關注:
1. hash join 性能如何?
- 很顯然,hash join 基本都隻掃描兩表一次,可以認為 o(a+b),較之最極端的笛卡爾集運算 a*b,不知甩了多少條街。
2. 為什麼 Build Table 選擇小表?
- 道理很簡單,因為建構的 Hash Table 最好能全部加載在記憶體,效率最高;這也決定了 hash join 算法隻适合至少一個小表的 join 場景,對于兩個大表的 join 場景并不适用。
上文說過,hash join 是傳統資料庫中的單機 join 算法,在分布式環境下需要經過一定的分布式改造,就是盡可能利用分布式計算資源進行并行化計算,提高總體效率。hash join 分布式改造一般有兩種經典方案:
1.broadcast hash join
- 将其中一張小表廣播分發到另一張大表所在的分區節點上,分别并發地與其上的分區記錄進行 hash join。broadcast 适用于小表很小,可以直接廣播的場景。
2. shuffler hash join
- 一旦小表資料量較大,此時就不再适合進行廣播分發。這種情況下,可以根據 join key 相同必然分區相同的原理,将兩張表分别按照 join key 進行重新組織分區,這樣就可以将 join 分而治之,劃分為很多小 join,充分利用叢集資源并行化。
broadcast hash join
如下圖所示,broadcast hash join 可以分為兩步:
-
join分析:shuffle hash join、broadcast hash join
1.broadcast 階段:
- 将小表廣播分發到大表所在的所有主機。廣播算法可以有很多,最簡單的是先發給 driver,driver 再統一分發給所有 executor;要不就是基于 BitTorrent 的 TorrentBroadcast。
2. hash join 階段:
- 在每個 executor 上執行單機版 hash join,小表映射,大表試探。
SparkSQL 規定 broadcast hash join 執行的基本條件為被廣播小表必須小于參數 spark.sql.autoBroadcastJoinThreshold,預設為 10M。
shuffle hash join
在大資料條件下如果一張表很小,執行 join 操作最優的選擇無疑是 broadcast hash join,效率最高。但是一旦小表資料量增大,廣播所需記憶體、帶寬等資源必然就會太大,broadcast hash join 就不再是最優方案。此時可以按照 join key 進行分區,根據 key 相同必然分區相同的原理,就可以将大表 join 分而治之,劃分為很多小表的 join,充分利用叢集資源并行化。如下圖所示
-
join分析:shuffle hash join、broadcast hash join
shuffle hash join 也可以分為兩步:
1.shuffle 階段:
- 分别将兩個表按照 join key 進行分區,将相同 join key 的記錄重分布到同一節點,兩張表的資料會被重分布到叢集中所有節點。這個過程稱為 shuffle。
- 每個分區節點上的資料單獨執行單機 hash join 算法。
看到這裡,可以初步總結出來如果兩張小表 join 可以直接使用單機版 hash join;如果一張大表 join 一張極小表,可以選擇 broadcast hash join 算法;而如果是一張大表 join 一張小表,則可以選擇 shuffle hash join 算法;那如果是兩張大表進行 join 呢?
sort merge join
SparkSQL 對兩張大表 join 采用了全新的算法-sort-merge join,如下圖所示
-
join分析:shuffle hash join、broadcast hash join
整個過程分為三個步驟:
1. shuffle 階段:
- 将兩張大表根據 join key 進行重新分區,兩張表資料會分布到整個叢集,以便分布式并行處理。
2. sort 階段:
- 對單個分區節點的兩表資料,分别進行排序。
3. merge 階段:
- 對排好序的兩張分區表資料執行 join 操作。join 操作很簡單,分别周遊兩個有序序列,碰到相同 join key 就 merge 輸出,否則取更小一邊。如下圖所示:
-
join分析:shuffle hash join、broadcast hash join
經過上文的分析,很明顯可以得出來這幾種 Join 的代價關系:cost(broadcast hash join) < cost(shuffle hash join) < cost(sort merge join),資料倉庫設計時最好避免大表與大表的 join 查詢,SparkSQL 也可以根據記憶體資源、帶寬資源适量将參數 spark.sql.autoBroadcastJoinThreshold 調大,讓更多 join 實際執行為 broadcast hash join。
- https://www.6aiq.com/article/1533984288407