概要
Spark RPC概述中介绍了Spark PRC抽象接口RpcEndpointRef 、RpcEndpoint的用法,RpcEndpointVerifier的作用是,当RpcEndpointRef访问对应的RpcEndpoint前,判断RpcEndpoint是否存在。
定义
RpcEndpointVerifier定义如上图,主要有两点
- RpcEndpointVerifier也是RpcEndpoint的实现类,就意味着和Master、Worker等RpcEndpoint实现类处理消息的流程一致,具体参考这两篇Spark RPC之RpcRequest请求处理流程、 Spark RPC之RpcResponse处理,后面不再介绍。
- 定义了消息类型CheckExistence。
请求流程
上图的Spark PRC例子主要做了以下几件事
- 代码82行,此处省略了env对象的创建,创建env调用RpcEnv.create方法,同时底层会注册RpcEndpointVerifier到Dispatcher,名字为endpoint-verifier,服务client端请求。
- 82-88行,server端注册对象RpcEndpoint,名字为send-remotely。
- 90行,创建client端RpcEnv对象。
- 91行,获取RpcEndpointRef对象,此对象用来和步骤1中注册的名称为send-remotely的RpcEndpoint通信,底层NettyRpcEnv发送消息RpcEndpointVerifier.CheckExistence(“send-remotely”)给server端,server端使用步骤1中注册的RpcEndpointVerifier处理,返回RpcEndpoint是否存在。
- 93行,进行通信。
server端注册RpcEndpointVerifier
如上面步骤1描述,创建RpcEnv对象时底层会注册RpcEndpointVerifier到Dispatcher,以deploy模块中Master类创建RpcEnv为例,流程如下
NettyRpcEnv的startServer方法中注册RpcEndpointVerifier,源码如下
至此,server端完成了RpcEndpointVerifier的注册,后续对于client请求的处理,参考Spark RPC之RpcRequest请求处理流程、 Spark RPC之RpcResponse处理。
client发请求CheckExistence
如Spark PRC例子中步骤5描述,调用setupEndpointRef方法会发送CheckExistence消息,检查RpcEndpoint是否存在,简单流程如下
NettyRpcEnv.asyncSetupEndpointRefByURI方法中发送消息CheckExistence的源码如下
到这里,client发送消息的过程就结束了,此外,ask方法发送信息到server的详细流程和server端如何利用注册的RpcEndpointVerifier处理请求,
请参考Spark RPC之RpcRequest请求处理流程、 Spark RPC之RpcResponse处理。
总结
介绍了RpcEndpointVerifier的定义和作用,以及发挥其作用的完整流程,到这里,关于Spark RPC的源码解读也告一段路了,spark-core中RPC模块的代码基本都覆盖到了。