编 辑:LCP
彭友们好,我是老彭啊。最近写技术文章写的少了,今天偷个懒,让L(老)C(*)P(*)给大家讲讲Spark动态分配的ESS(external shuffle service)。
我们在学习spark的时候都知道shuffle,可external shuffle又是啥玩意?我们从字面意思上理解是一个外部shuffle的服务。
那这个外部shuffle有啥用?本文就针对这几个疑点展开来讨论,在上主菜之前,我们先来复习下在平时学习spark中的shuffle是怎么一回事。
回顾shuffle
在日常工作当中经常要和shuffle模块打交道,而且还经常做一些优化工作,所以学习这个模块会非常痛苦,可能对于一些数据开发的彭友来说,一提到shuffle就会有点怕。让人更加痛苦的是在面试中也经常会被问到。
其实大家需要知道的一点就是:无论是MR中的Shuffle流程,还是Spark中的Shuffle流程,都包含Shuffle读和Shuffle写这两块,其他的就是细节的东西。
大家在入门学习Hadoop的时候应该都看到过上图,我之前有详细写过Shuffle的过程。其目的是想让大家有一个印象:在MR引擎模式下,其shuffle要经过分区、排序、合并操作,然后再由reduce端拉取进行下一步的处理。
对于Spark Shuffle,LCP从源码层面把里面的一些比较关键的类关系进行了整理分享给大家,并围绕着这张图展开来简单介绍下shuffle(涉及到几个很重要的Shuffle类,墙裂建议大家看看):
这张图里涉及到一个很关键的类,也是学习Shuffle的入口即ShuffleManager。从下图的注释中我们可以了解到这个Shuffle系统的可插拔接口的创建方式以及类型控制,并且在driver端和每个executor上都会被创建。
ShuffleManager从字面意思上也可以知道其作用是作为Shuffle系统的大总管,executor端在shuffle过程中的读写都要基于shuffleManager来提供处理器。
ok,那我们来看看executor端是怎么和ShuffleManager进行交互的。这里你需要知道的一点就是既然有shuffle发生,那么这个阶段的stage肯定是ShuffleMapStage,对应的task类型就是ShuffleMapTask。
下图为ShuffleMapTask类中具体执行task中的一小部分细节,从图中代码中可以看出是先拿到ShuffleManager,然后获取到ShuffleWriter将中间数据写入到文件中(终于扯到写文件了)。
底层细节的东西不在该文中展现,这里只是想告诉大家,executor端的shuffle中间结果是写入到本地磁盘中的(当然也会写入到内存中),其中文件的路径地址是根据spark.local.dir来的(前提是你得设置这个参数)
而且根据shuffle类型的不同,其也有可能会生成索引文件。文件的格式如下图:
以上就是关于shuffle写的几个小细节。
其实以上的内容都是为了引出executor会把shuffle结果落入到本地磁盘中这一结论。那么这种方式有什么问题呢?
在介绍动态分配的文章中也提及过,如果executor被释放掉,那么对应的shuffle文件也会被删除,这样的话其他executor就无非拉取到这部分的shuffle文件了。
如果executor进程负载过高,或者发生GC的时候不能为给其他executor提供shuffle数据,那么最后就会导致重算拉长了任务产出,影响任务稳定性。
解决办法:ESS
为了解决刚才提到的问题,社区引入了一个External Shuffle Service(下文中简称为ess)服务,该服务运行在每一个nodemanager节点上,而且是常驻的进程,用来管理该节点上每个executor 在shuffle writer过程中生成的中间数据。
所以和executor的生命周期还不太一样。(从类注释上也可以很容易理解其作用)
那么ess在底层是怎么来管理这些shuffle文件的呢?
1、首先要先启动该服务,必须要设置spark.shuffle.service.enabled参数为true;
2、而且还要设置spark.shuffle.service.port端口,默认为7337
3、既然有了ExternalShuffleServer服务端,那么肯定会有客户端与之通信(可以看看ExternalShuffleClient),在客户端的逻辑中可以发现会把executor注册到ess服务上
同时也包含shuffle file的存储信息,那么这样一来ess只需要管理shuffle file的地址即可
最后通过一张图来总结一下:
关于如何配置ESS的手册,大家可以看下官网,这里截取一部分:
扩展及Git工程
ESS的特点是相当于启动了一个executor的代理服务来管理shuffle文件,但文件还是落在本地了。那能不能把shuffle文件存放到远程存储系统呢?这个问题也是群友发起的(这一个问题让我直接写了三篇文章)
后来在SPARK-25299上看到了一些关于ESS的缺陷:
1、在前面我们提到ESS是在yarn上的每一个nodemanager上启动一个进程服务,可是如果这个节点或者服务不可用的话,那么还是会重新计算丢失的shuffle块
2、第二个缺陷是关于资源浪费的问题,如果没有应用程序使用ess服务,那么就是浪费资源了
3、最后一个问题是spark on k8s的场景,executor写入的shuffle数据可能在一个容器 内,但在实际情况中会有一些权限隔离问题,不一定能被ess所读取到。
基于以上这三点痛点,把shuffle数据写入到外部存储系统可能确实是比较好的解决方案,而且spark3也在这块发力,但其性能是否会受影响(目前也是在讨论中),相信社区也有相关的优化方案。
我们在查看issue的时候,发现也有贡献者将远程存储shuffle独立出一个工程(可以支持将shuffle数据写入到hadoop文件系统中,将存储和计算进行分离;并结合DAOS 提升shuffle性能)。
工程在这里:https://github.com/oap-project/remote-shuffle。如果大家有更好的方案,并乐于交流的话,可以分享下。感兴趣的朋友可以自己研究一下这个工程。
LCP最后要说的话
感谢大家能够认真的阅读完这篇文章,如果你有自己的见解,欢迎加入彭友会一起来探讨。如果你有工作上的一些困扰,也非常欢迎你来骚扰我们。