天天看點

YARN ApplicationMaster與ResourceManager之間基于applicationmaster_protocol.proto協定的allocate()接口源碼解析

Yarn Application

運作期間,

ApplicationMaster

相當于這個Application的監護人和管理者,負責監控、管理這個Application的所有

Attempt

在cluster中各個節點上的具體運作,同時負責向

Yarn

ResourceManager

申請資源、返還資源等。可以說,

ApplicationMaster

ResourceManager

之間的通信是整個Yarn應用從送出到運作的最核心部分,是Yarn對整個叢集進行動态資源管理的根本步驟,Yarn的動态性,就是來源于多個Application的

ApplicationMaster

動态地和

ResourceManager

進行溝通,不斷地申請、釋放、再申請、再釋放資源的過程。是以,我們一起通過具體實作,來看

ApplicationMaster

從第一次向Yarn注冊自己一直到申請和釋放資源時服務端處理的全過程。

注意,本文隻關注服務端即ResourceManager對ApplicationMaster用戶端的相關請求的處理邏輯,而用戶端即ApplicationMaster關于建立請求、監控應用的實作原理,有興趣的讀者可以自己去閱讀代碼。提示一下,對于MapReduce任務,用戶端ApplicationMaster的實作是MRAppMaster,讀者可以從這裡看進去,了解用戶端注冊、申請資源的過程。同時,董西成董部落格《YARN/MRv2 MRAppMaster深入剖析—整體架構》關于MRAppMaster也有比較上層的講解。

ApplicationMaster

ResourceManager

之間的通信協定也是基于

Protobuf

協定進行的,協定的檔案定義在檔案

applicationmaster_protocol.proto

中:

service ApplicationMasterProtocolService {
  rpc registerApplicationMaster (RegisterApplicationMasterRequestProto) returns (RegisterApplicationMasterResponseProto);
  rpc finishApplicationMaster (FinishApplicationMasterRequestProto) returns (FinishApplicationMasterResponseProto);
  rpc allocate (AllocateRequestProto) returns (AllocateResponseProto);
}
           

整個

RPC

通信過程中,

ApplicationMaster

RPC Client

的角色,而

ResourceManager

屬于RPC Server的角色。Hadoop RPC的原理和實作,可以參考我的另外兩篇部落格:《Hadoop 基于protobuf 的RPC的用戶端實作原理》和 《Hadoop 基于protobuf 的RPC的伺服器端實作原理》

其實,

ResourceManager

是委托

ApplicationMasterService

來代替自己進行資源管理的。我在其它博文中也多次提到了Yarn的子產品化設計,

ResourceManager

本身被抽象為一個

Service

,并且它是由很多個子

Service

組成,

ApplicationMasterService

就是其中一個。通過一個簡單示意圖,我們看看這些

Service

之間的層次關系:

YARN ApplicationMaster與ResourceManager之間基于applicationmaster_protocol.proto協定的allocate()接口源碼解析

從上圖的服務層級關系可以看到,負責與

AppliationMaster

進行溝通的服務

ApplicationMasterService

RMActiveServices

服務的一個子服務,

RMActiveServices

是專門用來給HA模式下的

Active ResourceManager

運作的服務,而

Standby

ResourceManager

是不會啟動這個服務的。這個邏輯我們從

ResourceManager

的一段代碼中很容易看到:

//啟動resourceManager服務
  @Override
  protected void serviceStart() throws Exception {
    if (this.rmContext.isHAEnabled()) {
      transitionToStandby(true);//如果HA模式打開,則直接進入standBy模式,後續會通過ZK決定是否成為Active
    } else {
      transitionToActive();//如果HA模式沒有打開,則直接進入Active模式
    }
//省略
  }           

HA模式下,兩台

ResourceManager

啟動的時候,都是直接進入Standby模式,然後再去競争以擷取Active的角色和身份。非HA模式下,隻有一台

ResourceManager

,自然是直接進入Active模式。

ApplicationMaster

心跳和資源申請、釋放接口

allocate()

是Yarn的資源申請、釋放動态過程的最關鍵接口,該接口的主要職責和功能包括:

  • 心跳: 周期性通過allcate接口告知

    ResourceManager

    自己依然是alive的狀态;
  • 資源請求:通過一個或者多個

    ResourceRequest

    ResourceManager

    發起資源請求;
  • 黑名單清單:向

    ResourceManager

    提供黑名單清單,

    ResourceManager

    收到該清單以後,不會向

    ApplicationMaster

    配置設定黑名單清單機器上的任何資源
  • 伺服器傳回:

    ResourceManager

    會在響應資訊裡面告知AppliationMaster關于已經配置設定的container資訊、這個

    Appliation

    已經完成的container資訊以及這個應用的headroom(個人了解是剩餘可用資源資訊,

    AplicationMaster

    可以根據headroom資訊決定如何使用已經配置設定的資源以及如何明智地決定以後的資源申請)的作用。

一個Application的每個應用被分布到不同的節點執行,

ResourceManager

不會直接和所有節點上的所有attempt通信,而是由

ApplicationMaster

與自己的所有attempt通信,把資訊通過

allocate()

接口發送給

ApplicationMasterService(

ResourceManager

),

ResourceManager

則維護了所有

Application

Container

attempt

資訊,通過心跳,RM端維護的資訊雖然肯定稍有延遲,但是卻能不斷被更新和同步。

好了,我們直接來上allocate這個關鍵方法的代碼,代碼比較多,我将所有關鍵代碼添加了注釋,同時會對代碼進行一一解析。代碼的複雜性代表了場景的複雜性,這也無法避免。

public AllocateResponse allocate(AllocateRequest request)
      throws YarnException, IOException {

    AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();

    ApplicationAttemptId appAttemptId =
        amrmTokenIdentifier.getApplicationAttemptId();
    ApplicationId applicationId = appAttemptId.getApplicationId();

    //每次方法調用都是一次心跳資訊,是以記錄此次心跳資訊
    this.amLivelinessMonitor.receivedPing(appAttemptId);

    /* check if its in cache */
    //驗證RM端是否已經有了ApplationMaster程序的attemptid資訊
    //正常情況下,ApplicationMaster對應的程序的attemp在啟動的時候應該注冊給AMS,即記錄在responseMap中
    AllocateResponseLock lock = responseMap.get(appAttemptId);
    if (lock == null) {
      String message =
          "Application attempt " + appAttemptId
              + " doesn't exist in ApplicationMasterService cache.";
      LOG.error(message);
      throw new ApplicationAttemptNotFoundException(message);
    }
    synchronized (lock) {
      //ApplicationMaster每次與AMS互動,都會生成并記錄一個AllocateResponse,AllocateResponse
      //中記錄的互動Id每次互動都會遞增。從registerAppAtempt()中設定為-1,registerApplicationMaster()
      //設定為0, 以後開始每次互動均遞增
      AllocateResponse lastResponse = lock.getAllocateResponse();
      //校驗AM是否注冊過
      if (!hasApplicationMasterRegistered(appAttemptId)) {
        String message =
            "AM is not registered for known application attempt: " + appAttemptId
                + " or RM had restarted after AM registered . AM should re-register.";
        LOG.info(message);
        RMAuditLogger.logFailure(
          this.rmContext.getRMApps().get(appAttemptId.getApplicationId())
            .getUser(), AuditConstants.AM_ALLOCATE, "",
          "ApplicationMasterService", message, applicationId, appAttemptId);
        throw new ApplicationMasterNotRegisteredException(message);
      }

      //請求中序列号為上次請求的序列号,說明是一次重複請求,則直接傳回上次的response
      if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
        /* old heartbeat */
        return lastResponse;
      } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) {
          //request裡面的id是更早以前的,直接判定非法
        String message =
            "Invalid responseId in AllocateRequest from application attempt: "
                + appAttemptId + ", expect responseId to be "
                + (lastResponse.getResponseId() + 1);
        throw new InvalidApplicationMasterRequestException(message);
      }

      //過濾非法的進度資訊,進度資訊用一個浮點數表示,代表程序執行的百分比
      //filter illegal progress values
      float filteredProgress = request.getProgress();
      if (Float.isNaN(filteredProgress) || filteredProgress == Float.NEGATIVE_INFINITY
        || filteredProgress < 0) {
         request.setProgress(0);
      } else if (filteredProgress > 1 || filteredProgress == Float.POSITIVE_INFINITY) {
        request.setProgress(1);
      }

      // Send the status update to the appAttempt.
      //将ApplicationMaster傳回到關于進度的資訊,更新到ReSourceManager所維護的appAttempt中去,
      //使得這兩部分資訊保持一緻,   this.rmContext.getDispatcher()是AsyncDispatcher,得到的
      //eventHandler是ApplicationAttemptEventDispatcher
      this.rmContext.getDispatcher().getEventHandler().handle(
          new RMAppAttemptStatusupdateEvent(appAttemptId, request
              .getProgress()));

      //新的資源請求
      List<ResourceRequest> ask = request.getAskList();
      //NodeManager已經釋放的container資訊
      List<ContainerId> release = request.getReleaseList();
      //黑名單資訊,不希望自己的container配置設定到這些機器上
      ResourceBlacklistRequest blacklistRequest =
          request.getResourceBlacklistRequest();
      //添加到黑名單中的資源list
      List<String> blacklistAdditions =
          (blacklistRequest != null) ?
              blacklistRequest.getBlacklistAdditions() : Collections.EMPTY_LIST;
      //應該從黑名單中移除的資源名稱的list
      List<String> blacklistRemovals =
          (blacklistRequest != null) ?
              blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST;
      //ResourceManager維護的這個application的資訊,運作時,這個app是一個RMAppImpl
      RMApp app =
          this.rmContext.getRMApps().get(applicationId);

      // set label expression for Resource Requests if resourceName=ANY 
      ApplicationSubmissionContext asc = app.getApplicationSubmissionContext();
      for (ResourceRequest req : ask) {
        if (null == req.getNodeLabelExpression()
            && ResourceRequest.ANY.equals(req.getResourceName())) {
          req.setNodeLabelExpression(asc.getNodeLabelExpression());
        }
      }

      //完整性檢查,包括規範化NodeLabel , 同時對資源合法性進行校驗
      try {
        RMServerUtils.normalizeAndValidateRequests(ask,
            rScheduler.getMaximumResourceCapability(), app.getQueue(),
            rScheduler, rmContext);
      } catch (InvalidResourceRequestException e) {
        LOG.warn("Invalid resource ask by application " + appAttemptId, e);
        throw e;
      }

      try {
          //對黑名單資源進行檢查
        RMServerUtils.validateBlacklistRequest(blacklistRequest);
      }  catch (InvalidResourceBlacklistRequestException e) {
        LOG.warn("Invalid blacklist request by application " + appAttemptId, e);
        throw e;
      }

      // In the case of work-preserving AM restart, it's possible for the
      // AM to release containers from the earlier attempt.
      //在work-preserving 關閉的情況下,不應該發生申請釋放的container的applicationAttemptId
      //與目前AM的attemptId不一緻的 情況,如果發生,則抛出異常
      if (!app.getApplicationSubmissionContext()
        .getKeepContainersAcrossApplicationAttempts()) {
        try {
          //确認釋放請求中所有的container都是目前這個application的id
          //如果真的發生了AM restart并且work-preserving AM restart打開,那麼這些container中包含的
            //getApplicationAttemptId應該與重新開機以後的ApplicationAttemptId不同,這時候這個
          RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
        } catch (InvalidContainerReleaseException e) {
          LOG.warn("Invalid container release by application " + appAttemptId, e);
          throw e;
        }
      }

      // Send new requests to appAttempt.
      //如果我們使用的是fairScheduler,則調用的是FairScheduler.allocate()
      Allocation allocation =
          this.rScheduler.allocate(appAttemptId, ask, release, 
              blacklistAdditions, blacklistRemovals);

      if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) {
        LOG.info("blacklist are updated in Scheduler." +
            "blacklistAdditions: " + blacklistAdditions + ", " +
            "blacklistRemovals: " + blacklistRemovals);
      }
      RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
      AllocateResponse allocateResponse =
          recordFactory.newRecordInstance(AllocateResponse.class);
      if (!allocation.getContainers().isEmpty()) {
        allocateResponse.setNMTokens(allocation.getNMTokens());
      }

      // update the response with the deltas of node status changes
      //設定response中所有節點的資訊
      List<RMNode> updatedNodes = new ArrayList<RMNode>();
      if(app.pullRMNodeUpdates(updatedNodes) > 0) {//将節點資訊放入到updatedNodes中
        List<NodeReport> updatedNodeReports = new ArrayList<NodeReport>();
        for(RMNode rmNode: updatedNodes) {
          SchedulerNodeReport schedulerNodeReport =  
              rScheduler.getNodeReport(rmNode.getNodeID());
          Resource used = BuilderUtils.newResource(0, 0);
          int numContainers = 0;
          if (schedulerNodeReport != null) {
            used = schedulerNodeReport.getUsedResource();
            numContainers = schedulerNodeReport.getNumContainers();
          }
          NodeId nodeId = rmNode.getNodeID();
          NodeReport report =
              BuilderUtils.newNodeReport(nodeId, rmNode.getState(),
                  rmNode.getHttpAddress(), rmNode.getRackName(), used,
                  rmNode.getTotalCapability(), numContainers,
                  rmNode.getHealthReport(), rmNode.getLastHealthReportTime(),
                  rmNode.getNodeLabels());

          updatedNodeReports.add(report);
        }
        allocateResponse.setUpdatedNodes(updatedNodeReports);
      }

      //已經為這個application配置設定的資訊
      allocateResponse.setAllocatedContainers(allocation.getContainers());
      //已經完成的container的狀态
      allocateResponse.setCompletedContainersStatuses(appAttempt
          .pullJustFinishedContainers());
      //responseID自增1
      allocateResponse.setResponseId(lastResponse.getResponseId() + 1);

      allocateResponse.setAvailableResources(allocation.getResourceLimit());

      //叢集中可用節點的數目
      allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());

      // add preemption to the allocateResponse message (if any)
      allocateResponse
          .setPreemptionMessage(generatePreemptionMessage(allocation));

      // update AMRMToken if the token is rolled-up
      MasterKeyData nextMasterKey =
          this.rmContext.getAMRMTokenSecretManager().getNextMasterKeyData();

      if (nextMasterKey != null
          && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier
            .getKeyId()) {
        RMAppAttemptImpl appAttemptImpl = (RMAppAttemptImpl)appAttempt;
        Token<AMRMTokenIdentifier> amrmToken = appAttempt.getAMRMToken();
        if (nextMasterKey.getMasterKey().getKeyId() !=
            appAttemptImpl.getAMRMTokenKeyId()) {
          LOG.info("The AMRMToken has been rolled-over. Send new AMRMToken back"
              + " to application: " + applicationId);
          amrmToken = rmContext.getAMRMTokenSecretManager()
              .createAndGetAMRMToken(appAttemptId);
          appAttemptImpl.setAMRMToken(amrmToken);
        }
        allocateResponse.setAMRMToken(org.apache.hadoop.yarn.api.records.Token
          .newInstance(amrmToken.getIdentifier(), amrmToken.getKind()
            .toString(), amrmToken.getPassword(), amrmToken.getService()
            .toString()));
      }

      /*
       * As we are updating the response inside the lock object so we don't
       * need to worry about unregister call occurring in between (which
       * removes the lock object).
       */
      lock.setAllocateResponse(allocateResponse);
      return allocateResponse;
    }    
  }           

在allocate()方法的最開始,是通過Token機制,獲得發起請求的

ApplicationMaster

資訊(

ApplicationAttemptId

,代表了運作

ApplicationMaster

的這個attempt)以及這個

ApplicationMaster

所管理的Application的資訊(ApplicationId對象),如以下代碼:

AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();

    ApplicationAttemptId appAttemptId =
        amrmTokenIdentifier.getApplicationAttemptId();
    ApplicationId applicationId = appAttemptId.getApplicationId();
           

關于基于Token的Hadoop認證機制,大家可以參考董西成的部落格:

《Hadoop 2.0 (YARN)中的安全機制概述》

然後,再次重申,

allocate()

方法的職責并不真的如同方法名稱一樣代表資源配置設定,從根本上說也是心跳以及用戶端和服務端互相報告狀态的接口,每次allocate()發生,并不一定就是

ApplicationMaster

在申請資源:

this.amLivelinessMonitor.receivedPing(appAttemptId);
           

然後,确認目前向自己發起請求的ApplicationMaster是否是一個已經記錄的attempt(ApplicationMaster本身也是一個程序即一個attempt):

AllocateResponseLock lock = responseMap.get(appAttemptId);
    if (lock == null) {
      String message =
          "Application attempt " + appAttemptId
              + " doesn't exist in ApplicationMasterService cache.";
      LOG.error(message);
      throw new ApplicationAttemptNotFoundException(message);
    }           

關于

ApplicationMaster

的啟動過程,我看到CSDN上有另外一篇部落格:《Hadoop源碼解析之ApplicationMaster啟動流程》,本篇部落格側重講了使用者向

Yarn

送出應用一直到

ApplicationMaster

啟動的過程。

确認

ApplicaitionMaster

的attempt的确是一個已經記錄過的attempt,那麼,為了保證有序,還必須對

ApplicationMaster

本次的

allocate()

請求的時機盡心驗證,即确認這個

ApplicaitionMaster

是一個已經注冊過(即掉用過

registerApplicationMaster()

接口的)的

ApplicationMaster

Yarn

通過一個序列号進行管理:

AllocateResponse lastResponse = lock.getAllocateResponse();
      //校驗AM是否注冊過
      if (!hasApplicationMasterRegistered(appAttemptId)) {
        String message =
            "AM is not registered for known application attempt: " + appAttemptId
                + " or RM had restarted after AM registered . AM should re-register.";
        LOG.info(message);
        RMAuditLogger.logFailure(
          this.rmContext.getRMApps().get(appAttemptId.getApplicationId())
            .getUser(), AuditConstants.AM_ALLOCATE, "",
          "ApplicationMasterService", message, applicationId, appAttemptId);
        throw new ApplicationMasterNotRegisteredException(message);
      }

      //請求中序列号為上次請求的序列号,說明是一次重複請求,則直接傳回上次的response
      if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
        /* old heartbeat */
        return lastResponse;
      } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) {
          //request裡面的id是更早以前的,直接判定非法
        String message =
            "Invalid responseId in AllocateRequest from application attempt: "
                + appAttemptId + ", expect responseId to be "
                + (lastResponse.getResponseId() + 1);
        throw new InvalidApplicationMasterRequestException(message);
      }           

在用戶端送出一個應用的時候,

ApplicationMasterService.registerAppAttempt()

會在response中設定responseId為-1

然後,

ApplicationMaster

程序開始運作,通過

registerApplicationMaster()

會在response中設定id為0,即增1,每次請求,

ApplicationMaster

自己都會攜帶上一次收到的響應的responseId,同時,

ApplicaitonMaster

會保留上一次的response對象,通過對比此次請求的序号和上一次的response的序号以校驗合法性:

  • 如果請求中的sequenceId與上次請求id一緻,則說明這次是一個新的、有序請求,合法
  • 如果請求中的sequenceId+1等于上次請求的sequence id,說明這次請求試一次重複請求,這時候直接将上一次的response再傳回一次
  • 如果請求中的sequenceId+1小于上次請求的sequence id,說明這是一次非常陳舊的請求,直接抛出異常。

校驗完畢,

ApplicationMasterServer

會把請求中的進度資訊同步到自己維護的資料結構中:

//過濾非法的進度資訊,進度資訊用一個浮點數表示,代表程序執行的百分比
      //filter illegal progress values
      float filteredProgress = request.getProgress();
      if (Float.isNaN(filteredProgress) || filteredProgress == Float.NEGATIVE_INFINITY
        || filteredProgress < 0) {
         request.setProgress(0);
      } else if (filteredProgress > 1 || filteredProgress == Float.POSITIVE_INFINITY) {
        request.setProgress(1);
      }

      // Send the status update to the appAttempt.
      //将ApplicationMaster傳回到關于進度的資訊,更新到ReSourceManager所維護的appAttempt中去,
      //使得這兩部分資訊保持一緻,   this.rmContext.getDispatcher()是AsyncDispatcher,得到的
      //eventHandler是ApplicationAttemptEventDispatcher
      this.rmContext.getDispatcher().getEventHandler().handle(
          new RMAppAttemptStatusupdateEvent(appAttemptId, request
              .getProgress()));           

首先會對request中攜帶的進度資訊進行校驗和格式化,然後,是下面這段代碼:

this.rmContext.getDispatcher().getEventHandler().handle(
          new RMAppAttemptStatusupdateEvent(appAttemptId, request
              .getProgress()));           

這一小段代碼的目的就是為了把

ApplicationMaster

傳回的應用的執行進度消息更新到RM端所維護的對應的AppAttempt中(

ResourceManager

端維護的關于Attempt的對象叫做

RMAppAttemptImpl

),這一段代碼涉及到Yarn的兩個重要核心元件,狀态機(

StateMachine

)和分派器(Dispatcher),每一個元件都可以用一篇長文才能解釋清楚,有興趣的讀者可以自行google調研。我這裡簡單分析:

Hadoop排程器即分派器,其實就是根據目前發生的事件,根據自己儲存的事件(event)和事件處理器(eventHandler)之間的關系,拿出對應的事件處理器(eventHandler)對這個事件進行處理。那麼,這個對應關系是怎麼建立起來的呢?這要求分派器工作之前,要想讓分派器能夠處理某個事件,必須首先向分派器注冊(register)事件處理器(eventHandler)。是以,從上述代碼中,我們必須要找到對應的

eventHandler

,即找到注冊代碼的位置,才知道這段代碼實際上做了什麼!

this.rmContext

ResourceManager

維護的最核心的類,用來對

ResourceManager

的一些核心資訊比如配置檔案、

ResourceManager

的管理服務AdminServer、HA是否打開等資訊進行管理,實作類是

RMContextImpl

我們來看

ResourceManager

關于

rmContext

的設定。在

ResourceManager.serviceInit()

方法中,建立了

RMContextImpl

對象:

protected void serviceInit(Configuration conf) throws Exception {
        this.conf = conf;
        this.rmContext = new RMContextImpl();
        //省略
        rmDispatcher = setupDispatcher();
        addIfService(rmDispatcher);
        rmContext.setDispatcher(rmDispatcher);//設定事件分派器,即AsyncDispatcher
        }           

顯然,

this.rmContext.getDispatcher()

擷取的就是這個

rmDispatcher

。在

ResourceManager.RMActiveServices.serviceInit()

中,我們看到了對

rmDispatcher

進行各種注冊的代碼片段:

@Override
        protected void serviceInit(Configuration configuration) throws Exception {

           //省略代碼
          // Register event handler for RmAppAttemptEvents
          //将RmAppAttemptEvent交給RMAppAttempt去維護,其實作類是RMAppAttemptImpl
          rmDispatcher.register(RMAppAttemptEventType.class,
              new ApplicationAttemptEventDispatcher(rmContext));
           //省略代碼
        }           

RMAppAttemptStatusupdateEvent

這個事件的事件類型就是

RMAppAttemptEventType

,是以,

this.rmContext.getDispatcher().getEventHandler().handle( new RMAppAttemptStatusupdateEvent(appAttemptId, request .getProgress()));

實際上是将這個事件傳遞給了

ApplicationAttemptEventDispatcher

進行處理,即

this.rmContext.getDispatcher().getEventHandler()

擷取的handler實際上是

ApplicationAttemptEventDispatcher

。我們來看其

handle()

方法:

@Override
    public void handle(RMAppAttemptEvent event) {
      //從事件對象中取出attemptid對象
      ApplicationAttemptId appAttemptID = event.getApplicationAttemptId();
      //取出對應的application資訊
      ApplicationId appAttemptId = appAttemptID.getApplicationId();
      //通過applicationId資訊取出對應的伺服器端Appliatoin資訊
      RMApp rmApp = this.rmContext.getRMApps().get(appAttemptId);

      if (rmApp != null) {
         //一個Applicaiton由一個活着多個attempt資訊,根據attemptid取出這個attempt
        RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptID);
        if (rmAppAttempt != null) {
          try {
            //調用這個attempt即RMAppAttemptImpl.handle()
            rmAppAttempt.handle(event);
          } catch (Throwable t) {
            LOG.error("Error in handling event type " + event.getType()
                + " for applicationAttempt " + appAttemptId, t);
          }
        }
      }
    }           

代碼注釋詳細解釋了事件傳遞流程,不再贅述。總之我們可以看到,一個應用的

ApplicationMaster

發送過來的應用執行進度資訊,最終被ApplicationMasterService更新到了服務端所維護的應用資訊中去,進而使得服務端關于應用執行的進度等資訊與各個計算節點的實際資訊一緻。

同步完資訊以後,開始解析request中的資源請求,包括新的資源請求(ask)和資源釋放請求(release),來看:

//新的資源請求

List<ResourceRequest> ask = request.getAskList();
      //NodeManager已經釋放的container資訊
      List<ContainerId> release = request.getReleaseList();
      //黑名單資訊,不希望自己的container配置設定到這些機器上
      ResourceBlacklistRequest blacklistRequest =
          request.getResourceBlacklistRequest();
      //添加到黑名單中的資源list
      List<String> blacklistAdditions =
          (blacklistRequest != null) ?
              blacklistRequest.getBlacklistAdditions() : Collections.EMPTY_LIST;
      //應該從黑名單中移除的資源名稱的list
      List<String> blacklistRemovals =
          (blacklistRequest != null) ?
              blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST;           

提取完請求資訊,開始對這些資訊進行校驗:

RMApp app =
          this.rmContext.getRMApps().get(applicationId);

      // set label expression for Resource Requests if resourceName=ANY 
      ApplicationSubmissionContext asc = app.getApplicationSubmissionContext();
      for (ResourceRequest req : ask) {
         //ResourceRequest.ANY代表這個資源分派請求對機器不挑剔,叢集中任何機器都行
        if (null == req.getNodeLabelExpression()
            && ResourceRequest.ANY.equals(req.getResourceName())) {
          //如果這個資源請求不挑機器,并且沒有設定nodeLabel, 那麼就将nodeLabel設定為
         //用戶端送出應用時候指定的nodelabel,當然,有可能用戶端送出應用的時候沒有指定nodeLabel
          req.setNodeLabelExpression(asc.getNodeLabelExpression());
        }
      }           
//完整性檢查,包括規範化NodeLabel , 同時對資源合法性進行校驗
      try {
        RMServerUtils.normalizeAndValidateRequests(ask,
            rScheduler.getMaximumResourceCapability(), app.getQueue(),
            rScheduler, rmContext);
      } catch (InvalidResourceRequestException e) {
        LOG.warn("Invalid resource ask by application " + appAttemptId, e);
        throw e;
      }           

這段代碼做的工作有兩個:

  • 對請求中攜帶的nodeLabel資訊進行格式化,關于NodeLabel特性,大家可以看Yarn的官方文檔《YARN Node Labels》
private static void normalizeNodeLabelExpressionInRequest(
      ResourceRequest resReq, QueueInfo queueInfo) {
    String labelExp = resReq.getNodeLabelExpression();
    //如果這個resReq中沒有nodelabel,并且對機器不挑剔,而且所請求的隊列有label,則
    //直接把請求的label設定為所在隊列的label
    if (labelExp == null && queueInfo != null && ResourceRequest.ANY
        .equals(resReq.getResourceName())) {
      labelExp = queueInfo.getDefaultNodeLabelExpression();
    }
    //如果還是沒有nodeLabel,則規範化為RMNodeLabelsManager.NO_LABEL
    if (labelExp == null) {
      labelExp = RMNodeLabelsManager.NO_LABEL;
    }
    resReq.setNodeLabelExpression(labelExp);
  }           
  • 對請求中的資源進行校驗。代碼如下:
private static void validateResourceRequest(ResourceRequest resReq,
      Resource maximumResource, QueueInfo queueInfo, RMContext rmContext)
      throws InvalidResourceRequestException {
    //判斷請求中的記憶體為非負數并且小于最大資源量
    if (resReq.getCapability().getMemory() < 0 ||
        resReq.getCapability().getMemory() > maximumResource.getMemory()) {
      throw new InvalidResourceRequestException("Invalid resource request"
          + ", requested memory < 0"
          + ", or requested memory > max configured"
          + ", requestedMemory=" + resReq.getCapability().getMemory()
          + ", maxMemory=" + maximumResource.getMemory());
    }
  //判斷請求中的vCPU為非負數并且小于最大資源量 
    if (resReq.getCapability().getVirtualCores() < 0 ||
        resReq.getCapability().getVirtualCores() >
        maximumResource.getVirtualCores()) {
      throw new InvalidResourceRequestException("略");
    }
    String labelExp = resReq.getNodeLabelExpression();

    // we don't allow specify label expression other than resourceName=ANY now
    //不允許在resourceName != ANY的情況下指定nodelabel,現在還不支援如此
    if (!ResourceRequest.ANY.equals(resReq.getResourceName())
        && labelExp != null && !labelExp.trim().isEmpty()) {
      throw new InvalidResourceRequestException("略");
    }

    //不允許通過&&連接配接符指定多個nodelabel
    // we don't allow specify label expression with more than one node labels now
    if (labelExp != null && labelExp.contains("&&")) {
      throw new InvalidResourceRequestException("略");
    }

    //確定請求中的nodelabel屬于隊列的nodelabel,并且,保證請求中的nodelabel是叢集允許的
    if (labelExp != null && !labelExp.trim().isEmpty() && queueInfo != null) {
      if (!checkQueueLabelExpression(queueInfo.getAccessibleNodeLabels(),
          labelExp, rmContext)) {
        throw new InvalidResourceRequestException("略");
      }
    }
  }           

在資源限制部分,是為了保證所有請求的資源大小都小于yarn-site.xml中所配置的最大資源請求量:

<property>
                <name>yarn.scheduler.maximum-allocation-mb</name>
                <value>102400</value>
        </property>
        <property>
                <name>yarn.scheduler.maximum-allocation-vcores</name>
                <value>102400</value>
        </property>           

同時,在對nodeLabel進行了規範化和格式化以後,對nodeLabel進行了最後的合法性檢查:

  • 目前不支援通過&&連接配接符指定多個nodelabel
  • 經過前面的nodelabel格式化,請求中的nodelabel必須是所在隊列的多個label中的一個,這個是強制要求。同時,這個nodelabel必須是整個yarn叢集的多個nodelabel中的一個,不可以是一個誰也不知道的nodelabel,這同樣是硬性要求。

在完成了對nodelabel的校驗和請求中的資源合法性校驗以後,開始對請求中的黑名單進行校驗,很簡單,確定blacklist是合法的,至少,不可以是

ResourceRequest.ANY

,即把所有資源拉進了黑名單。。

try {
          //對黑名單資源進行檢查
        RMServerUtils.validateBlacklistRequest(blacklistRequest);
      }  catch (InvalidResourceBlacklistRequestException e) {
        LOG.warn("Invalid blacklist request by application " + appAttemptId, e);
        throw e;
      }           

然後,對資源釋放請求(release)進行校驗,其中涉及到

work-preserving AppliationMaster Restart

的概念,這還是一個開發之中的open issue,是以官網文檔上并沒有,這個issue的詳細内容在這裡。這個功能意味着,如果ApplicationMaster發生異常關閉,依然保留正在運作的container 讓他們繼續運作,當ApplicationMaster重新開機以後,container重新注冊到ApplicationMaster運作。顯然,在這種情況下,某些container中的儲存的ApplicationMaster的attemptid資訊是以前的ApplicationMaster的attemptid,與目前的ApplicationMaster的attemtId不同。隻有打開了

work-preserving AppliationMaster Restart

功能,才允許這種情況,否則,直接抛出異常,因為絕對不允許資源釋放請求來源于另外一個ApplicationMaster。

//在work-preserving 關閉的情況下,不應該發生申請釋放的container的applicationAttemptId
      //與目前AM的attemptId不一緻的 情況,如果發生,則抛出異常
      if (!app.getApplicationSubmissionContext()
        .getKeepContainersAcrossApplicationAttempts()) {
        try {
          //确認釋放請求中所有的container都是目前這個application的id
          //如果真的發生了AM restart并且work-preserving AM restart打開,那麼這些container中包含的
            //getApplicationAttemptId應該與重新開機以後的ApplicationAttemptId不同,這時候這個
          RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
        } catch (InvalidContainerReleaseException e) {
          LOG.warn("Invalid container release by application " + appAttemptId, e);
          throw e;
        }
      }           

然後,我們真的到了最重要最重要的環節:資源配置設定,代碼是:

Allocation allocation =
          this.rScheduler.allocate(appAttemptId, ask, release, 
              blacklistAdditions, blacklistRemovals);           

Yarn的資源配置設定是由可配置的Scheduler進行的,以FairScheduler為例,在yarn-site.xml中:

<property>
                <name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
        </property>           

如果我們配置的排程器是FairScheduler,那麼,以上代碼實際調用的就是

FairScheduler.allocate()

。這一部分涉及到的東西很多,有必要用單獨的一篇文章來解釋,在這裡不做贅述。

總之,排程器通過資源配置設定,傳回一個Allocation對象,儲存了資源配置設定和資源搶占相關結果:

public class Allocation {

  final List<Container> containers;
  final Set<ContainerId> strictContainers;//application.getPreemptionContainers()的list
  final Set<ContainerId> fungibleContainers;
  final List<ResourceRequest> fungibleResources;
  final List<NMToken> nmTokens;
  private Resource resourceLimit;
  //省略
}           
  • containers 配置設定給對應

    ApplicationMaster

    的資源
  • strictContainers 嚴格搶占的container,這些container目前還沒有被搶占,隻是标記為被強占,這些container在真正被強占前通過response告知AM,友善AM儲存現場等操作,一旦超過指定時間,這些container就會被搶占和回收
  • fungibleContainers 靈活搶占的container,fungible在Yarn中被叫negotiable,即可協商的。這些container也被标記為被搶占,但是,還有回旋餘地,如果

    fungibleResources

    中描述的資源請求能夠被滿足,則

    fungibleContainers

    不會被搶占
  • fungibleResources 如果

    fungibleResources

    中描述的資源能夠被這個

    AppliationMaster

    傳回給

    ResourceManager

    ,那麼

    fungibleContainers

    就不會被搶占
  • nmTokens

    NodeManager

    的token資訊,AM拿到這些

    NodeManager

    的token,就擁有了同這些

    NodeManager

    進行溝通的

    Authentication

  • resourceLimit 叢集目前對這個

    ApplicationMaster

    來說可用的資源

FairScheduler

中并沒有用到

fungibleContainers

fungibleResources

,隻有

CapacityScheduler

會使用到。是在

CapacityScheduler

進行資源配置設定的時候,大家看

CapacityScheduler.allocate()

->

FiCaSchedulerApp.getAllocation()

這個代碼,可以發現,

fungibleResources

其實就是與

strictContainers

等量的資源,即,

CapacityScheduler

希望搶占

strictContainers

中的container,但是,如果ApplicationMaster能夠傳回給我等量的可用資源,那

CapacityScheduler

也能同意,即不限制搶占的具體的containerId,隻要滿足搶占的資源量就行。

好了,現在排程器已經通過Allocation對象傳回了資源資訊,下面就需要将這個對象封裝到response中,傳回給ApplicationMaster:

//設定已經為這個application配置設定的container資訊到response中
      allocateResponse.setAllocatedContainers(allocation.getContainers());
      //設定已經完成的container的狀态資訊到response中
      allocateResponse.setCompletedContainersStatuses(appAttempt
          .pullJustFinishedContainers());
      //responseID自增1,放到response中
      allocateResponse.setResponseId(lastResponse.getResponseId() + 1);
      //設定叢集中可用的資源資訊到response中
      allocateResponse.setAvailableResources(allocation.getResourceLimit());
      //設定叢集中可用節點的數目資訊到response中
      allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
      // add preemption to the allocateResponse message (if any)
      //設定搶占資訊到response中
      allocateResponse
          .setPreemptionMessage(generatePreemptionMessage(allocation));
      // update AMRMToken if the token is rolled-up
       //略
      }           

以上就是

ApplicationMaster

ResourceManager

之間基于

applicationmaster_protocol.proto

協定的核心接口

allocate()

的核心内容。

對這部分知識的了解,有助于我們更好的管理Yarn叢集,同時,以這個

allocate()

接口為中心涉及到的狀态機、事件分派器和服務化子產品化設計,都是hadoop的核心思想中的核心,一次了解難度較大,學習門檻很高,需要很長時間,由點到面,從最開始對hadoop的所有東西一無所知,然後慢慢到對大多數元件有所了解,再把對大多數元件的不完整的了解串聯起來,最終形成對yarn的所有元件的全面和深入的認識。

繼續閱讀