基于google protobuf的RPC engine,必須在伺服器端和用戶端都完成了初始化之後,才能開始通信。在《Hadoop 基于protobuf 的RPC的伺服器端實作原理》這篇博文中,我介紹了RPC 的伺服器端實作,那麼,用戶端是如何基于預先定義的protobuf協定,來與遠端的基于相同的protobuf協定的服務端進行通信的呢?
比如,NodeManger與遠端的ResourceManager進行RPC通信,它們的通信基于ResourceTracker這個RPC協定,協定定義在ResourceTracker.proto檔案中:
service ResourceTrackerService {
rpc registerNodeManager(RegisterNodeManagerRequestProto) returns (RegisterNodeManagerResponseProto);
rpc nodeHeartbeat(NodeHeartbeatRequestProto) returns (NodeHeartbeatResponseProto);
}
協定中定義了兩個通信接口,
registerNodeManager
和
nodeHeartbeat
。
registerNodeManager
負責在節點啟的NodeManager啟動的時候向ResourceManger注冊自己,而
nodeHeartbeat
是通過定時心跳的方式,不斷向ResourceManager報告自己的存在,并将自己的狀态彙報給ResourceManager。
在Yarn的基于Master/Slave的設計模式中,
register
思想是最核心的 設計思想。NodeManager被ResourceManager管理,那麼NodeManager在啟動的時候必須向ResourceManager注冊,RPCEngine想要生效,Engine在初始化的時候也必須向ResourceManager注冊。我認為這種設計思想的根本目的,是将主動權交給使用者(Slave)而不是管理者(Master),這樣,将Master從繁雜的管理工作中解脫出來,Master不需要關心、不需要輪訓NodeManager什麼時候來,什麼時候啟動,而是讓NodeManager在啟動的時候主動告知即可。
那麼,這個用戶端協定是怎麼進行初始化并向遠端的ResourceTracker發送消息的呢?既然NodeManager是該協定的用戶端,我們從NodeManager代碼進入,來看看初始化以及初始化以後基于協定進行通信的用戶端過程。
Yarn代碼設計的另外一個重要特點就是功能服務化,無論是NodeManager、ResourceManager還是MapReduce的ApplicationMaster(MRAppMaster),都抽象為服務,服務之間功能獨立,服務的運作 被抽象為初始化、啟動、運作和停止等基本過程,讓整個代碼邏輯非常清晰、封裝性變得非常好。
在NodeManager的
serviceInit()
方法中,我們看到:
nodeStatusUpdater =
createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
metrics);
}
建立了一個運作時類型為
NodeStatusUpdaterImpl
的狀态更新器,其實,這個NodeStatusUpdater也是一個
service
,啟動ResourceTracker協定的用戶端,就是在
NodeStatusUpdaterImpl.serviceStart()
中進行:
@Override
protected void serviceStart() throws Exception {
// NodeManager is the last service to start, so NodeId is available.
//...
try {
// Registration has to be in start so that ContainerManager can get the
// perNM tokens needed to authenticate ContainerTokens.
//建立基于ResourceManager協定的RPC用戶端
this.resourceTracker = getRMClient();
registerWithRM();
//...
} catch (Exception e) {
//...
}
}
跟蹤代碼,到
ServerRMProxy.createRMProxy()
:
/**
* Create a proxy for the specified protocol. For non-HA,
* this is a direct connection to the ResourceManager address. When HA is
* enabled, the proxy handles the failover between the ResourceManagers as
* well.
* 對于ResourceTracker協定來說,這裡的參數protocol就是是ResourceTracker.class, instanse參數是ServerRMProxy
*/
@Private
protected static <T> T createRMProxy(final Configuration configuration,
final Class<T> protocol, RMProxy instance) throws IOException {
YarnConfiguration conf = (configuration instanceof YarnConfiguration)
? (YarnConfiguration) configuration
: new YarnConfiguration(configuration);
RetryPolicy retryPolicy = createRetryPolicy(conf);
//通過讀取配置檔案,判斷是否開啟High Availability模式
if (HAUtil.isHAEnabled(conf)) {//開啟HA模式
RMFailoverProxyProvider<T> provider =
instance.createRMFailoverProxyProvider(conf, protocol);
return (T) RetryProxy.create(protocol, provider, retryPolicy);
} else {//不開啟HA模式
InetSocketAddress rmAddress = instance.getRMAddress(conf, protocol);
LOG.info("Connecting to ResourceManager at " + rmAddress);
T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
return (T) RetryProxy.create(protocol, proxy, retryPolicy);
}
}
在這裡我們開始接觸到
proxy
, 如果大家對IPC(Inter-Process Communication,程序間通信)或者RMI(Remote Method Invocation,遠端方法調用)不是很熟悉,也許對proxy的了解産生偏差。在這裡,proxy指的就是調用者,即用戶端。由于在程序間通信或者遠端方法調用的時候,調用者隻需要調用方法,不需要關心方法是在本地還是遠端執行,是以存在一個代理者(即proxy,在java RMI中,也叫做stub程式),來負責将本地用戶端的調用通過TCP等網絡協定在遠端伺服器端進行調用,然後取回調用結果提供給調用者。這就是代理的含義。
是否開啟HA模式與本文讨論的話題無關,是以我們選取開啟HA模式的分支。繼續跟蹤代碼,看看基于ResourceTracker協定的RPC 用戶端是怎麼建立的。
跟蹤·RMProxy.createRMFailoverProxyProvider()
方法:
/**
* Helper method to create FailoverProxyProvider.
* 同樣,
*/
private <T> RMFailoverProxyProvider<T> createRMFailoverProxyProvider(
Configuration conf, Class<T> protocol) {
Class<? extends RMFailoverProxyProvider<T>> defaultProviderClass;
try {
defaultProviderClass = (Class<? extends RMFailoverProxyProvider<T>>)
Class.forName( YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER);
} catch (Exception e) {
//some exception
}
//通過配置檔案中定義的provider,采用反射方式,建立這個FailoverProxyProvider執行個體
//預設情況下,這個Provider是
//建立一個Provider,根據預設配置,這個provider是org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider,這個執行個體的作用是對用戶端進行HA的封裝代理,似的用戶端無需關心HA環境下自己連接配接的到底是哪個ResourceManager
RMFailoverProxyProvider<T> provider = ReflectionUtils.newInstance(
conf.getClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER,
defaultProviderClass, RMFailoverProxyProvider.class), conf);
provider.init(conf, (RMProxy<T>) this, protocol);
return provider;
}
然後,進入
ConfiguredRMFailoverProxyProvider.init()
方法:
public void init(Configuration configuration, RMProxy<T> rmProxy, Class<T> protocol) {
this.rmProxy = rmProxy;
this.protocol = protocol;
this.rmProxy.checkAllowedProtocols(this.protocol);
//....
}
由此可見,ConfiguredRMFailoverProxyProvider對我們的通信協定進行了HA的封裝,在
init
方法中,設定了它所代理的協定名稱(ResourceTracker)和這個協定的代理對象RMProxy;在HA環境下,用戶端隻需要直接使用
ConfiguredRMFailoverProxyProvider
給我們提供的代理對象,而不需要關心這個代理對象到底是指向了哪一個ResourceManager,這就是
ConfiguredRMFailoverProxyProvider
的職責,負責隐藏HA環境下的FailOver細節。
再回到上面提到的代碼片段
RMProxy.createRMProxy
:
 // 通過讀取配置檔案,判斷是否開啟High Availability模式
if (HAUtil.isHAEnabled(conf)) {// 開啟HA模式
//如果開啟HA,則provider是一個org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider
RMFailoverProxyProvider<T> provider = instance.createRMFailoverProxyProvider(conf, protocol);//instance是ServerRMProxy
return (T) RetryProxy.create(protocol, provider, retryPolicy);
} else {// 不開啟HA模式
//.....
}
RMFailoverProxyProvider<T> provider = instance.createRMFailoverProxyProvider(conf, protocol);
負責建立和初始化ResourceTracker協定在HA環境下的代理
ConfiguredRMFailoverProxyProvider
,那麼,
ConfiguredRMFailoverProxyProvider
是怎麼建立真正的RPC用戶端的呢?
我們繼續跟蹤下一行代碼
(T) RetryProxy.create(protocol, provider, retryPolicy);
,此時,protocol是ResourceTracker.class,provider是
ConfiguredRMFailoverProxyProvider
:
public static <T> Object create(Class<T> iface,
FailoverProxyProvider<T> proxyProvider, RetryPolicy retryPolicy) {
return Proxy.newProxyInstance(
proxyProvider.getInterface().getClassLoader(),
new Class<?>[] { iface },
new RetryInvocationHandler<T>(proxyProvider, retryPolicy)
);
}
可以看到,ConfiguredRMFailoverProxyProvider通過java動态代理來代理了ResourceTracker協定裡面方法的執行。熟悉java動态代理的都會明白,每一個動态代理proxy都需要有繼承
java.lang.reflect.InvocationHandler
并實作其
invoke()
方法,用來代替被代理類的執行,這裡,這個
InvocationHandler
就是RetryInvocationHandler。ConfiguredRMFailoverProxyProvider底層真正的RPC(已經說過,ConfiguredRMFailoverProxyProvider就是對真正的RPC封裝了一層HA特性),就是RetryInvocationHandler來實作的,其實是在RetryInvocationHandler的構造方法裡面進行的:
protected RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
RetryPolicy defaultPolicy,
Map<String, RetryPolicy> methodNameToPolicyMap) {
//....
this.currentProxy = proxyProvider.getProxy();//proxyProvider其實是ConfiguredRMFailoverProxyProvider對象
}
@Override
public synchronized ProxyInfo<T> getProxy() {
String rmId = rmServiceIds[currentProxyIndex];
T current = proxies.get(rmId);
if (current == null) {
current = getProxyInternal();
proxies.put(rmId, current);
}
return new ProxyInfo<T>(current, rmId);
}
接着往下跟蹤:
/**
* 負責建立用戶端代理對象
* @return
*/
private T getProxyInternal() {
try {
//通過配置檔案,擷取遠端
//rmProxy的運作時對象是ServerRMProxy,protocol是ResourceTracker.class
final InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
return RMProxy.getProxy(conf, protocol, rmAddress);
} catch (IOException ioe) {
LOG.error("Unable to create proxy to the ResourceManager " +
rmServiceIds[currentProxyIndex], ioe);
return null;
}
建立遠端ResourceManager伺服器端的位址對象,即ResourceTracker協定的伺服器端位址資訊
@InterfaceAudience.Private
@Override
protected InetSocketAddress getRMAddress(YarnConfiguration conf,
Class<?> protocol) {
if (protocol == ResourceTracker.class) {
//
return conf.getSocketAddr(
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
//YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS的值是yarn.resourcemanager.resource-tracker.addresss,conf.getSocketAddr會從配置檔案中讀取YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS對應的配置項,如果沒有配置,就使用預設值YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,預設是0.0.0.0:8031,端口預設值是YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT ,8031
} else {
//Throw some exceptions
}
}
顯然,
getRMAddress()
方法就是通過讀取配置檔案來建立了一個
InetSocketAddress
對象,然後,真正底層建立proxy的時刻到來,來看
RMProxy.getProxy()
:
* Get a proxy to the RM at the specified address. To be used to create a
* RetryProxy.
* 對于ResourceTracker協定來說,這裡的參數protocol就是ResourceTracker.class
*/
@Private
static <T> T getProxy(final Configuration conf, final Class<T> protocol, final InetSocketAddress rmAddress)
throws IOException {
return UserGroupInformation.getCurrentUser().doAs(new PrivilegedAction<T>() {
@Override
public T run() {
//Yarn的所有RPC用戶端和伺服器端都是用YarnRPC進行建立
return (T) YarnRPC.create(conf).getProxy(protocol, rmAddress, conf);
}
});
}
YarnRPC是一個抽象類(Abstract Class),是Yarn對Hadoop RPC 的封裝,基于曆史原因和版本更新疊代,Hadoop RPC有基于多種序列化方式的RPC協定,但是由于Yarn是Hadoop 2.0之後才有的元件,是很新的component, 是以Yarn所有的RPC調用都是基于google protobuf序列化方式的RPC進行的實作。
我們一起來看YarnRPC的類圖:
YarnRPC.create()
public static YarnRPC create(Configuration conf) {
LOG.debug("Creating YarnRPC for " +
conf.get(YarnConfiguration.IPC_RPC_IMPL));
//yarn.ipc.rpc.class 預設是org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC
String clazzName = conf.get(YarnConfiguration.IPC_RPC_IMPL);
if (clazzName == null) {
clazzName = YarnConfiguration.DEFAULT_IPC_RPC_IMPL;
}
try {
return (YarnRPC) Class.forName(clazzName).newInstance();
} catch (Exception e) {
throw new YarnRuntimeException(e);
}
}
YarnRPC這個抽象類的實際實作類的名稱是通過Yarn配置檔案讀取,預設是
org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC
這個類,
是以
YarnRPC.create(conf).getProxy(protocol, rmAddress, conf);
實際上調用了
HadoopYarnProtoRPC.getProxy()
方法:
public Object getProxy(Class protocol, InetSocketAddress addr, Configuration conf) {
LOG.debug("Creating a HadoopYarnProtoRpc proxy for protocol " + protocol);
// 預設的clientfactory是org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl
return RpcFactoryProvider.getClientFactory(conf).getClient(protocol, , addr, conf);
}
進入
RpcClientFactoryPBImpl.getClient()
:
//對于ResourceTracker協定來說,這裡的參數protocol就是ResourceTracker.class
public Object getClient(Class<?> protocol, long clientVersion,
InetSocketAddress addr, Configuration conf) {
Constructor<?> constructor = cache.get(protocol);
if (constructor == null) {
Class<?> pbClazz = null;
try {
//根據Yarn自身的規定,需要根據protocol名稱拿到具體的用戶端實作,即從ResourceTracker -> ResourceTrackerPBClientImpl,所有的Yarn RPC都遵循這樣的轉換規定,除了ResourceTracker,還有比如ApplicationMaster和ResourceManager進行溝通的協定ApplicationMasterProtocol,它對應的用戶端實作類叫做ResourceTrackerPBClientImpl,而對應的伺服器端實作類叫做ResourceTrackerPBServerImpl
pbClazz = localConf.getClassByName(getPBImplClassName(protocol));
} catch (ClassNotFoundException e) {
//some exceptions
}
try {
//ResourceTrackerPBClientImpl的構造函數
constructor = pbClazz.getConstructor(Long.TYPE, InetSocketAddress.class, Configuration.class);
constructor.setAccessible(true);
cache.putIfAbsent(protocol, constructor);
} catch (NoSuchMethodException e) {
//some exceptions
}
}
try {
//構造ResourceTrackerPBClientImpl對象
Object retObject = constructor.newInstance(clientVersion, addr, conf);
return retObject;
} catch (InvocationTargetException e) {
///some exceptions
}
}
ResourceTrackerPBClientImpl就是對ResourceTracker協定的最下層代理,來看ResourceTrackerPBClientImpl的構造函數:
public ResourceTrackerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf)
throws IOException {
//為protocol注冊處理引擎
RPC.setProtocolEngine(conf, ResourceTrackerPB.class, ProtobufRpcEngine.class);
//設定并擷取protocol的代理類
proxy = (ResourceTrackerPB) RPC.getProxy(ResourceTrackerPB.class, clientVersion, addr, conf);
}
在這裡,我們再次看到了hadoop中的注冊思想。我們的ResourceTrackerPBClientImpl協定要想使用,必須向對應的RPC Engine注冊自己。所有基于protobuf協定的RPC都必須向ProtobufRpcEngine進行注冊,注冊完成以後,建立底層的用戶端代理。
終于,經過繁雜但是設計良好的Protobuf RPC的初始化,我們終于拿到了ResourceTracker協定的用戶端實作類。此後,ResourceTracker協定的用戶端,我們的 NodeManager,就可以根據協定的定義,來進行協定中的
registerNodeManager()
方法的調用。我們跟蹤一下這個過程,試圖搞清楚用戶端在調用這個方法的時候,是如何不知不覺通過RPC變成了伺服器端的調用的。
registerNodeManager()
是由NodeManager發起的,NodeManager實際上是委托
NodeStatusUpdaterImpl
來與伺服器端的ResourceManager進行溝通,看
NodeStatusUpdaterImpl.registerNodeManager
:
@Override
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnException,
IOException {
RegisterNodeManagerRequestProto requestProto = ((RegisterNodeManagerRequestPBImpl)request).getProto();
try {
return new RegisterNodeManagerResponsePBImpl(proxy.registerNodeManager(null, requestProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
return null;
}
}
這個proxy對象,是 ResourceTrackerPBClientImpl構造函數執行的時候建立的:
由于Yarn RPC使用Protobuf ,是以
RPC.getProxy
實際上調用的是
ProtobufRpcEngine().getProxy()
方法:
@Override
@SuppressWarnings("unchecked")
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth);
return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[]{protocol}, invoker), false);
}
很明顯,這裡是通過java動态代理,來對
ResourceTrackerPBClientImpl
的方法進行代理執行。再次重複上面關于java 動态代理的解釋,所有java動态代理都必須實作
java.lang.reflect.InvocationHandler
接口,實作其
invoke()
方法,用來代替被代理類的執行,對于
ProtobufRpcEngine
,這個
InvocationHandler
就是
ProtobufRpcEngine.Invoker
。我們看
ProtobufRpcEngine.Invoker
是怎麼代理這個用戶端的registerNodeManager()方法的執行的:
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws ServiceException {
long startTime = ;
//some code
//開始将方法的資訊和請求資訊進行包裝,準備發送給server
RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
//....
//提取請求方法的參數
Message theRequest = (Message) args[];
final RpcResponseWrapper val;
try {
//将請求資訊發送給遠端伺服器
//remoteId是一個 Client.ConnectionId,封裝了該協定對應的遠端伺服器的資訊,比如ip、端口等
//RpcRequestWrapper封裝了請求的方法、參數資訊,并且RpcRequestWrapper是一個Writable,是以
//可以被序列化然後發送給遠端
val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,
fallbackToSimpleAuth);
} catch (Throwable e) {
//some exceptions
} finally {
if (traceScope != null) traceScope.close();
}
Message prototype = null;
try {
prototype = getReturnProtoType(method);
} catch (Exception e) {
throw new ServiceException(e);
}
Message returnMessage;
try {
returnMessage = prototype.newBuilderForType()
.mergeFrom(val.theResponseRead).build();
} catch (Throwable e) {
throw new ServiceException(e);
}
return returnMessage;
}
可以看到,ProtobufRpcEngine.Invokder.invoke()方法做的工作,就是提取用戶端請求的方法以及方法的參數,将這些資訊發送給遠端伺服器。遠端伺服器再通過解析,提取出方法和方法參數,在伺服器端本地執行對應的代碼,比如,伺服器端從用戶端請求中提取了方法名稱為registerNodeManager()以及參數(包含了節點資訊等等),會将節點資訊進行注冊和管理,然後傳回注冊成功資訊。
在HA環境下,通過一層層代理封裝,Yarn實作了HA環境下的ResourceManager協定用戶端,ResourceTrackerPBClientImpl封裝了該協定的用戶端實作,屬于下層代理,通過這個下層動态代理,将用戶端對應方法的調用,轉換成位元組碼資訊發送給遠端,而
ConfiguredRMFailoverProxyProvider
也是通過動态代理,在ResourceTrackerPBClientImpl的上層進行了封裝,以實作High Availability特性。在HA環境下,NodeManager作為ResourceTracker用戶端,從ConfiguredRMFailoverProxyProvider的上層代理往下調用,到達ResourceTrackerPBClientImpl下層代理,然後ResourceTrackerPBClientImpl通過動态代理,将請求資訊發送到RPC Server,實作了該協定的一次調用。