天天看点

整点你不会的新名词:Spark ESS

整点你不会的新名词:Spark ESS

编 辑:LCP

彭友们好,我是老彭啊。最近写技术文章写的少了,今天偷个懒,让L(老)C(*)P(*)给大家讲讲Spark动态分配的ESS(external shuffle service)。

我们在学习spark的时候都知道shuffle,可external shuffle又是啥玩意?我们从字面意思上理解是一个外部shuffle的服务。

那这个外部shuffle有啥用?本文就针对这几个疑点展开来讨论,在上主菜之前,我们先来复习下在平时学习spark中的shuffle是怎么一回事。

整点你不会的新名词:Spark ESS

回顾shuffle

在日常工作当中经常要和shuffle模块打交道,而且还经常做一些优化工作,所以学习这个模块会非常痛苦,可能对于一些数据开发的彭友来说,一提到shuffle就会有点怕。让人更加痛苦的是在面试中也经常会被问到。

其实大家需要知道的一点就是:无论是MR中的Shuffle流程,还是Spark中的Shuffle流程,都包含Shuffle读和Shuffle写这两块,其他的就是细节的东西。

整点你不会的新名词:Spark ESS

大家在入门学习Hadoop的时候应该都看到过上图,我之前有详细写过Shuffle的过程。其目的是想让大家有一个印象:在MR引擎模式下,其shuffle要经过分区、排序、合并操作,然后再由reduce端拉取进行下一步的处理。

对于Spark Shuffle,LCP从源码层面把里面的一些比较关键的类关系进行了整理分享给大家,并围绕着这张图展开来简单介绍下shuffle(涉及到几个很重要的Shuffle类,墙裂建议大家看看):

整点你不会的新名词:Spark ESS

这张图里涉及到一个很关键的类,也是学习Shuffle的入口即ShuffleManager。从下图的注释中我们可以了解到这个Shuffle系统的可插拔接口的创建方式以及类型控制,并且在driver端和每个executor上都会被创建。

整点你不会的新名词:Spark ESS

ShuffleManager从字面意思上也可以知道其作用是作为Shuffle系统的大总管,executor端在shuffle过程中的读写都要基于shuffleManager来提供处理器。

ok,那我们来看看executor端是怎么和ShuffleManager进行交互的。这里你需要知道的一点就是既然有shuffle发生,那么这个阶段的stage肯定是ShuffleMapStage,对应的task类型就是ShuffleMapTask。

整点你不会的新名词:Spark ESS

下图为ShuffleMapTask类中具体执行task中的一小部分细节,从图中代码中可以看出是先拿到ShuffleManager,然后获取到ShuffleWriter将中间数据写入到文件中(终于扯到写文件了)。

整点你不会的新名词:Spark ESS

底层细节的东西不在该文中展现,这里只是想告诉大家,executor端的shuffle中间结果是写入到本地磁盘中的(当然也会写入到内存中),其中文件的路径地址是根据spark.local.dir来的(前提是你得设置这个参数)

整点你不会的新名词:Spark ESS

而且根据shuffle类型的不同,其也有可能会生成索引文件。文件的格式如下图:

整点你不会的新名词:Spark ESS

以上就是关于shuffle写的几个小细节。

其实以上的内容都是为了引出executor会把shuffle结果落入到本地磁盘中这一结论。那么这种方式有什么问题呢?

在介绍动态分配的文章中也提及过,如果executor被释放掉,那么对应的shuffle文件也会被删除,这样的话其他executor就无非拉取到这部分的shuffle文件了。

如果executor进程负载过高,或者发生GC的时候不能为给其他executor提供shuffle数据,那么最后就会导致重算拉长了任务产出,影响任务稳定性。

整点你不会的新名词:Spark ESS
整点你不会的新名词:Spark ESS

解决办法:ESS

为了解决刚才提到的问题,社区引入了一个External Shuffle Service(下文中简称为ess)服务,该服务运行在每一个nodemanager节点上,而且是常驻的进程,用来管理该节点上每个executor 在shuffle writer过程中生成的中间数据。

所以和executor的生命周期还不太一样。(从类注释上也可以很容易理解其作用)

整点你不会的新名词:Spark ESS

那么ess在底层是怎么来管理这些shuffle文件的呢?

1、首先要先启动该服务,必须要设置spark.shuffle.service.enabled参数为true;

2、而且还要设置spark.shuffle.service.port端口,默认为7337

3、既然有了ExternalShuffleServer服务端,那么肯定会有客户端与之通信(可以看看ExternalShuffleClient),在客户端的逻辑中可以发现会把executor注册到ess服务上

整点你不会的新名词:Spark ESS

同时也包含shuffle file的存储信息,那么这样一来ess只需要管理shuffle file的地址即可

整点你不会的新名词:Spark ESS

最后通过一张图来总结一下:

整点你不会的新名词:Spark ESS

关于如何配置ESS的手册,大家可以看下官网,这里截取一部分:

整点你不会的新名词:Spark ESS
整点你不会的新名词:Spark ESS

扩展及Git工程

ESS的特点是相当于启动了一个executor的代理服务来管理shuffle文件,但文件还是落在本地了。那能不能把shuffle文件存放到远程存储系统呢?这个问题也是群友发起的(这一个问题让我直接写了三篇文章)

整点你不会的新名词:Spark ESS

后来在SPARK-25299上看到了一些关于ESS的缺陷:

1、在前面我们提到ESS是在yarn上的每一个nodemanager上启动一个进程服务,可是如果这个节点或者服务不可用的话,那么还是会重新计算丢失的shuffle块

2、第二个缺陷是关于资源浪费的问题,如果没有应用程序使用ess服务,那么就是浪费资源了

3、最后一个问题是spark on k8s的场景,executor写入的shuffle数据可能在一个容器 内,但在实际情况中会有一些权限隔离问题,不一定能被ess所读取到。

基于以上这三点痛点,把shuffle数据写入到外部存储系统可能确实是比较好的解决方案,而且spark3也在这块发力,但其性能是否会受影响(目前也是在讨论中),相信社区也有相关的优化方案。

我们在查看issue的时候,发现也有贡献者将远程存储shuffle独立出一个工程(可以支持将shuffle数据写入到hadoop文件系统中,将存储和计算进行分离;并结合DAOS 提升shuffle性能)。

整点你不会的新名词:Spark ESS

工程在这里:https://github.com/oap-project/remote-shuffle。如果大家有更好的方案,并乐于交流的话,可以分享下。感兴趣的朋友可以自己研究一下这个工程。

整点你不会的新名词:Spark ESS

LCP最后要说的话

感谢大家能够认真的阅读完这篇文章,如果你有自己的见解,欢迎加入彭友会一起来探讨。如果你有工作上的一些困扰,也非常欢迎你来骚扰我们。

整点你不会的新名词:Spark ESS

继续阅读