天天看点

Join时数据类型不一致导致的倾斜

  本篇以hive sql解析器来讨论问题,spark sql 的处理方法类似,大家可自行测试。

  在进行join操作时,有mapjoin和hashjoin两个大类。mapjoin需要的是一个大表和一个小表进行join,小表存于内存中,对大表进行遍历,不会产生数据倾斜。

  如果是大表join大表,在内存中放不下,便会对两张表join的字段求hash值,然后将hash值相同的数据放入同一个reduce或者同一个spark分区进行处理,这样join条件相同的内容就能放在一起处理了。若得到的某个hash值比例过大,全部进入一个分区,数据倾斜就形成了。

  有这样两张表大表A 、B ,两张表里都有pid字段。但A表中的pid字段为bigint类型,而B表中的pid字段为string类型。

  执行的sql为:

  正常情况下,这样一条简单的sql并不会产生数据倾斜。但如果在B表中,有大量的pid是超过bigint范围的数据,如:‘10000000000000000000000000001’,‘10000000000000000000000000002’ … 这时就可能产生数据倾斜问题。

  2个大表进行join时,会将join的内容求hash值,这样才能将join上的数据放在一起处理。但A表和B表中join的字段,数据类型是不同的。这时hive会先进行类型转换,再求hash值,那hive会将数据类型都转成bigint还是string呢。这个还要看hive的版本,不同的版本处理方式也不同,在一些版本中,hive会将两个字段都转换为bigint。这时,B表中那些超过范围的数据,转换就会出问题。

  我们来看看在hive中不同版本的测试结果:

  2.x

Join时数据类型不一致导致的倾斜

  3.x

Join时数据类型不一致导致的倾斜

  不管是哪个版本,超过范围的数值在转换为bigint时都会变成相同的结果。如果对这些数据求hash值,得出的结果也都是一样的。这样的数据会被分配到同一个分区进行处理,数据倾斜就可能形成。

解决方法1:

  手动将数据类型不一致的字段转换为string类型

解决方法2:

  参考下面的复杂join条件倾斜解决。

  业务背景

  trackinfo与pm_info两张表均为GB级别,左关联代码块如下:

from trackinfo a
	left outer join pm_info b
	on (a.ext_field7 = b.id)
           

  使用以上代码块需要耗时1.5小时。

  第一次优化 (这就是上面说到的数据类型不一致的问题)

  考虑到pm_info表的id是bigint类型,trackinfo表的ext_field7是string类型,其关联时数据类型不一致,默认的hash操作会按bigint型的id进行分配,这样会导致所有string类型的ext_field7集中到一个reduce里面,因此,改为如下:

from trackinfo a
	left outer join pm_info b
	on (cast(a.ext_field7 as bigint) = b.id)
           

  改动为上面代码后,效果仍然不理想,耗时为1.5小时。

  第二次优化 (平时我们可能也就考虑到null值问题,对null进行过滤,进一步也可以像他这样进行更加细致的过滤)

  考虑到trackinfo表的ext_field7字段缺失率很高(为空、字段长度为零、字段填充了非整数)情况,做进行左关联时空字段的关联操作实际上没有意义,因此,如果左表关联字段ext_field7为无效字段,则不需要关联,因此,改为如下:

from trackinfo a
	left outer join pm_info b
	on (a.ext_field7 is not null
	and length(a.ext_field7) > 0
	and a.ext_field7 rlike ‘1+$’
	and a.ext_field7 = b.id)
           

  上面代码块的作用是,如果左表关联字段ext_field7为无效字段时(为空、字段长度为零、字段填充了非整数),不去关联右表,由于空字段左关联以后取到的右表字段仍然为null,所以不会影响结果。改动为上面代码后,效果仍然不理想,耗时为50分钟。

  第三次优化

  第二次优化效果效果不理想的原因,其实是在左关联中,虽然设置了左表关联字段为空不去关联右表,但是这样做,左表中未关联的记录(ext_field7为空)将会全部聚集在一个reduce中进行处理,体现为reduce进度长时间处在99%。

  为了保证能join上的数据放在一个分区或一个reduce进行处理,不会简单的只对字段进行求hash值,而是会对join的条件求hash值。;比如:on (A.id+1) = (b.id+1) 就会对a.id+1 和 b.id+1 的结果求hash值;

  在第二次优化时,采用了复杂的join条件,将对trackinfo表的ext_field7的字段过滤放在了前面,如果该字段被过滤掉,都不用再判断最后是否相等的逻辑。所以被过滤掉的字段,返回的hash值也是相同的。会进入同一个分区。

  换一种思路,解决办法的突破点就在于如何把左表的未关联记录的key尽可能打散,因此可以这么做:若左表关联字段无效(为空、字段长度为零、字段填充了非整数),则在关联前将左表关联字段设置为一个随机数,再去关联右表,这么做的目的是即使是左表的未关联记录,它的key也分布得十分均匀

from trackinfo a
	left outer join pm_info b
	on (
		case when (a.ext_field7 is not null
		and length(a.ext_field7) > 0
		and a.ext_field7 rlike ‘2+$’)
		then
		cast(a.ext_field7 as bigint)
		else
		cast(ceiling(rand() * -65535) as bigint)
		end = b.id
	)
           

  第三次改动后,耗时从50分钟降为了1分钟32秒,效果显著!

继续阅读