天天看点

spark利用cache优化shuffle一.几种缓存方法二.缓存结果三.一些参数四.注意

cache表,数据放内存,数据被广播到Executor,

将多份数据进行关联是数据处理过程中非常普遍的用法,不过在分布式计算系统中,这个问题往往会变的非常麻烦,因为框架提供的 join 操作一般会将所有数据根据 key 发送到所有的 reduce 分区中去,也就是 shuffle 的过程。造成大量的网络以及磁盘IO消耗,运行效率极其低下,这个过程一般被称为 reduce-side-join。

如果其中有张表较小的话,我们则可以自己实现在 map 端实现数据关联,跳过大量数据进行 shuffle 的过程,运行时间得到大量缩短,根据不同数据可能会有几倍到数十倍的性能提升,这个过程是map-side-join。

reduce-side-join 的缺陷在于会将key相同的数据发送到同一个partition中进行运算,大数据集的传输需要长时间的IO,同时任务并发度收到限制,还可能造成数据倾斜。

reduce-side-join 运行图如下

spark利用cache优化shuffle一.几种缓存方法二.缓存结果三.一些参数四.注意

map-side-join 运行图如下

spark利用cache优化shuffle一.几种缓存方法二.缓存结果三.一些参数四.注意

一.几种缓存方法

1.CACHE TABLE

//缓存全表
sqlContext.sql("CACHE TABLE activity")
//缓存过滤结果
sqlContext.sql("CACHE TABLE activity_cached as select * from activity where ...")
           

CACHE TABLE 是即时生效的,如果你想等到一个action操作再缓存数据可以使用 CACHE LAZY TABLE,这样操作会直到一个 action 操作才被触发,例如 count(*)

sqlContext.sql("CACHE LAZY TABLE ...")
           

取消hive表缓存数据

sqlContext.sql("UNCACHE TABLE activity")
           

示例:

spark利用cache优化shuffle一.几种缓存方法二.缓存结果三.一些参数四.注意

我们也需要注意cacheTable与uncacheTable的使用时机,cacheTable主要用于缓存中间表结果,它的特点是少量数据且被后续计算(SQL)频繁使用;如果中间表结果使用完毕,我们应该立即使用uncacheTable释放缓存空间,用于缓存其它数据

2.将dataFrame注册成表并缓存

val df = sqlContext.sql("select * from activity")
df.registerTempTable("activity_cached")
sqlContext.cacheTable("activity_cached")
 
Tip:cacheTable操作是lazy的,需要一个action操作来触发缓存操作。
           

对应的uncacheTable可以取消缓存

sqlContext.uncacheTable("activity_cached")
           

3.缓存dataFrame

val df = sqlContext.sql("select * from tableName")
df.cache()
           

二.缓存结果

缓存时看到如下提示:

Added rdd_xx_x in memory on ...
           

如果内存不足,则会存入磁盘中,提示如下:

Added rdd_xx_x on disk on ...
           

缓存数据后可以在Storage上看到缓存的数据

spark利用cache优化shuffle一.几种缓存方法二.缓存结果三.一些参数四.注意

三.一些参数

spark.sql.autoBroadcastJoinThreshold
           

该参数默认为10M,在进行join等聚合操作时,将小于该值的表broadcast到每台worker,消除了大量的shuffle操作。

spark.rdd.compress true
           

将rdd存入mem或disk前再进行一次压缩,效果显著,我使用cacheTable了一张表,没有开启该参数前总共cache了54G数据,开启这个参数后只34G,可是执行速度并没有收到太大的影响。

spark.sql.shuffle.partitions
           

这个参数默认为200,是join等聚合操作的并行度,如果有大量的数据进行操作,造成单个任务比较重,运行时间过长的时候,会报如下的错误:

org.apache.spark.shuffle.FetchFailedException: Connection from /192.168.xx.xxx:53450 closed
           

这个时候需要提高该值。

四.注意

  • cache 的表不一定会被广播到Executor,执行map side join!!!
  • 有另外一个参数:spark.sql.autoBroadcastJoinThreshold 会判断是否将该表广播;
  • spark.sql.autoBroadcastJoinThreshold参数默认值是10M,所以只有cache的表小于10M的才被广播到Executor上去执行map side join,因此要特别要注意,因此在选择cache表的时候,要注意表的大小和spark.sql.autoBroadcastJoinThreshold参数的调整。如果内存比较充足,建议调大该参数。

继续阅读