天天看点

YARN ApplicationMaster与ResourceManager之间基于applicationmaster_protocol.proto协议的allocate()接口源码解析

Yarn Application

运行期间,

ApplicationMaster

相当于这个Application的监护人和管理者,负责监控、管理这个Application的所有

Attempt

在cluster中各个节点上的具体运行,同时负责向

Yarn

ResourceManager

申请资源、返还资源等。可以说,

ApplicationMaster

ResourceManager

之间的通信是整个Yarn应用从提交到运行的最核心部分,是Yarn对整个集群进行动态资源管理的根本步骤,Yarn的动态性,就是来源于多个Application的

ApplicationMaster

动态地和

ResourceManager

进行沟通,不断地申请、释放、再申请、再释放资源的过程。因此,我们一起通过具体实现,来看

ApplicationMaster

从第一次向Yarn注册自己一直到申请和释放资源时服务端处理的全过程。

注意,本文只关注服务端即ResourceManager对ApplicationMaster客户端的相关请求的处理逻辑,而客户端即ApplicationMaster关于创建请求、监控应用的实现原理,有兴趣的读者可以自己去阅读代码。提示一下,对于MapReduce任务,客户端ApplicationMaster的实现是MRAppMaster,读者可以从这里看进去,了解客户端注册、申请资源的过程。同时,董西成董博客《YARN/MRv2 MRAppMaster深入剖析—整体架构》关于MRAppMaster也有比较上层的讲解。

ApplicationMaster

ResourceManager

之间的通信协议也是基于

Protobuf

协议进行的,协议的文件定义在文件

applicationmaster_protocol.proto

中:

service ApplicationMasterProtocolService {
  rpc registerApplicationMaster (RegisterApplicationMasterRequestProto) returns (RegisterApplicationMasterResponseProto);
  rpc finishApplicationMaster (FinishApplicationMasterRequestProto) returns (FinishApplicationMasterResponseProto);
  rpc allocate (AllocateRequestProto) returns (AllocateResponseProto);
}
           

整个

RPC

通信过程中,

ApplicationMaster

RPC Client

的角色,而

ResourceManager

属于RPC Server的角色。Hadoop RPC的原理和实现,可以参考我的另外两篇博客:《Hadoop 基于protobuf 的RPC的客户端实现原理》和 《Hadoop 基于protobuf 的RPC的服务器端实现原理》

其实,

ResourceManager

是委托

ApplicationMasterService

来代替自己进行资源管理的。我在其它博文中也多次提到了Yarn的模块化设计,

ResourceManager

本身被抽象为一个

Service

,并且它是由很多个子

Service

组成,

ApplicationMasterService

就是其中一个。通过一个简单示意图,我们看看这些

Service

之间的层次关系:

YARN ApplicationMaster与ResourceManager之间基于applicationmaster_protocol.proto协议的allocate()接口源码解析

从上图的服务层级关系可以看到,负责与

AppliationMaster

进行沟通的服务

ApplicationMasterService

RMActiveServices

服务的一个子服务,

RMActiveServices

是专门用来给HA模式下的

Active ResourceManager

运行的服务,而

Standby

ResourceManager

是不会启动这个服务的。这个逻辑我们从

ResourceManager

的一段代码中很容易看到:

//启动resourceManager服务
  @Override
  protected void serviceStart() throws Exception {
    if (this.rmContext.isHAEnabled()) {
      transitionToStandby(true);//如果HA模式打开,则直接进入standBy模式,后续会通过ZK决定是否成为Active
    } else {
      transitionToActive();//如果HA模式没有打开,则直接进入Active模式
    }
//省略
  }           

HA模式下,两台

ResourceManager

启动的时候,都是直接进入Standby模式,然后再去竞争以获取Active的角色和身份。非HA模式下,只有一台

ResourceManager

,自然是直接进入Active模式。

ApplicationMaster

心跳和资源申请、释放接口

allocate()

是Yarn的资源申请、释放动态过程的最关键接口,该接口的主要职责和功能包括:

  • 心跳: 周期性通过allcate接口告知

    ResourceManager

    自己依然是alive的状态;
  • 资源请求:通过一个或者多个

    ResourceRequest

    ResourceManager

    发起资源请求;
  • 黑名单列表:向

    ResourceManager

    提供黑名单列表,

    ResourceManager

    收到该列表以后,不会向

    ApplicationMaster

    分配黑名单列表机器上的任何资源
  • 服务器返回:

    ResourceManager

    会在响应信息里面告知AppliationMaster关于已经分配的container信息、这个

    Appliation

    已经完成的container信息以及这个应用的headroom(个人理解是剩余可用资源信息,

    AplicationMaster

    可以根据headroom信息决定如何使用已经分配的资源以及如何明智地决定以后的资源申请)的作用。

一个Application的每个应用被分布到不同的节点执行,

ResourceManager

不会直接和所有节点上的所有attempt通信,而是由

ApplicationMaster

与自己的所有attempt通信,把信息通过

allocate()

接口发送给

ApplicationMasterService(

ResourceManager

),

ResourceManager

则维护了所有

Application

Container

attempt

信息,通过心跳,RM端维护的信息虽然肯定稍有延迟,但是却能不断被更新和同步。

好了,我们直接来上allocate这个关键方法的代码,代码比较多,我将所有关键代码添加了注释,同时会对代码进行一一解析。代码的复杂性代表了场景的复杂性,这也无法避免。

public AllocateResponse allocate(AllocateRequest request)
      throws YarnException, IOException {

    AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();

    ApplicationAttemptId appAttemptId =
        amrmTokenIdentifier.getApplicationAttemptId();
    ApplicationId applicationId = appAttemptId.getApplicationId();

    //每次方法调用都是一次心跳信息,因此记录此次心跳信息
    this.amLivelinessMonitor.receivedPing(appAttemptId);

    /* check if its in cache */
    //验证RM端是否已经有了ApplationMaster进程的attemptid信息
    //正常情况下,ApplicationMaster对应的进程的attemp在启动的时候应该注册给AMS,即记录在responseMap中
    AllocateResponseLock lock = responseMap.get(appAttemptId);
    if (lock == null) {
      String message =
          "Application attempt " + appAttemptId
              + " doesn't exist in ApplicationMasterService cache.";
      LOG.error(message);
      throw new ApplicationAttemptNotFoundException(message);
    }
    synchronized (lock) {
      //ApplicationMaster每次与AMS交互,都会生成并记录一个AllocateResponse,AllocateResponse
      //中记录的交互Id每次交互都会递增。从registerAppAtempt()中设置为-1,registerApplicationMaster()
      //设置为0, 以后开始每次交互均递增
      AllocateResponse lastResponse = lock.getAllocateResponse();
      //校验AM是否注册过
      if (!hasApplicationMasterRegistered(appAttemptId)) {
        String message =
            "AM is not registered for known application attempt: " + appAttemptId
                + " or RM had restarted after AM registered . AM should re-register.";
        LOG.info(message);
        RMAuditLogger.logFailure(
          this.rmContext.getRMApps().get(appAttemptId.getApplicationId())
            .getUser(), AuditConstants.AM_ALLOCATE, "",
          "ApplicationMasterService", message, applicationId, appAttemptId);
        throw new ApplicationMasterNotRegisteredException(message);
      }

      //请求中序列号为上次请求的序列号,说明是一次重复请求,则直接返回上次的response
      if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
        /* old heartbeat */
        return lastResponse;
      } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) {
          //request里面的id是更早以前的,直接判定非法
        String message =
            "Invalid responseId in AllocateRequest from application attempt: "
                + appAttemptId + ", expect responseId to be "
                + (lastResponse.getResponseId() + 1);
        throw new InvalidApplicationMasterRequestException(message);
      }

      //过滤非法的进度信息,进度信息用一个浮点数表示,代表进程执行的百分比
      //filter illegal progress values
      float filteredProgress = request.getProgress();
      if (Float.isNaN(filteredProgress) || filteredProgress == Float.NEGATIVE_INFINITY
        || filteredProgress < 0) {
         request.setProgress(0);
      } else if (filteredProgress > 1 || filteredProgress == Float.POSITIVE_INFINITY) {
        request.setProgress(1);
      }

      // Send the status update to the appAttempt.
      //将ApplicationMaster返回到关于进度的信息,更新到ReSourceManager所维护的appAttempt中去,
      //使得这两部分信息保持一致,   this.rmContext.getDispatcher()是AsyncDispatcher,得到的
      //eventHandler是ApplicationAttemptEventDispatcher
      this.rmContext.getDispatcher().getEventHandler().handle(
          new RMAppAttemptStatusupdateEvent(appAttemptId, request
              .getProgress()));

      //新的资源请求
      List<ResourceRequest> ask = request.getAskList();
      //NodeManager已经释放的container信息
      List<ContainerId> release = request.getReleaseList();
      //黑名单信息,不希望自己的container分配到这些机器上
      ResourceBlacklistRequest blacklistRequest =
          request.getResourceBlacklistRequest();
      //添加到黑名单中的资源list
      List<String> blacklistAdditions =
          (blacklistRequest != null) ?
              blacklistRequest.getBlacklistAdditions() : Collections.EMPTY_LIST;
      //应该从黑名单中移除的资源名称的list
      List<String> blacklistRemovals =
          (blacklistRequest != null) ?
              blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST;
      //ResourceManager维护的这个application的信息,运行时,这个app是一个RMAppImpl
      RMApp app =
          this.rmContext.getRMApps().get(applicationId);

      // set label expression for Resource Requests if resourceName=ANY 
      ApplicationSubmissionContext asc = app.getApplicationSubmissionContext();
      for (ResourceRequest req : ask) {
        if (null == req.getNodeLabelExpression()
            && ResourceRequest.ANY.equals(req.getResourceName())) {
          req.setNodeLabelExpression(asc.getNodeLabelExpression());
        }
      }

      //完整性检查,包括规范化NodeLabel , 同时对资源合法性进行校验
      try {
        RMServerUtils.normalizeAndValidateRequests(ask,
            rScheduler.getMaximumResourceCapability(), app.getQueue(),
            rScheduler, rmContext);
      } catch (InvalidResourceRequestException e) {
        LOG.warn("Invalid resource ask by application " + appAttemptId, e);
        throw e;
      }

      try {
          //对黑名单资源进行检查
        RMServerUtils.validateBlacklistRequest(blacklistRequest);
      }  catch (InvalidResourceBlacklistRequestException e) {
        LOG.warn("Invalid blacklist request by application " + appAttemptId, e);
        throw e;
      }

      // In the case of work-preserving AM restart, it's possible for the
      // AM to release containers from the earlier attempt.
      //在work-preserving 关闭的情况下,不应该发生申请释放的container的applicationAttemptId
      //与当前AM的attemptId不一致的 情况,如果发生,则抛出异常
      if (!app.getApplicationSubmissionContext()
        .getKeepContainersAcrossApplicationAttempts()) {
        try {
          //确认释放请求中所有的container都是当前这个application的id
          //如果真的发生了AM restart并且work-preserving AM restart打开,那么这些container中包含的
            //getApplicationAttemptId应该与重启以后的ApplicationAttemptId不同,这时候这个
          RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
        } catch (InvalidContainerReleaseException e) {
          LOG.warn("Invalid container release by application " + appAttemptId, e);
          throw e;
        }
      }

      // Send new requests to appAttempt.
      //如果我们使用的是fairScheduler,则调用的是FairScheduler.allocate()
      Allocation allocation =
          this.rScheduler.allocate(appAttemptId, ask, release, 
              blacklistAdditions, blacklistRemovals);

      if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) {
        LOG.info("blacklist are updated in Scheduler." +
            "blacklistAdditions: " + blacklistAdditions + ", " +
            "blacklistRemovals: " + blacklistRemovals);
      }
      RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
      AllocateResponse allocateResponse =
          recordFactory.newRecordInstance(AllocateResponse.class);
      if (!allocation.getContainers().isEmpty()) {
        allocateResponse.setNMTokens(allocation.getNMTokens());
      }

      // update the response with the deltas of node status changes
      //设置response中所有节点的信息
      List<RMNode> updatedNodes = new ArrayList<RMNode>();
      if(app.pullRMNodeUpdates(updatedNodes) > 0) {//将节点信息放入到updatedNodes中
        List<NodeReport> updatedNodeReports = new ArrayList<NodeReport>();
        for(RMNode rmNode: updatedNodes) {
          SchedulerNodeReport schedulerNodeReport =  
              rScheduler.getNodeReport(rmNode.getNodeID());
          Resource used = BuilderUtils.newResource(0, 0);
          int numContainers = 0;
          if (schedulerNodeReport != null) {
            used = schedulerNodeReport.getUsedResource();
            numContainers = schedulerNodeReport.getNumContainers();
          }
          NodeId nodeId = rmNode.getNodeID();
          NodeReport report =
              BuilderUtils.newNodeReport(nodeId, rmNode.getState(),
                  rmNode.getHttpAddress(), rmNode.getRackName(), used,
                  rmNode.getTotalCapability(), numContainers,
                  rmNode.getHealthReport(), rmNode.getLastHealthReportTime(),
                  rmNode.getNodeLabels());

          updatedNodeReports.add(report);
        }
        allocateResponse.setUpdatedNodes(updatedNodeReports);
      }

      //已经为这个application分配的信息
      allocateResponse.setAllocatedContainers(allocation.getContainers());
      //已经完成的container的状态
      allocateResponse.setCompletedContainersStatuses(appAttempt
          .pullJustFinishedContainers());
      //responseID自增1
      allocateResponse.setResponseId(lastResponse.getResponseId() + 1);

      allocateResponse.setAvailableResources(allocation.getResourceLimit());

      //集群中可用节点的数目
      allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());

      // add preemption to the allocateResponse message (if any)
      allocateResponse
          .setPreemptionMessage(generatePreemptionMessage(allocation));

      // update AMRMToken if the token is rolled-up
      MasterKeyData nextMasterKey =
          this.rmContext.getAMRMTokenSecretManager().getNextMasterKeyData();

      if (nextMasterKey != null
          && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier
            .getKeyId()) {
        RMAppAttemptImpl appAttemptImpl = (RMAppAttemptImpl)appAttempt;
        Token<AMRMTokenIdentifier> amrmToken = appAttempt.getAMRMToken();
        if (nextMasterKey.getMasterKey().getKeyId() !=
            appAttemptImpl.getAMRMTokenKeyId()) {
          LOG.info("The AMRMToken has been rolled-over. Send new AMRMToken back"
              + " to application: " + applicationId);
          amrmToken = rmContext.getAMRMTokenSecretManager()
              .createAndGetAMRMToken(appAttemptId);
          appAttemptImpl.setAMRMToken(amrmToken);
        }
        allocateResponse.setAMRMToken(org.apache.hadoop.yarn.api.records.Token
          .newInstance(amrmToken.getIdentifier(), amrmToken.getKind()
            .toString(), amrmToken.getPassword(), amrmToken.getService()
            .toString()));
      }

      /*
       * As we are updating the response inside the lock object so we don't
       * need to worry about unregister call occurring in between (which
       * removes the lock object).
       */
      lock.setAllocateResponse(allocateResponse);
      return allocateResponse;
    }    
  }           

在allocate()方法的最开始,是通过Token机制,获得发起请求的

ApplicationMaster

信息(

ApplicationAttemptId

,代表了运行

ApplicationMaster

的这个attempt)以及这个

ApplicationMaster

所管理的Application的信息(ApplicationId对象),如以下代码:

AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();

    ApplicationAttemptId appAttemptId =
        amrmTokenIdentifier.getApplicationAttemptId();
    ApplicationId applicationId = appAttemptId.getApplicationId();
           

关于基于Token的Hadoop认证机制,大家可以参考董西成的博客:

《Hadoop 2.0 (YARN)中的安全机制概述》

然后,再次重申,

allocate()

方法的职责并不真的如同方法名称一样代表资源分配,从根本上说也是心跳以及客户端和服务端相互报告状态的接口,每次allocate()发生,并不一定就是

ApplicationMaster

在申请资源:

this.amLivelinessMonitor.receivedPing(appAttemptId);
           

然后,确认当前向自己发起请求的ApplicationMaster是否是一个已经记录的attempt(ApplicationMaster本身也是一个进程即一个attempt):

AllocateResponseLock lock = responseMap.get(appAttemptId);
    if (lock == null) {
      String message =
          "Application attempt " + appAttemptId
              + " doesn't exist in ApplicationMasterService cache.";
      LOG.error(message);
      throw new ApplicationAttemptNotFoundException(message);
    }           

关于

ApplicationMaster

的启动过程,我看到CSDN上有另外一篇博客:《Hadoop源码解析之ApplicationMaster启动流程》,本篇博客侧重讲了用户向

Yarn

提交应用一直到

ApplicationMaster

启动的过程。

确认

ApplicaitionMaster

的attempt的确是一个已经记录过的attempt,那么,为了保证有序,还必须对

ApplicationMaster

本次的

allocate()

请求的时机尽心验证,即确认这个

ApplicaitionMaster

是一个已经注册过(即掉用过

registerApplicationMaster()

接口的)的

ApplicationMaster

Yarn

通过一个序列号进行管理:

AllocateResponse lastResponse = lock.getAllocateResponse();
      //校验AM是否注册过
      if (!hasApplicationMasterRegistered(appAttemptId)) {
        String message =
            "AM is not registered for known application attempt: " + appAttemptId
                + " or RM had restarted after AM registered . AM should re-register.";
        LOG.info(message);
        RMAuditLogger.logFailure(
          this.rmContext.getRMApps().get(appAttemptId.getApplicationId())
            .getUser(), AuditConstants.AM_ALLOCATE, "",
          "ApplicationMasterService", message, applicationId, appAttemptId);
        throw new ApplicationMasterNotRegisteredException(message);
      }

      //请求中序列号为上次请求的序列号,说明是一次重复请求,则直接返回上次的response
      if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
        /* old heartbeat */
        return lastResponse;
      } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) {
          //request里面的id是更早以前的,直接判定非法
        String message =
            "Invalid responseId in AllocateRequest from application attempt: "
                + appAttemptId + ", expect responseId to be "
                + (lastResponse.getResponseId() + 1);
        throw new InvalidApplicationMasterRequestException(message);
      }           

在客户端提交一个应用的时候,

ApplicationMasterService.registerAppAttempt()

会在response中设置responseId为-1

然后,

ApplicationMaster

进程开始运行,通过

registerApplicationMaster()

会在response中设置id为0,即增1,每次请求,

ApplicationMaster

自己都会携带上一次收到的响应的responseId,同时,

ApplicaitonMaster

会保留上一次的response对象,通过对比此次请求的序号和上一次的response的序号以校验合法性:

  • 如果请求中的sequenceId与上次请求id一致,则说明这次是一个新的、有序请求,合法
  • 如果请求中的sequenceId+1等于上次请求的sequence id,说明这次请求试一次重复请求,这时候直接将上一次的response再返回一次
  • 如果请求中的sequenceId+1小于上次请求的sequence id,说明这是一次非常陈旧的请求,直接抛出异常。

校验完毕,

ApplicationMasterServer

会把请求中的进度信息同步到自己维护的数据结构中:

//过滤非法的进度信息,进度信息用一个浮点数表示,代表进程执行的百分比
      //filter illegal progress values
      float filteredProgress = request.getProgress();
      if (Float.isNaN(filteredProgress) || filteredProgress == Float.NEGATIVE_INFINITY
        || filteredProgress < 0) {
         request.setProgress(0);
      } else if (filteredProgress > 1 || filteredProgress == Float.POSITIVE_INFINITY) {
        request.setProgress(1);
      }

      // Send the status update to the appAttempt.
      //将ApplicationMaster返回到关于进度的信息,更新到ReSourceManager所维护的appAttempt中去,
      //使得这两部分信息保持一致,   this.rmContext.getDispatcher()是AsyncDispatcher,得到的
      //eventHandler是ApplicationAttemptEventDispatcher
      this.rmContext.getDispatcher().getEventHandler().handle(
          new RMAppAttemptStatusupdateEvent(appAttemptId, request
              .getProgress()));           

首先会对request中携带的进度信息进行校验和格式化,然后,是下面这段代码:

this.rmContext.getDispatcher().getEventHandler().handle(
          new RMAppAttemptStatusupdateEvent(appAttemptId, request
              .getProgress()));           

这一小段代码的目的就是为了把

ApplicationMaster

返回的应用的执行进度消息更新到RM端所维护的对应的AppAttempt中(

ResourceManager

端维护的关于Attempt的对象叫做

RMAppAttemptImpl

),这一段代码涉及到Yarn的两个重要核心组件,状态机(

StateMachine

)和分派器(Dispatcher),每一个组件都可以用一篇长文才能解释清楚,有兴趣的读者可以自行google调研。我这里简单分析:

Hadoop调度器即分派器,其实就是根据当前发生的事件,根据自己保存的事件(event)和事件处理器(eventHandler)之间的关系,拿出对应的事件处理器(eventHandler)对这个事件进行处理。那么,这个对应关系是怎么建立起来的呢?这要求分派器工作之前,要想让分派器能够处理某个事件,必须首先向分派器注册(register)事件处理器(eventHandler)。因此,从上述代码中,我们必须要找到对应的

eventHandler

,即找到注册代码的位置,才知道这段代码实际上做了什么!

this.rmContext

ResourceManager

维护的最核心的类,用来对

ResourceManager

的一些核心信息比如配置文件、

ResourceManager

的管理服务AdminServer、HA是否打开等信息进行管理,实现类是

RMContextImpl

我们来看

ResourceManager

关于

rmContext

的设置。在

ResourceManager.serviceInit()

方法中,创建了

RMContextImpl

对象:

protected void serviceInit(Configuration conf) throws Exception {
        this.conf = conf;
        this.rmContext = new RMContextImpl();
        //省略
        rmDispatcher = setupDispatcher();
        addIfService(rmDispatcher);
        rmContext.setDispatcher(rmDispatcher);//设置事件分派器,即AsyncDispatcher
        }           

显然,

this.rmContext.getDispatcher()

获取的就是这个

rmDispatcher

。在

ResourceManager.RMActiveServices.serviceInit()

中,我们看到了对

rmDispatcher

进行各种注册的代码片段:

@Override
        protected void serviceInit(Configuration configuration) throws Exception {

           //省略代码
          // Register event handler for RmAppAttemptEvents
          //将RmAppAttemptEvent交给RMAppAttempt去维护,其实现类是RMAppAttemptImpl
          rmDispatcher.register(RMAppAttemptEventType.class,
              new ApplicationAttemptEventDispatcher(rmContext));
           //省略代码
        }           

RMAppAttemptStatusupdateEvent

这个事件的事件类型就是

RMAppAttemptEventType

,因此,

this.rmContext.getDispatcher().getEventHandler().handle( new RMAppAttemptStatusupdateEvent(appAttemptId, request .getProgress()));

实际上是将这个事件交付给了

ApplicationAttemptEventDispatcher

进行处理,即

this.rmContext.getDispatcher().getEventHandler()

获取的handler实际上是

ApplicationAttemptEventDispatcher

。我们来看其

handle()

方法:

@Override
    public void handle(RMAppAttemptEvent event) {
      //从事件对象中取出attemptid对象
      ApplicationAttemptId appAttemptID = event.getApplicationAttemptId();
      //取出对应的application信息
      ApplicationId appAttemptId = appAttemptID.getApplicationId();
      //通过applicationId信息取出对应的服务器端Appliatoin信息
      RMApp rmApp = this.rmContext.getRMApps().get(appAttemptId);

      if (rmApp != null) {
         //一个Applicaiton由一个活着多个attempt信息,根据attemptid取出这个attempt
        RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptID);
        if (rmAppAttempt != null) {
          try {
            //调用这个attempt即RMAppAttemptImpl.handle()
            rmAppAttempt.handle(event);
          } catch (Throwable t) {
            LOG.error("Error in handling event type " + event.getType()
                + " for applicationAttempt " + appAttemptId, t);
          }
        }
      }
    }           

代码注释详细解释了事件交付流程,不再赘述。总之我们可以看到,一个应用的

ApplicationMaster

发送过来的应用执行进度信息,最终被ApplicationMasterService更新到了服务端所维护的应用信息中去,从而使得服务端关于应用执行的进度等信息与各个计算节点的实际信息一致。

同步完信息以后,开始解析request中的资源请求,包括新的资源请求(ask)和资源释放请求(release),来看:

//新的资源请求

List<ResourceRequest> ask = request.getAskList();
      //NodeManager已经释放的container信息
      List<ContainerId> release = request.getReleaseList();
      //黑名单信息,不希望自己的container分配到这些机器上
      ResourceBlacklistRequest blacklistRequest =
          request.getResourceBlacklistRequest();
      //添加到黑名单中的资源list
      List<String> blacklistAdditions =
          (blacklistRequest != null) ?
              blacklistRequest.getBlacklistAdditions() : Collections.EMPTY_LIST;
      //应该从黑名单中移除的资源名称的list
      List<String> blacklistRemovals =
          (blacklistRequest != null) ?
              blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST;           

提取完请求信息,开始对这些信息进行校验:

RMApp app =
          this.rmContext.getRMApps().get(applicationId);

      // set label expression for Resource Requests if resourceName=ANY 
      ApplicationSubmissionContext asc = app.getApplicationSubmissionContext();
      for (ResourceRequest req : ask) {
         //ResourceRequest.ANY代表这个资源分派请求对机器不挑剔,集群中任何机器都行
        if (null == req.getNodeLabelExpression()
            && ResourceRequest.ANY.equals(req.getResourceName())) {
          //如果这个资源请求不挑机器,并且没有设置nodeLabel, 那么就将nodeLabel设置为
         //客户端提交应用时候指定的nodelabel,当然,有可能客户端提交应用的时候没有指定nodeLabel
          req.setNodeLabelExpression(asc.getNodeLabelExpression());
        }
      }           
//完整性检查,包括规范化NodeLabel , 同时对资源合法性进行校验
      try {
        RMServerUtils.normalizeAndValidateRequests(ask,
            rScheduler.getMaximumResourceCapability(), app.getQueue(),
            rScheduler, rmContext);
      } catch (InvalidResourceRequestException e) {
        LOG.warn("Invalid resource ask by application " + appAttemptId, e);
        throw e;
      }           

这段代码做的工作有两个:

  • 对请求中携带的nodeLabel信息进行格式化,关于NodeLabel特性,大家可以看Yarn的官方文档《YARN Node Labels》
private static void normalizeNodeLabelExpressionInRequest(
      ResourceRequest resReq, QueueInfo queueInfo) {
    String labelExp = resReq.getNodeLabelExpression();
    //如果这个resReq中没有nodelabel,并且对机器不挑剔,而且所请求的队列有label,则
    //直接把请求的label设置为所在队列的label
    if (labelExp == null && queueInfo != null && ResourceRequest.ANY
        .equals(resReq.getResourceName())) {
      labelExp = queueInfo.getDefaultNodeLabelExpression();
    }
    //如果还是没有nodeLabel,则规范化为RMNodeLabelsManager.NO_LABEL
    if (labelExp == null) {
      labelExp = RMNodeLabelsManager.NO_LABEL;
    }
    resReq.setNodeLabelExpression(labelExp);
  }           
  • 对请求中的资源进行校验。代码如下:
private static void validateResourceRequest(ResourceRequest resReq,
      Resource maximumResource, QueueInfo queueInfo, RMContext rmContext)
      throws InvalidResourceRequestException {
    //判断请求中的内存为非负数并且小于最大资源量
    if (resReq.getCapability().getMemory() < 0 ||
        resReq.getCapability().getMemory() > maximumResource.getMemory()) {
      throw new InvalidResourceRequestException("Invalid resource request"
          + ", requested memory < 0"
          + ", or requested memory > max configured"
          + ", requestedMemory=" + resReq.getCapability().getMemory()
          + ", maxMemory=" + maximumResource.getMemory());
    }
  //判断请求中的vCPU为非负数并且小于最大资源量 
    if (resReq.getCapability().getVirtualCores() < 0 ||
        resReq.getCapability().getVirtualCores() >
        maximumResource.getVirtualCores()) {
      throw new InvalidResourceRequestException("略");
    }
    String labelExp = resReq.getNodeLabelExpression();

    // we don't allow specify label expression other than resourceName=ANY now
    //不允许在resourceName != ANY的情况下指定nodelabel,现在还不支持如此
    if (!ResourceRequest.ANY.equals(resReq.getResourceName())
        && labelExp != null && !labelExp.trim().isEmpty()) {
      throw new InvalidResourceRequestException("略");
    }

    //不允许通过&&连接符指定多个nodelabel
    // we don't allow specify label expression with more than one node labels now
    if (labelExp != null && labelExp.contains("&&")) {
      throw new InvalidResourceRequestException("略");
    }

    //确保请求中的nodelabel属于队列的nodelabel,并且,保证请求中的nodelabel是集群允许的
    if (labelExp != null && !labelExp.trim().isEmpty() && queueInfo != null) {
      if (!checkQueueLabelExpression(queueInfo.getAccessibleNodeLabels(),
          labelExp, rmContext)) {
        throw new InvalidResourceRequestException("略");
      }
    }
  }           

在资源限制部分,是为了保证所有请求的资源大小都小于yarn-site.xml中所配置的最大资源请求量:

<property>
                <name>yarn.scheduler.maximum-allocation-mb</name>
                <value>102400</value>
        </property>
        <property>
                <name>yarn.scheduler.maximum-allocation-vcores</name>
                <value>102400</value>
        </property>           

同时,在对nodeLabel进行了规范化和格式化以后,对nodeLabel进行了最后的合法性检查:

  • 目前不支持通过&&连接符指定多个nodelabel
  • 经过前面的nodelabel格式化,请求中的nodelabel必须是所在队列的多个label中的一个,这个是强制要求。同时,这个nodelabel必须是整个yarn集群的多个nodelabel中的一个,不可以是一个谁也不知道的nodelabel,这同样是硬性要求。

在完成了对nodelabel的校验和请求中的资源合法性校验以后,开始对请求中的黑名单进行校验,很简单,确保blacklist是合法的,至少,不可以是

ResourceRequest.ANY

,即把所有资源拉进了黑名单。。

try {
          //对黑名单资源进行检查
        RMServerUtils.validateBlacklistRequest(blacklistRequest);
      }  catch (InvalidResourceBlacklistRequestException e) {
        LOG.warn("Invalid blacklist request by application " + appAttemptId, e);
        throw e;
      }           

然后,对资源释放请求(release)进行校验,其中涉及到

work-preserving AppliationMaster Restart

的概念,这还是一个开发之中的open issue,所以官网文档上并没有,这个issue的详细内容在这里。这个功能意味着,如果ApplicationMaster发生异常关闭,依然保留正在运行的container 让他们继续运行,当ApplicationMaster重启以后,container重新注册到ApplicationMaster运行。显然,在这种情况下,某些container中的保存的ApplicationMaster的attemptid信息是以前的ApplicationMaster的attemptid,与当前的ApplicationMaster的attemtId不同。只有打开了

work-preserving AppliationMaster Restart

功能,才允许这种情况,否则,直接抛出异常,因为绝对不允许资源释放请求来源于另外一个ApplicationMaster。

//在work-preserving 关闭的情况下,不应该发生申请释放的container的applicationAttemptId
      //与当前AM的attemptId不一致的 情况,如果发生,则抛出异常
      if (!app.getApplicationSubmissionContext()
        .getKeepContainersAcrossApplicationAttempts()) {
        try {
          //确认释放请求中所有的container都是当前这个application的id
          //如果真的发生了AM restart并且work-preserving AM restart打开,那么这些container中包含的
            //getApplicationAttemptId应该与重启以后的ApplicationAttemptId不同,这时候这个
          RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
        } catch (InvalidContainerReleaseException e) {
          LOG.warn("Invalid container release by application " + appAttemptId, e);
          throw e;
        }
      }           

然后,我们真的到了最重要最重要的环节:资源分配,代码是:

Allocation allocation =
          this.rScheduler.allocate(appAttemptId, ask, release, 
              blacklistAdditions, blacklistRemovals);           

Yarn的资源分配是由可配置的Scheduler进行的,以FairScheduler为例,在yarn-site.xml中:

<property>
                <name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
        </property>           

如果我们配置的调度器是FairScheduler,那么,以上代码实际调用的就是

FairScheduler.allocate()

。这一部分涉及到的东西很多,有必要用单独的一篇文章来解释,在这里不做赘述。

总之,调度器通过资源分配,返回一个Allocation对象,保存了资源分配和资源抢占相关结果:

public class Allocation {

  final List<Container> containers;
  final Set<ContainerId> strictContainers;//application.getPreemptionContainers()的list
  final Set<ContainerId> fungibleContainers;
  final List<ResourceRequest> fungibleResources;
  final List<NMToken> nmTokens;
  private Resource resourceLimit;
  //省略
}           
  • containers 分配给对应

    ApplicationMaster

    的资源
  • strictContainers 严格抢占的container,这些container当前还没有被抢占,只是标记为被强占,这些container在真正被强占前通过response告知AM,方便AM保存现场等操作,一旦超过指定时间,这些container就会被抢占和回收
  • fungibleContainers 灵活抢占的container,fungible在Yarn中被叫negotiable,即可协商的。这些container也被标记为被抢占,但是,还有回旋余地,如果

    fungibleResources

    中描述的资源请求能够被满足,则

    fungibleContainers

    不会被抢占
  • fungibleResources 如果

    fungibleResources

    中描述的资源能够被这个

    AppliationMaster

    返回给

    ResourceManager

    ,那么

    fungibleContainers

    就不会被抢占
  • nmTokens

    NodeManager

    的token信息,AM拿到这些

    NodeManager

    的token,就拥有了同这些

    NodeManager

    进行沟通的

    Authentication

  • resourceLimit 集群目前对这个

    ApplicationMaster

    来说可用的资源

FairScheduler

中并没有用到

fungibleContainers

fungibleResources

,只有

CapacityScheduler

会使用到。是在

CapacityScheduler

进行资源分配的时候,大家看

CapacityScheduler.allocate()

->

FiCaSchedulerApp.getAllocation()

这个代码,可以发现,

fungibleResources

其实就是与

strictContainers

等量的资源,即,

CapacityScheduler

希望抢占

strictContainers

中的container,但是,如果ApplicationMaster能够返回给我等量的可用资源,那

CapacityScheduler

也能同意,即不限制抢占的具体的containerId,只要满足抢占的资源量就行。

好了,现在调度器已经通过Allocation对象返回了资源信息,下面就需要将这个对象封装到response中,返回给ApplicationMaster:

//设置已经为这个application分配的container信息到response中
      allocateResponse.setAllocatedContainers(allocation.getContainers());
      //设置已经完成的container的状态信息到response中
      allocateResponse.setCompletedContainersStatuses(appAttempt
          .pullJustFinishedContainers());
      //responseID自增1,放到response中
      allocateResponse.setResponseId(lastResponse.getResponseId() + 1);
      //设置集群中可用的资源信息到response中
      allocateResponse.setAvailableResources(allocation.getResourceLimit());
      //设置集群中可用节点的数目信息到response中
      allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
      // add preemption to the allocateResponse message (if any)
      //设置抢占信息到response中
      allocateResponse
          .setPreemptionMessage(generatePreemptionMessage(allocation));
      // update AMRMToken if the token is rolled-up
       //略
      }           

以上就是

ApplicationMaster

ResourceManager

之间基于

applicationmaster_protocol.proto

协议的核心接口

allocate()

的核心内容。

对这部分知识的理解,有助于我们更好的管理Yarn集群,同时,以这个

allocate()

接口为中心涉及到的状态机、事件分派器和服务化模块化设计,都是hadoop的核心思想中的核心,一次理解难度较大,学习门槛很高,需要很长时间,由点到面,从最开始对hadoop的所有东西一无所知,然后慢慢到对大多数组件有所了解,再把对大多数组件的不完整的了解串联起来,最终形成对yarn的所有组件的全面和深入的认识。

继续阅读