天天看點

Join時資料類型不一緻導緻的傾斜

  本篇以hive sql解析器來讨論問題,spark sql 的處理方法類似,大家可自行測試。

  在進行join操作時,有mapjoin和hashjoin兩個大類。mapjoin需要的是一個大表和一個小表進行join,小表存于記憶體中,對大表進行周遊,不會産生資料傾斜。

  如果是大表join大表,在記憶體中放不下,便會對兩張表join的字段求hash值,然後将hash值相同的資料放入同一個reduce或者同一個spark分區進行處理,這樣join條件相同的内容就能放在一起處理了。若得到的某個hash值比例過大,全部進入一個分區,資料傾斜就形成了。

  有這樣兩張表大表A 、B ,兩張表裡都有pid字段。但A表中的pid字段為bigint類型,而B表中的pid字段為string類型。

  執行的sql為:

  正常情況下,這樣一條簡單的sql并不會産生資料傾斜。但如果在B表中,有大量的pid是超過bigint範圍的資料,如:‘10000000000000000000000000001’,‘10000000000000000000000000002’ … 這時就可能産生資料傾斜問題。

  2個大表進行join時,會将join的内容求hash值,這樣才能将join上的資料放在一起處理。但A表和B表中join的字段,資料類型是不同的。這時hive會先進行類型轉換,再求hash值,那hive會将資料類型都轉成bigint還是string呢。這個還要看hive的版本,不同的版本處理方式也不同,在一些版本中,hive會将兩個字段都轉換為bigint。這時,B表中那些超過範圍的資料,轉換就會出問題。

  我們來看看在hive中不同版本的測試結果:

  2.x

Join時資料類型不一緻導緻的傾斜

  3.x

Join時資料類型不一緻導緻的傾斜

  不管是哪個版本,超過範圍的數值在轉換為bigint時都會變成相同的結果。如果對這些資料求hash值,得出的結果也都是一樣的。這樣的資料會被配置設定到同一個分區進行處理,資料傾斜就可能形成。

解決方法1:

  手動将資料類型不一緻的字段轉換為string類型

解決方法2:

  參考下面的複雜join條件傾斜解決。

  業務背景

  trackinfo與pm_info兩張表均為GB級别,左關聯代碼塊如下:

from trackinfo a
	left outer join pm_info b
	on (a.ext_field7 = b.id)
           

  使用以上代碼塊需要耗時1.5小時。

  第一次優化 (這就是上面說到的資料類型不一緻的問題)

  考慮到pm_info表的id是bigint類型,trackinfo表的ext_field7是string類型,其關聯時資料類型不一緻,預設的hash操作會按bigint型的id進行配置設定,這樣會導緻所有string類型的ext_field7集中到一個reduce裡面,是以,改為如下:

from trackinfo a
	left outer join pm_info b
	on (cast(a.ext_field7 as bigint) = b.id)
           

  改動為上面代碼後,效果仍然不理想,耗時為1.5小時。

  第二次優化 (平時我們可能也就考慮到null值問題,對null進行過濾,進一步也可以像他這樣進行更加細緻的過濾)

  考慮到trackinfo表的ext_field7字段缺失率很高(為空、字段長度為零、字段填充了非整數)情況,做進行左關聯時空字段的關聯操作實際上沒有意義,是以,如果左表關聯字段ext_field7為無效字段,則不需要關聯,是以,改為如下:

from trackinfo a
	left outer join pm_info b
	on (a.ext_field7 is not null
	and length(a.ext_field7) > 0
	and a.ext_field7 rlike ‘1+$’
	and a.ext_field7 = b.id)
           

  上面代碼塊的作用是,如果左表關聯字段ext_field7為無效字段時(為空、字段長度為零、字段填充了非整數),不去關聯右表,由于空字段左關聯以後取到的右表字段仍然為null,是以不會影響結果。改動為上面代碼後,效果仍然不理想,耗時為50分鐘。

  第三次優化

  第二次優化效果效果不理想的原因,其實是在左關聯中,雖然設定了左表關聯字段為空不去關聯右表,但是這樣做,左表中未關聯的記錄(ext_field7為空)将會全部聚集在一個reduce中進行處理,展現為reduce進度長時間處在99%。

  為了保證能join上的資料放在一個分區或一個reduce進行處理,不會簡單的隻對字段進行求hash值,而是會對join的條件求hash值。;比如:on (A.id+1) = (b.id+1) 就會對a.id+1 和 b.id+1 的結果求hash值;

  在第二次優化時,采用了複雜的join條件,将對trackinfo表的ext_field7的字段過濾放在了前面,如果該字段被過濾掉,都不用再判斷最後是否相等的邏輯。是以被過濾掉的字段,傳回的hash值也是相同的。會進入同一個分區。

  換一種思路,解決辦法的突破點就在于如何把左表的未關聯記錄的key盡可能打散,是以可以這麼做:若左表關聯字段無效(為空、字段長度為零、字段填充了非整數),則在關聯前将左表關聯字段設定為一個随機數,再去關聯右表,這麼做的目的是即使是左表的未關聯記錄,它的key也分布得十分均勻

from trackinfo a
	left outer join pm_info b
	on (
		case when (a.ext_field7 is not null
		and length(a.ext_field7) > 0
		and a.ext_field7 rlike ‘2+$’)
		then
		cast(a.ext_field7 as bigint)
		else
		cast(ceiling(rand() * -65535) as bigint)
		end = b.id
	)
           

  第三次改動後,耗時從50分鐘降為了1分鐘32秒,效果顯著!

繼續閱讀