天天看點

Hadoop 基于protobuf 的RPC的用戶端實作原理

基于google protobuf的RPC engine,必須在伺服器端和用戶端都完成了初始化之後,才能開始通信。在《Hadoop 基于protobuf 的RPC的伺服器端實作原理》這篇博文中,我介紹了RPC 的伺服器端實作,那麼,用戶端是如何基于預先定義的protobuf協定,來與遠端的基于相同的protobuf協定的服務端進行通信的呢?

比如,NodeManger與遠端的ResourceManager進行RPC通信,它們的通信基于ResourceTracker這個RPC協定,協定定義在ResourceTracker.proto檔案中:

service ResourceTrackerService {
  rpc registerNodeManager(RegisterNodeManagerRequestProto) returns (RegisterNodeManagerResponseProto);
  rpc nodeHeartbeat(NodeHeartbeatRequestProto) returns (NodeHeartbeatResponseProto);
}
           

協定中定義了兩個通信接口,

registerNodeManager

nodeHeartbeat

registerNodeManager

負責在節點啟的NodeManager啟動的時候向ResourceManger注冊自己,而

nodeHeartbeat

是通過定時心跳的方式,不斷向ResourceManager報告自己的存在,并将自己的狀态彙報給ResourceManager。

在Yarn的基于Master/Slave的設計模式中,

register

思想是最核心的 設計思想。NodeManager被ResourceManager管理,那麼NodeManager在啟動的時候必須向ResourceManager注冊,RPCEngine想要生效,Engine在初始化的時候也必須向ResourceManager注冊。我認為這種設計思想的根本目的,是将主動權交給使用者(Slave)而不是管理者(Master),這樣,将Master從繁雜的管理工作中解脫出來,Master不需要關心、不需要輪訓NodeManager什麼時候來,什麼時候啟動,而是讓NodeManager在啟動的時候主動告知即可。

那麼,這個用戶端協定是怎麼進行初始化并向遠端的ResourceTracker發送消息的呢?既然NodeManager是該協定的用戶端,我們從NodeManager代碼進入,來看看初始化以及初始化以後基于協定進行通信的用戶端過程。

Yarn代碼設計的另外一個重要特點就是功能服務化,無論是NodeManager、ResourceManager還是MapReduce的ApplicationMaster(MRAppMaster),都抽象為服務,服務之間功能獨立,服務的運作 被抽象為初始化、啟動、運作和停止等基本過程,讓整個代碼邏輯非常清晰、封裝性變得非常好。

在NodeManager的

serviceInit()

方法中,我們看到:

nodeStatusUpdater =
        createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
           
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
      Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
    return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
      metrics);
  }
           

建立了一個運作時類型為

NodeStatusUpdaterImpl

的狀态更新器,其實,這個NodeStatusUpdater也是一個

service

,啟動ResourceTracker協定的用戶端,就是在

NodeStatusUpdaterImpl.serviceStart()

中進行:

@Override
  protected void serviceStart() throws Exception {

    // NodeManager is the last service to start, so NodeId is available.
    //...
    try {
      // Registration has to be in start so that ContainerManager can get the
      // perNM tokens needed to authenticate ContainerTokens.
      //建立基于ResourceManager協定的RPC用戶端
      this.resourceTracker = getRMClient();
      registerWithRM();
      //...
    } catch (Exception e) {
      //...
    }
  }
           

跟蹤代碼,到

ServerRMProxy.createRMProxy()

:

/**
   * Create a proxy for the specified protocol. For non-HA,
   * this is a direct connection to the ResourceManager address. When HA is
   * enabled, the proxy handles the failover between the ResourceManagers as
   * well.
   * 對于ResourceTracker協定來說,這裡的參數protocol就是是ResourceTracker.class, instanse參數是ServerRMProxy
   */
  @Private
  protected static <T> T createRMProxy(final Configuration configuration,
      final Class<T> protocol, RMProxy instance) throws IOException {
    YarnConfiguration conf = (configuration instanceof YarnConfiguration)
        ? (YarnConfiguration) configuration
        : new YarnConfiguration(configuration);
    RetryPolicy retryPolicy = createRetryPolicy(conf);
    //通過讀取配置檔案,判斷是否開啟High Availability模式
    if (HAUtil.isHAEnabled(conf)) {//開啟HA模式
      RMFailoverProxyProvider<T> provider =
          instance.createRMFailoverProxyProvider(conf, protocol);
      return (T) RetryProxy.create(protocol, provider, retryPolicy);
    } else {//不開啟HA模式
      InetSocketAddress rmAddress = instance.getRMAddress(conf, protocol);
      LOG.info("Connecting to ResourceManager at " + rmAddress);
      T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
      return (T) RetryProxy.create(protocol, proxy, retryPolicy);
    }
  }
           

在這裡我們開始接觸到

proxy

, 如果大家對IPC(Inter-Process Communication,程序間通信)或者RMI(Remote Method Invocation,遠端方法調用)不是很熟悉,也許對proxy的了解産生偏差。在這裡,proxy指的就是調用者,即用戶端。由于在程序間通信或者遠端方法調用的時候,調用者隻需要調用方法,不需要關心方法是在本地還是遠端執行,是以存在一個代理者(即proxy,在java RMI中,也叫做stub程式),來負責将本地用戶端的調用通過TCP等網絡協定在遠端伺服器端進行調用,然後取回調用結果提供給調用者。這就是代理的含義。

是否開啟HA模式與本文讨論的話題無關,是以我們選取開啟HA模式的分支。繼續跟蹤代碼,看看基于ResourceTracker協定的RPC 用戶端是怎麼建立的。

跟蹤·RMProxy.createRMFailoverProxyProvider()

方法:

/**
   * Helper method to create FailoverProxyProvider.
   * 同樣,
   */
  private <T> RMFailoverProxyProvider<T> createRMFailoverProxyProvider(
      Configuration conf, Class<T> protocol) {
    Class<? extends RMFailoverProxyProvider<T>> defaultProviderClass;
    try {
      defaultProviderClass = (Class<? extends RMFailoverProxyProvider<T>>)
          Class.forName(          YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER);
    } catch (Exception e) {
     //some exception
    }
    //通過配置檔案中定義的provider,采用反射方式,建立這個FailoverProxyProvider執行個體
    //預設情況下,這個Provider是
    //建立一個Provider,根據預設配置,這個provider是org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider,這個執行個體的作用是對用戶端進行HA的封裝代理,似的用戶端無需關心HA環境下自己連接配接的到底是哪個ResourceManager
    RMFailoverProxyProvider<T> provider = ReflectionUtils.newInstance(
        conf.getClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER,
            defaultProviderClass, RMFailoverProxyProvider.class), conf);
    provider.init(conf, (RMProxy<T>) this, protocol);
    return provider;
  }
           

然後,進入

ConfiguredRMFailoverProxyProvider.init()

方法:

public void init(Configuration configuration, RMProxy<T> rmProxy, Class<T> protocol) {
        this.rmProxy = rmProxy;
        this.protocol = protocol;
        this.rmProxy.checkAllowedProtocols(this.protocol);
//....
    }
           

由此可見,ConfiguredRMFailoverProxyProvider對我們的通信協定進行了HA的封裝,在

init

方法中,設定了它所代理的協定名稱(ResourceTracker)和這個協定的代理對象RMProxy;在HA環境下,用戶端隻需要直接使用

ConfiguredRMFailoverProxyProvider

給我們提供的代理對象,而不需要關心這個代理對象到底是指向了哪一個ResourceManager,這就是

ConfiguredRMFailoverProxyProvider

的職責,負責隐藏HA環境下的FailOver細節。

再回到上面提到的代碼片段

RMProxy.createRMProxy

          // 通過讀取配置檔案,判斷是否開啟High Availability模式
        if (HAUtil.isHAEnabled(conf)) {// 開啟HA模式
            //如果開啟HA,則provider是一個org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider
            RMFailoverProxyProvider<T> provider = instance.createRMFailoverProxyProvider(conf, protocol);//instance是ServerRMProxy
            return (T) RetryProxy.create(protocol, provider, retryPolicy);
        } else {// 不開啟HA模式
            //.....
        }
           

RMFailoverProxyProvider<T> provider = instance.createRMFailoverProxyProvider(conf, protocol);

負責建立和初始化ResourceTracker協定在HA環境下的代理

ConfiguredRMFailoverProxyProvider

,那麼,

ConfiguredRMFailoverProxyProvider

是怎麼建立真正的RPC用戶端的呢?

我們繼續跟蹤下一行代碼

(T) RetryProxy.create(protocol, provider, retryPolicy);

,此時,protocol是ResourceTracker.class,provider是

ConfiguredRMFailoverProxyProvider

:

public static <T> Object create(Class<T> iface,
      FailoverProxyProvider<T> proxyProvider, RetryPolicy retryPolicy) {
    return Proxy.newProxyInstance(
        proxyProvider.getInterface().getClassLoader(),
        new Class<?>[] { iface },
        new RetryInvocationHandler<T>(proxyProvider, retryPolicy)
        );
  }
           

可以看到,ConfiguredRMFailoverProxyProvider通過java動态代理來代理了ResourceTracker協定裡面方法的執行。熟悉java動态代理的都會明白,每一個動态代理proxy都需要有繼承

java.lang.reflect.InvocationHandler

并實作其

invoke()

方法,用來代替被代理類的執行,這裡,這個

InvocationHandler

就是RetryInvocationHandler。ConfiguredRMFailoverProxyProvider底層真正的RPC(已經說過,ConfiguredRMFailoverProxyProvider就是對真正的RPC封裝了一層HA特性),就是RetryInvocationHandler來實作的,其實是在RetryInvocationHandler的構造方法裡面進行的:

protected RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
      RetryPolicy defaultPolicy,
      Map<String, RetryPolicy> methodNameToPolicyMap) {
    //....
    this.currentProxy = proxyProvider.getProxy();//proxyProvider其實是ConfiguredRMFailoverProxyProvider對象
  }
           
@Override
  public synchronized ProxyInfo<T> getProxy() {
    String rmId = rmServiceIds[currentProxyIndex];
    T current = proxies.get(rmId);
    if (current == null) {
      current = getProxyInternal();
      proxies.put(rmId, current);
    }
    return new ProxyInfo<T>(current, rmId);
  }
           

接着往下跟蹤:

/**
   * 負責建立用戶端代理對象
   * @return
   */
  private T getProxyInternal() {
    try {
      //通過配置檔案,擷取遠端
      //rmProxy的運作時對象是ServerRMProxy,protocol是ResourceTracker.class
      final InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
      return RMProxy.getProxy(conf, protocol, rmAddress);
    } catch (IOException ioe) {
      LOG.error("Unable to create proxy to the ResourceManager " +
          rmServiceIds[currentProxyIndex], ioe);
      return null;
    }
           

建立遠端ResourceManager伺服器端的位址對象,即ResourceTracker協定的伺服器端位址資訊

@InterfaceAudience.Private
  @Override
  protected InetSocketAddress getRMAddress(YarnConfiguration conf,
                                           Class<?> protocol) {
    if (protocol == ResourceTracker.class) {
    //
      return conf.getSocketAddr(
        YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
        //YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS的值是yarn.resourcemanager.resource-tracker.addresss,conf.getSocketAddr會從配置檔案中讀取YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS對應的配置項,如果沒有配置,就使用預設值YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,預設是0.0.0.0:8031,端口預設值是YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT ,8031
    } else {
       //Throw some exceptions 
    }
  }
           

顯然,

getRMAddress()

方法就是通過讀取配置檔案來建立了一個

InetSocketAddress

對象,然後,真正底層建立proxy的時刻到來,來看

RMProxy.getProxy()

* Get a proxy to the RM at the specified address. To be used to create a
     * RetryProxy.
     * 對于ResourceTracker協定來說,這裡的參數protocol就是ResourceTracker.class
     */
    @Private
    static <T> T getProxy(final Configuration conf, final Class<T> protocol, final InetSocketAddress rmAddress)
            throws IOException {
        return UserGroupInformation.getCurrentUser().doAs(new PrivilegedAction<T>() {
            @Override
            public T run() {
                //Yarn的所有RPC用戶端和伺服器端都是用YarnRPC進行建立
                return (T) YarnRPC.create(conf).getProxy(protocol, rmAddress, conf);
            }
        });
    }
           

YarnRPC是一個抽象類(Abstract Class),是Yarn對Hadoop RPC 的封裝,基于曆史原因和版本更新疊代,Hadoop RPC有基于多種序列化方式的RPC協定,但是由于Yarn是Hadoop 2.0之後才有的元件,是很新的component, 是以Yarn所有的RPC調用都是基于google protobuf序列化方式的RPC進行的實作。

我們一起來看YarnRPC的類圖:

Hadoop 基于protobuf 的RPC的用戶端實作原理

YarnRPC.create()

public static YarnRPC create(Configuration conf) {
    LOG.debug("Creating YarnRPC for " + 
        conf.get(YarnConfiguration.IPC_RPC_IMPL)); 
    //yarn.ipc.rpc.class  預設是org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC
    String clazzName = conf.get(YarnConfiguration.IPC_RPC_IMPL);
    if (clazzName == null) {
      clazzName = YarnConfiguration.DEFAULT_IPC_RPC_IMPL;
    }
    try {
      return (YarnRPC) Class.forName(clazzName).newInstance();
    } catch (Exception e) {
      throw new YarnRuntimeException(e);
    }
  }
           

YarnRPC這個抽象類的實際實作類的名稱是通過Yarn配置檔案讀取,預設是

org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC

這個類,

是以

YarnRPC.create(conf).getProxy(protocol, rmAddress, conf);

實際上調用了

HadoopYarnProtoRPC.getProxy()

方法:

public Object getProxy(Class protocol, InetSocketAddress addr, Configuration conf) {
        LOG.debug("Creating a HadoopYarnProtoRpc proxy for protocol " + protocol);
        // 預設的clientfactory是org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl
        return RpcFactoryProvider.getClientFactory(conf).getClient(protocol, , addr, conf);
    }
           

進入

RpcClientFactoryPBImpl.getClient()

:

//對于ResourceTracker協定來說,這裡的參數protocol就是ResourceTracker.class
public Object getClient(Class<?> protocol, long clientVersion,
      InetSocketAddress addr, Configuration conf) {
    Constructor<?> constructor = cache.get(protocol);
    if (constructor == null) {
      Class<?> pbClazz = null;
      try {
     //根據Yarn自身的規定,需要根據protocol名稱拿到具體的用戶端實作,即從ResourceTracker -> ResourceTrackerPBClientImpl,所有的Yarn RPC都遵循這樣的轉換規定,除了ResourceTracker,還有比如ApplicationMaster和ResourceManager進行溝通的協定ApplicationMasterProtocol,它對應的用戶端實作類叫做ResourceTrackerPBClientImpl,而對應的伺服器端實作類叫做ResourceTrackerPBServerImpl
        pbClazz = localConf.getClassByName(getPBImplClassName(protocol));
      } catch (ClassNotFoundException e) {
       //some exceptions
      }
      try {
      //ResourceTrackerPBClientImpl的構造函數
        constructor = pbClazz.getConstructor(Long.TYPE, InetSocketAddress.class, Configuration.class);
        constructor.setAccessible(true);
        cache.putIfAbsent(protocol, constructor);
      } catch (NoSuchMethodException e) {
        //some exceptions
      }
    }
    try {
    //構造ResourceTrackerPBClientImpl對象
      Object retObject = constructor.newInstance(clientVersion, addr, conf);
      return retObject;
    } catch (InvocationTargetException e) {
      ///some exceptions 
    }
  }
           

ResourceTrackerPBClientImpl就是對ResourceTracker協定的最下層代理,來看ResourceTrackerPBClientImpl的構造函數:

public ResourceTrackerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf)
            throws IOException {
        //為protocol注冊處理引擎
        RPC.setProtocolEngine(conf, ResourceTrackerPB.class, ProtobufRpcEngine.class);
        //設定并擷取protocol的代理類
        proxy = (ResourceTrackerPB) RPC.getProxy(ResourceTrackerPB.class, clientVersion, addr, conf);
    }
           

在這裡,我們再次看到了hadoop中的注冊思想。我們的ResourceTrackerPBClientImpl協定要想使用,必須向對應的RPC Engine注冊自己。所有基于protobuf協定的RPC都必須向ProtobufRpcEngine進行注冊,注冊完成以後,建立底層的用戶端代理。

終于,經過繁雜但是設計良好的Protobuf RPC的初始化,我們終于拿到了ResourceTracker協定的用戶端實作類。此後,ResourceTracker協定的用戶端,我們的 NodeManager,就可以根據協定的定義,來進行協定中的

registerNodeManager()

方法的調用。我們跟蹤一下這個過程,試圖搞清楚用戶端在調用這個方法的時候,是如何不知不覺通過RPC變成了伺服器端的調用的。

registerNodeManager()

是由NodeManager發起的,NodeManager實際上是委托

NodeStatusUpdaterImpl

來與伺服器端的ResourceManager進行溝通,看

NodeStatusUpdaterImpl.registerNodeManager

:

@Override
  public RegisterNodeManagerResponse registerNodeManager(
      RegisterNodeManagerRequest request) throws YarnException,
      IOException {
    RegisterNodeManagerRequestProto requestProto = ((RegisterNodeManagerRequestPBImpl)request).getProto();
    try {

      return new RegisterNodeManagerResponsePBImpl(proxy.registerNodeManager(null, requestProto));
    } catch (ServiceException e) {
      RPCUtil.unwrapAndThrowException(e);
      return null;
    }
  }
           

這個proxy對象,是 ResourceTrackerPBClientImpl構造函數執行的時候建立的:

由于Yarn RPC使用Protobuf ,是以

RPC.getProxy

實際上調用的是

ProtobufRpcEngine().getProxy()

方法:

@Override
  @SuppressWarnings("unchecked")
  public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
      InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
      SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy,
      AtomicBoolean fallbackToSimpleAuth) throws IOException {

    final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
        rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth);
    return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
        protocol.getClassLoader(), new Class[]{protocol}, invoker), false);
  }
           

很明顯,這裡是通過java動态代理,來對

ResourceTrackerPBClientImpl

的方法進行代理執行。再次重複上面關于java 動态代理的解釋,所有java動态代理都必須實作

java.lang.reflect.InvocationHandler

接口,實作其

invoke()

方法,用來代替被代理類的執行,對于

ProtobufRpcEngine

,這個

InvocationHandler

就是

ProtobufRpcEngine.Invoker

。我們看

ProtobufRpcEngine.Invoker

是怎麼代理這個用戶端的registerNodeManager()方法的執行的:

@Override
    public Object invoke(Object proxy, Method method, Object[] args)
        throws ServiceException {
      long startTime = ;
      //some code
      //開始将方法的資訊和請求資訊進行包裝,準備發送給server
      RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
      //....
      //提取請求方法的參數
      Message theRequest = (Message) args[];
      final RpcResponseWrapper val;
      try {
        //将請求資訊發送給遠端伺服器
          //remoteId是一個 Client.ConnectionId,封裝了該協定對應的遠端伺服器的資訊,比如ip、端口等
          //RpcRequestWrapper封裝了請求的方法、參數資訊,并且RpcRequestWrapper是一個Writable,是以
          //可以被序列化然後發送給遠端
        val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
            new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,
            fallbackToSimpleAuth);

      } catch (Throwable e) {
        //some exceptions
      } finally {
        if (traceScope != null) traceScope.close();
      }
      Message prototype = null;
      try {
        prototype = getReturnProtoType(method);
      } catch (Exception e) {
        throw new ServiceException(e);
      }
      Message returnMessage;
      try {
        returnMessage = prototype.newBuilderForType()
            .mergeFrom(val.theResponseRead).build();

      } catch (Throwable e) {
        throw new ServiceException(e);
      }
      return returnMessage;
    }
           

可以看到,ProtobufRpcEngine.Invokder.invoke()方法做的工作,就是提取用戶端請求的方法以及方法的參數,将這些資訊發送給遠端伺服器。遠端伺服器再通過解析,提取出方法和方法參數,在伺服器端本地執行對應的代碼,比如,伺服器端從用戶端請求中提取了方法名稱為registerNodeManager()以及參數(包含了節點資訊等等),會将節點資訊進行注冊和管理,然後傳回注冊成功資訊。

在HA環境下,通過一層層代理封裝,Yarn實作了HA環境下的ResourceManager協定用戶端,ResourceTrackerPBClientImpl封裝了該協定的用戶端實作,屬于下層代理,通過這個下層動态代理,将用戶端對應方法的調用,轉換成位元組碼資訊發送給遠端,而

ConfiguredRMFailoverProxyProvider

也是通過動态代理,在ResourceTrackerPBClientImpl的上層進行了封裝,以實作High Availability特性。在HA環境下,NodeManager作為ResourceTracker用戶端,從ConfiguredRMFailoverProxyProvider的上層代理往下調用,到達ResourceTrackerPBClientImpl下層代理,然後ResourceTrackerPBClientImpl通過動态代理,将請求資訊發送到RPC Server,實作了該協定的一次調用。

繼續閱讀