天天看點

Hdfs NameNode中資料塊管理與資料節點管理分析

資料塊管理

       在上一節介紹了BlockManager中的資料塊副本狀态,主要是儲存各個資料塊副本狀态的存儲對象。名位元組點第二關系的管理包括資料塊管理和資料節點管理,其對資料塊的管理是依托于BlockManager類來實作的。

1、添加資料塊副本

        BlockManager.addStoredBlock()用于在blocksMap中添加/更新資料節點node上的資料塊副本block。當DataNode上寫入了一個新的資料塊副本或者完成了資料塊副本的複制操作後,其會調用遠端方法DatanodeProtocol.blockReport()或者DatanodeProtocol.blockReceivedAndDeleted()向名位元組點彙報該新增的副本,最終會調用BlockManager.addStoredBlock()方法,将資料塊副本和資料節點資訊更新到BlocksMap對象中。其基本流程及源碼如下:

Hdfs NameNode中資料塊管理與資料節點管理分析
private Block addStoredBlock(final BlockInfo block,
                             DatanodeStorageInfo storageInfo,
                             DatanodeDescriptor delNodeHint,
                             boolean logEveryBlock)
throws IOException {
  // ......
  if (storedBlock == null || storedBlock.getBlockCollection() == null) {
    // 目前block不屬于任何inode
    return block;
  }
  BlockCollection bc = storedBlock.getBlockCollection();
  
  // add block to the datanode
  // 在block->datanode映射中添加目前datanode
  boolean added = storageInfo.addBlock(storedBlock);

  // ......
  // 判斷副本對應的block狀态并進行狀态轉換
  if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
      numLiveReplicas >= minReplication) {
    storedBlock = completeBlock(bc, storedBlock, false);
  } else if (storedBlock.isComplete() && added) {
    namesystem.incrementSafeBlockCount(numCurrentReplica);
  }
  
  // if file is under construction, then done for now
  if (bc.isUnderConstruction()) {
    return storedBlock;
  }

  // do not try to handle over/under-replicated blocks during first safe mode
  if (!namesystem.isPopulatingReplQueues()) {
    return storedBlock;
  }

  // handle underReplication/overReplication
  // 判斷資料塊副本數量是否滿足,不滿足則複制添加到BlockManager.neededReplications中
  // 否則從neededReplications中移除
  short fileReplication = bc.getBlockReplication();
  if (!isNeededReplication(storedBlock, fileReplication, numCurrentReplica)) {
    neededReplications.remove(storedBlock, numCurrentReplica,
        num.decommissionedReplicas(), fileReplication);
  } else {
    updateNeededReplications(storedBlock, curReplicaDelta, 0);
  }
  
  // 判斷副本數量是否超出設定,需要删除, 選擇删除副本添加到excessReplicateMap中
  if (numCurrentReplica > fileReplication) {
    processOverReplicatedBlock(storedBlock, fileReplication, node, delNodeHint);
  }
  
  // 
  if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication))
    invalidateCorruptReplicas(storedBlock);
  return storedBlock;
}
           

2、删除資料塊副本

    資料塊副本的删除包括如下3種情況:

  1. 資料塊副本所屬的檔案被删除,副本也就相應的删除掉。
  2. 資料塊副本數多于副本系數,多餘的副本會被删除。
  3. 資料塊副本數已經損壞,也需要删除損壞的副本。

    (1)删除檔案擁有的資料塊副本

        當删除目錄上的一個檔案時(實作在FSDirectory.delete()中),檔案擁有的資料塊通過FSNamesystem.removePathAndBlocks()最終調用BlockManager.removeBlocks()調用ddToInvalidates()方法将資料塊對應的所有副本添加到invalidateBlocks待删除對象中。然後再從BlockManager中的資料塊儲存對象中删除;包括從blocksMap中删除;删除corruptRelcas中可能存在的副本損壞記錄;由于資料塊被删除,即使有副本損壞,也都不需要再進行資料塊複制,所有管理資料塊複制對象中也需要删除:如pendingReplications、neededReplications等。

public void removeBlock(Block block) {
  // ......
  addToInvalidates(block);
  removeBlockFromMap(block);
  // Remove the block from pendingReplications and neededReplications
  pendingReplications.remove(block);
  neededReplications.remove(block, UnderReplicatedBlocks.LEVEL);
  if (postponedMisreplicatedBlocks.remove(block)) {
    postponedMisreplicatedBlocksCount.decrementAndGet();
  }
}

private void addToInvalidates(Block b) {
  if (!namesystem.isPopulatingReplQueues()) {
    return;
  }
  for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
    final DatanodeDescriptor node = storage.getDatanodeDescriptor();
    invalidateBlocks.add(b, node, false);
  }
}

public void removeBlockFromMap(Block block) {
  removeFromExcessReplicateMap(block);
  blocksMap.removeBlock(block);
  // If block is removed from blocksMap remove it from corruptReplicasMap
  corruptReplicas.removeFromCorruptReplicasMap(block);
}
           

    (2)多餘副本删除

        addStoredBlock()方法最後階段的進行中,移除多餘副本使用了processOverReplicatedBlock()方法。這個方法其實隻是為chooseExcessReplicates()準備資料,由于blocksMap儲存着資料塊的所有副本所在的資料節點,方法需要周遊這些資料節點,當節點同時滿足下面3個條件時,才把它标記為”等待删除資料塊“的候選資料節點,并儲存在變量nonExcess中。

  1. 該節點資訊不在excessReplicateMap中。
  2. 該節點不是一個處于”正在撤銷“或”已撤銷“的資料節點。
  3. 該節點儲存的副本已經損壞

       具體選擇執行删除操作的資料節點的原則為:盡量保證删除之後的副本能均勻分布在不同的機架上,盡量從空間較少的節點上删除備援副本。最終processOverReplicatedBlock()方法,代碼如下:

private void processOverReplicatedBlock(final Block block,
    final short replication, final DatanodeDescriptor addedNode,
    DatanodeDescriptor delNodeHint) {
    
  Collection<DatanodeStorageInfo> nonExcess = new ArrayList<DatanodeStorageInfo>();
  Collection<DatanodeDescriptor> corruptNodes = corruptReplicas.getNodes(block);
  
  for(DatanodeStorageInfo storage : blocksMap.getStorages(block, State.NORMAL)) {
    final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
    if (storage.areBlockContentsStale()) {
      postponeBlock(block);
      return;
    }
    LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(cur
        .getDatanodeUuid());
    if (excessBlocks == null || !excessBlocks.contains(block)) {
      if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
        // exclude corrupt replicas
        if (corruptNodes == null || !corruptNodes.contains(cur)) {
          nonExcess.add(storage);
        }
      }
    }
  }
  // 按照條件選出合适的DataNode,将需要删除的資料塊副本加入到invalidateBlocks隊列中
  chooseExcessReplicates(nonExcess, block, replication, 
      addedNode, delNodeHint);
}
           

    (3)損壞副本删除

       在用戶端讀檔案或者資料節點上的資料塊掃描器都可能回發現損壞的副本,它們會通過對應接口的reportBadBlocks()将損壞的資料塊副本資訊彙報給名位元組點,最終由BlockManager.markBlocksAsCorrupt()方法進行處理。該方法會将損壞的資料塊副本加入到corruptReplicas隊列中,然後判斷該資料塊副本是否有足夠的副本數量,否則加入到neededReplications隊列中,等待進行資料塊的複制操作。代碼如下:

private void markBlockAsCorrupt(BlockToMarkCorrupt b,
    DatanodeStorageInfo storageInfo,
    DatanodeDescriptor node) throws IOException {

  BlockCollection bc = b.corrupted.getBlockCollection();
  if (bc == null) { // 資料塊副本不屬于任何檔案 直接删除
    addToInvalidates(b.corrupted, node);
    return;
  } 

  // Add replica to the data-node if it is not already there
  if (storageInfo != null) {
    storageInfo.addBlock(b.stored);
  }

  // Add this replica to corruptReplicas Map // 添加到損壞副本對象中
  corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
      b.reasonCode);

  NumberReplicas numberOfReplicas = countNodes(b.stored);
  boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >= bc
      .getBlockReplication();
  boolean minReplicationSatisfied =
      numberOfReplicas.liveReplicas() >= minReplication;
  boolean hasMoreCorruptReplicas = minReplicationSatisfied &&
      (numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) >
      bc.getBlockReplication();
  boolean corruptedDuringWrite = minReplicationSatisfied &&
      (b.stored.getGenerationStamp() > b.corrupted.getGenerationStamp());
  // 如果有足夠的副本數量,則直接删除這個副本
  if (hasEnoughLiveReplicas || hasMoreCorruptReplicas
      || corruptedDuringWrite) {
    // the block is over-replicated so invalidate the replicas immediately
    invalidateBlock(b, node);
  } else if (namesystem.isPopulatingReplQueues()) {
    // 如果副本數不足,則複制這個資料塊
    // add the block to neededReplication
    updateNeededReplications(b.stored, -1, 0);
  }
}
           

最終,其副本删除的邏輯流程整理後如下:

Hdfs NameNode中資料塊管理與資料節點管理分析

       通過對副本删除邏輯的分析可以知道,BlockManager會将需要删除的資料塊副本添加到BlockManager.invalidateBlocks隊列中,等待生成對應的删除指令;在上文中對DatanodeDescriptor的分析中我們可以知道,NameNode生成對應DataNode的删除指令,隻是簡單的從DatanodeDescriptor.invalidateBlocks對象中取出該DataNode上需要删除的資料塊,封裝成對應的BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blockPoolId, blks)即可;

       接下來分析一下BlockManager是如何生成對應的删除指令,也即是如何将BlockManager.invalidateBlocks隊列中待删除的副本對象封裝添加到對應的DatanodeDescriptor.invalidateBlocks對象中,其具體分析過程如下:

       在BlockManager對象中,其擁有一個内部的線程類ReplicationMonitor; 在BlockManager初始化構造啟動中,該ReplicationMonitor線程也會啟動,其會周期性的調用computeDatanodeWork()方法觸發資料塊副本的複制和删除任務,然後調用processPendingReplications()方法将已産生複制請求但逾時處理的資料塊副本重新添加到neededReplications隊列中,等待重新生成複制指令。

private class ReplicationMonitor implements Runnable {
  @Override
  public void run() {
    while (namesystem.isRunning()) {
      try {
        // Process replication work only when active NN is out of safe mode.
        if (namesystem.isPopulatingReplQueues()) {
          computeDatanodeWork(); // 執行對應的複制操作和删除操作
          processPendingReplications();
        }
        Thread.sleep(replicationRecheckInterval);
      } catch (Throwable t) {
        // ......
      }
    }
  }
}
           

computeDatanodeWork()方法執行了以下兩個操作過程:

  1. 複制操作:從blockManager中的待複制資料塊清單neededReplications中選出若幹個資料塊執行複制操作,為這些資料塊的複制操作選出source源節點以及target目标節點,然後将其封裝成BlockTargetPair對象添加到DatanodeDescriptor.replicateBlocks中,等待下次該DataNode心跳的時候将構造複制指令帶到目标節點以執行副本的複制操作。
  2. 删除操作:從blockManager中的待删除資料塊清單invalidateBlocks中選出若幹個副本,然後構造删除指令,也即是将blockManager.invalidateBlocks中的待删除資料塊添加到對應的DatanodeDescriptor.invalidateBlocks中,等待下次該DataNode心跳的時候将構造删除指令帶到目标節點以執行副本的删除操作。

其基本的複制、删除操作邏輯流程如下:

Hdfs NameNode中資料塊管理與資料節點管理分析
Hdfs NameNode中資料塊管理與資料節點管理分析

接下來針對資料塊副本删除操作,詳細的分析下computeDatanodeWork()方法的工作流程:

int computeDatanodeWork() {

  final int numlive = heartbeatManager.getLiveDatanodeCount();
  final int blocksToProcess = numlive                 // 計算出需要複制操作的資料塊數量
      * this.blocksReplWorkMultiplier;
  final int nodesToProcess = (int) Math.ceil(numlive  // 計算出需要删除操作的資料塊數量
      * this.blocksInvalidateWorkPct);

  // 計算出需要進行複制的副本
  int workFound = this.computeReplicationWork(blocksToProcess);

  // Update counters
  // ......
  
  // 計算出需要進行删除的副本
  workFound += this.computeInvalidateWork(nodesToProcess);
  return workFound;
}
           
// 針對删除操作
int computeInvalidateWork(int nodesToProcess) {
  // 從invalidateBlocks選出所有存在無效資料塊的DataNode
  final List<DatanodeInfo> nodes = invalidateBlocks.getDatanodes();
  Collections.shuffle(nodes);

  nodesToProcess = Math.min(nodes.size(), nodesToProcess); // 一次删除的資料塊數量

  int blockCnt = 0;
  for (DatanodeInfo dnInfo : nodes) {
    // 對每個資料節點調用invalidateWorkForOneNode()将待删除的資料塊副本
    // 添加到對應的DatanodeDescriptor.invalidateBlocks對象中
    int blocks = invalidateWorkForOneNode(dnInfo);
    if (blocks > 0) {
      blockCnt += blocks;
      if (--nodesToProcess == 0) {
        break;
      }
    }
  }
  return blockCnt;
}
           
private int invalidateWorkForOneNode(DatanodeInfo dn) {
  final List<Block> toInvalidate;
 
  try {
    DatanodeDescriptor dnDescriptor = datanodeManager.getDatanode(dn);
    
    // 在此處将對應需要删除的資料塊添加到DatanodeDescriptor中
    toInvalidate = invalidateBlocks.invalidateWork(dnDescriptor);
    
  } catch(UnregisteredNodeException une) {
  // ......
}

//invalidateBlocks#invalidateWork()
synchronized List<Block> invalidateWork(final DatanodeDescriptor dn) {
  // ......
  // # blocks that can be sent in one message is limited
  final int limit = blockInvalidateLimit;
  final List<Block> toInvalidate = set.pollN(limit);

  // If we send everything in this message, remove this node entry
  if (set.isEmpty()) {
    remove(dn);
  }

  // 調用DatanodeDescriptor.addBlocksToBeInvalidated将待删除的資料塊清單添加到
  // DatanodeDescriptor.invalidateBlocks中
  dn.addBlocksToBeInvalidated(toInvalidate);
  numBlocks -= toInvalidate.size();
  return toInvalidate;
}
           

資料節點管理

       資料節點啟動時,會和名位元組點進行RPC通信,主要包括握手、注冊并進行資料塊上報,然後定期發送心跳資訊,維護和名位元組點的聯系。接下來主要來關注名位元組點是如何管理資料節點的,包括:添加和撤銷資料節點,資料節點啟動時名位元組點上執行的流程,心跳處理和名位元組點的指令如何産生并下發等流程。

1、添加和撤銷資料節點

       HDFS在需要增加叢集容器時,可以動态地往叢集添加新的資料節點。相反,如果希望縮小叢集的規模,那麼需要撤銷已存在的資料節點。如果一個資料節點頻繁地發生故障或者進行緩慢,也可以通過撤銷操作,将節點下架。上述操作讓HDFS有了一定的彈性,可根據應用規模進行擴充或收縮。HDFS提供了dfs.hosts檔案和dfs.hosts.exclude檔案來對能夠連接配接到名位元組點的資料節點和不能連接配接到名位元組點的資料節點進行明确的管理,以保證資料節點受叢集控制,也可以防止配置錯誤的資料節點接入名位元組點。

       指令refreshNodes其實是使用遠端接口ClientProtocol.refreshNode(),通過名位元組點更新Include和exclude檔案。該遠端方法最終會由DataNodeManager.refreshNode()方法響應,refreshNode()方法首先會調用refreshHostsReader()方法将include檔案與exclude檔案加載到hostFileManager中,而後調用refreshDatanodes()重新整理所有的資料節點。

其主要實作邏輯如下:

public void refreshNodes(final Configuration conf) throws IOException {
  refreshHostsReader(conf); // 加載include檔案與exclude檔案到hostFileManager中
  namesystem.writeLock();
  try {
    refreshDatanodes(); // 重新整理所有的資料節點
    countSoftwareVersions();
  } finally {
    namesystem.writeUnlock();
  }
}

private void refreshDatanodes() {
  for(DatanodeDescriptor node : datanodeMap.values()) { // 周遊所有的DatanodeDescriptor對象
    // Check if not include.
    if (!hostFileManager.isIncluded(node)) { // 不在include檔案中
      node.setDisallowed(true); // case 2. // 将DatanodeDescriptor.disallowed設定為撤銷狀态
    } else {
      if (hostFileManager.isExcluded(node)) { // 在exclude檔案中,開始撤銷操作
        startDecommission(node); // case 3.
      } else { // 不在exclude檔案中,開取消撤銷操作
        stopDecommission(node); // case 4.
      }
    }
  }
}
           

       撤銷節點通過exclude檔案,将要撤銷的節點增加到檔案中,然後還是執行“hadoop dfsadmin  - refreshNodes”指令,名位元組點就會開始撤銷資料節點。被撤銷節點上的資料塊會複制到叢集的其他資料節點,這個過程中,資料節點處于“正在撤銷”狀态,資料複制完成後才會轉移到“已撤銷”,這個時候就可以關閉相應的資料節點了。

2、資料節點的啟動

       添加資料節點并啟動節點時,執行的流程和正常的資料節點啟動是一樣的。資料節點啟動時,需要和名位元組點進行握手、注冊和資料塊上報。如果系統支援Append操作,還需要上報處于用戶端寫狀态的資料塊資訊。在之前的文章中已經分析了DataNode啟動時的操作(DataNode啟動流程分析),現在來看下NameNode側的響應調用,即遠端接口DatanodeProtocol的versionRequest()、registerDataNode()和blockReport()方法在名位元組點上的實作。

(1)握手:握手請求是由NameNodeRpcServer實作,并最終調用FSNamesystem.getNamespaceInfo()并傳回命名空間的資訊。

public NamespaceInfo versionRequest() throws IOException { // NameNodeRpcServer中
  checkNNStartup();
  namesystem.checkSuperuserPrivilege();
  return namesystem.getNamespaceInfo();
}

NamespaceInfo unprotectedGetNamespaceInfo() { // FSNamesystem中
  return new NamespaceInfo(getFSImage().getStorage().getNamespaceID(),
      getClusterId(), getBlockPoolId(),
      getFSImage().getStorage().getCTime());
}
           

(2)注冊:遠端方法registerDataNode()的主要處理邏輯在FSNamesystem.registerDatanode()中,NameNode會為注冊的DataNode配置設定唯一的storageId作為辨別(storageId在dataNodeMap中作為key,用于擷取DatanodeDescriptor對象)。需要注意到資料節點可以重複發送注冊資訊,是以需要對DataNode注冊時的不同情況進行不同的處理。

名位元組點需要根據不同情況,對注冊請求進行不同的處理如下:(詳細源代碼在DataNodeManager.registerDatanode()中):

  1. 該資料節點沒有注冊過。
  2. 資料節點注冊過,這次注冊時重複注冊。
  3. 資料節點注冊過,但這次注冊使用了新的資料節點存儲辨別(storageID),表明該資料節點存儲空間已經被清理了,原有的資料塊副本已經被删除。

       首先,registerDatanode()需要為注冊節點生成資料節點辨別,名位元組點不能完全信任資料節點發送過來的資訊,它需要根據實際情況更新注冊資訊中攜帶的資料節點辨別,然後,使用這個辨別進行後續的處理。在DataNodeManager中維護者兩個DatanodeDescriptor對象的映射關系:

  • 用nodeS表示從DataNodeManager.datanodeMap中通過storageId擷取的DatanodeDescriptor對象;
  • 用nodeN表示從DataNodeManager.host2DataNodeMap中通過hostname擷取的DatanodeDescriptor對象;
if (!hostFileManager.isIncluded(nodeReg)) {
  throw new DisallowedDatanodeException(nodeReg);
}
DatanodeDescriptor nodeS = getDatanode(nodeReg.getDatanodeUuid());
DatanodeDescriptor nodeN = host2DatanodeMap.getDatanodeByXferAddr(
    nodeReg.getIpAddr(), nodeReg.getXferPort());
           

1、資料節點未注冊:nodeS==null && nodeN==null;建立新的資料節點描述符、獲得節點的網絡拓撲位置、添加節點到datanodeMap、host2DataNodeMap和心跳資訊清單heartbeats中。

// 建立新的資料節點描述對象
DatanodeDescriptor nodeDescr 
  = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK);
boolean success = false;
try {
  // resolve network location
  // 更新網絡拓撲
  if(this.rejectUnresolvedTopologyDN) {
    nodeDescr.setNetworkLocation(resolveNetworkLocation(nodeDescr));
    nodeDescr.setDependentHostNames(getNetworkDependencies(nodeDescr));
  } else {
    nodeDescr.setNetworkLocation(
        resolveNetworkLocationWithFallBackToDefaultLocation(nodeDescr));
    nodeDescr.setDependentHostNames(
        getNetworkDependenciesWithDefault(nodeDescr));
  }
  networktopology.add(nodeDescr);
  nodeDescr.setSoftwareVersion(nodeReg.getSoftwareVersion());

  // register new datanode
  // 注冊節點  添加節點到datanodeMap、host2DataNodeMap
  addDatanode(nodeDescr);
  checkDecommissioning(nodeDescr);
  
  // 添加到心跳資訊清單heartbeats中
  heartbeatManager.addDatanode(nodeDescr);
  success = true;
  incrementVersionCount(nodeReg.getSoftwareVersion());
} finally {
  // ......
}
           

2、資料節點重複注冊,nodeS!=null;由于名位元組點已經擁有該節點的資訊,這時隻需用新的注冊資訊更新NameNode中儲存的原有的DataNode資訊即可,更新節點在網絡拓撲中的位置和(可能的)心跳資訊。

// 更新網絡拓撲、節點資訊
getNetworkTopology().remove(nodeS);
if(shouldCountVersion(nodeS)) {
  decrementVersionCount(nodeS.getSoftwareVersion());
}
nodeS.updateRegInfo(nodeReg);

nodeS.setSoftwareVersion(nodeReg.getSoftwareVersion());
nodeS.setDisallowed(false); // Node is in the include list

// resolve network location
nodeS.setNetworkLocation(resolveNetworkLocation(nodeS));
nodeS.setDependentHostNames(getNetworkDependencies(nodeS));
getNetworkTopology().add(nodeS);
  
// also treat the registration message as a heartbeat
heartbeatManager.register(nodeS);
           

3、使用新的資料節點存儲辨別(storageID)注冊,nodeN!=null && nodeN!=nodeS;即原先在DataNode上儲存的資料塊失效,需要先清理NameNode中這個DataNode的資訊。操作流程為:nodeN即是原有老資料節點辨別,利用這個辨別,通過DataNodeManager的removeDatanode()和wipeDatanode()方法,清理原有節點在名位元組點中儲存的資訊,并将nodeN設定為空,後續的處理,就和請求一資料節點未注冊情況是一緻的。

if (nodeN != null && nodeN != nodeS) {
  removeDatanode(nodeN); // 删除DatanodeDescriptor對象
  wipeDatanode(nodeN); // 從datanodeMap、host2DataNodeMap中删除
  nodeN = null;
}
           

       其中removeDatanode()删除了NameNode記憶體中所有該DataNode對應的DatanodeDescriptor對象,同時從BlockManager.blocksMap中删除該DataNode存儲的資料塊副本;wipeDatanode()方法将DataNodeManager内部的datanodeMap、host2DataNodeMap中包含的DatanodeDescriptor對象删除。

private void removeDatanode(DatanodeDescriptor nodeInfo) {
  assert namesystem.hasWriteLock();
  heartbeatManager.removeDatanode(nodeInfo);
  blockManager.removeBlocksAssociatedTo(nodeInfo);
  networktopology.remove(nodeInfo);
  decrementVersionCount(nodeInfo.getSoftwareVersion());
  namesystem.checkSafeMode();
}

private void wipeDatanode(final DatanodeID node) {
  final String key = node.getDatanodeUuid();
  synchronized (datanodeMap) {
    host2DatanodeMap.remove(datanodeMap.remove(key));
  }
  blockManager.removeFromInvalidates(new DatanodeInfo(node));
}
           

(3)資料塊上報:成功注冊的資料節點,接下來會進行資料塊上報,向名位元組點提供它的資料塊資訊,該請求的主要處理實作是BlockManager.processReport()中。在進行中,如果是第一次塊彙報,則會調用processFirstBlockReport(),否則調用processReport()方法進行處理;其基本差別如下(具體分析可參照源碼):

  1. processFirstBlockReport:将塊彙報中所有有效的副本快速加入到NameNode中,其并不會考慮和操作存儲資料塊副本的隊列(如corruptReplicas)
  2. processReport:将塊彙報中的資料塊副本和NameNode中儲存的副本狀态做對比,并将其添加到不同的副本狀态管理對象中。如方法變量toInvalidate中就儲存了要删除的資料塊副本,這些副本最終通過addToInvalidates()方法,添加到BlockManager的成員變量invalidateBlocks中。
public boolean processReport(final DatanodeID nodeID,
    final DatanodeStorage storage,
    final BlockListAsLongs newReport, BlockReportContext context,
    boolean lastStorageInRpc) throws IOException {
  try {
    // ......   
    if (storageInfo.getBlockReportCount() == 0) {
      // 第一次資料塊彙報
      processFirstBlockReport(storageInfo, newReport);
    } else {
      // 不是第一次資料塊彙報 調用私有的processReport()方法
      invalidatedBlocks = processReport(storageInfo, newReport);
    }
    
    boolean staleBefore = storageInfo.areBlockContentsStale();
    storageInfo.receivedBlockReport();
  }
  // ......
}
           

3、心跳

        在BPServiceActor.offerService()中,資料節點利用循環向節點發送心跳資訊,維護它們間的關系,上報負載資訊并擷取名位元組點指令。名位元組點和資料節點心跳相關的代碼可以分為兩部分:心跳資訊處理和心跳檢查。

(1)心跳資訊處理:NameNodeRpcServer.sendHeartbeat()被DataNodeManager.handleHeartbeat()響應調用,心跳資訊處理如下:

  1. 首先是對發送請求的資料節點進行檢查,判斷該節點是否能連接配接到名位元組點。同時也判斷資料節點是否已經注冊過,未注冊的資料節點會收到DatanodeCommand.REGISTER指令,這時,節點要重新注冊并上報資料塊資訊。
  2. 名位元組點利用心跳資訊中的負載資訊,調用heartbeatManager.updateHeartbeat()方法更新整個HDFS系統的負載資訊。DatanodeDescriptor.updateHeartbear()不但更新節點負載,同時也更新了節點的心跳時間。
  3. 名位元組點會為這個資料節點産生名位元組指令,并通過遠端調用的傳回值傳回。其中DatanodeCommand.REGISTER,就是一種名位元組點指令。

       handleHeartbet()方法一般通過DatanodeDescriptor的對應方法産生名位元組點指令,下面代碼中的删除資料塊副本指令,就是通過getInvalidateBlocks()方法獲得,其從DatanodeDescriptor中成員變量invalidateBlocks中擷取要删除的副本清單,并根據清單建立删除指令。代碼如下:

/** Handle heartbeat from datanodes. */
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] reports, final String blockPoolId,
    long cacheCapacity, long cacheUsed, int xceiverCount, 
    int maxTransfers, int failedVolumes
    ) throws IOException {
  synchronized (heartbeatManager) {
    synchronized (datanodeMap) {
      DatanodeDescriptor nodeinfo = null;
      // 未注冊,則先發送注冊指令進行注冊
      try {
        nodeinfo = getDatanode(nodeReg);
      } catch(UnregisteredNodeException e) {
        return new DatanodeCommand[]{RegisterCommand.REGISTER};
      }
      
      // Check if this datanode should actually be shutdown instead. 
      if (nodeinfo != null && nodeinfo.isDisallowed()) {
        setDatanodeDead(nodeinfo);
        throw new DisallowedDatanodeException(nodeinfo);
      }

      if (nodeinfo == null || !nodeinfo.isAlive) {
        return new DatanodeCommand[]{RegisterCommand.REGISTER};
      }

      // 負載資訊及心跳時間的更新
      heartbeatManager.updateHeartbeat(nodeinfo, reports,
                                       cacheCapacity, cacheUsed,
                                       xceiverCount, failedVolumes);

      // If we are in safemode, do not send back any recovery / replication
      // requests. Don't even drain the existing queue of work.
      if(namesystem.isInSafeMode()) {
        return new DatanodeCommand[0];
      }
      
      // ......
      // 名位元組點指令的生成,此處隻展示删除資料塊指令
      //check block invalidation
      Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
      if (blks != null) {
        cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
            blockPoolId, blks));
      }
     
      if (!cmds.isEmpty()) {
        return cmds.toArray(new DatanodeCommand[cmds.size()]);
      }
    }
  }
  return new DatanodeCommand[0];
}
           

(2)心跳檢查:心跳資訊的處理由遠端方法sendHeartbeat()實作。DatanodeManager中和心跳相關的另一部分代碼是心跳檢查,由HeartbeatManager類實作,其擁有自己的線程Monitor,并定期通過調用heartbeatCheck()執行檢查邏輯。心跳檢查的間隔儲存在成員變量heartbeatRechekInterval中,預設值是5分鐘,可以通過配置項${dfs.namenode.heartbeat.recheck-interval}配置。代碼如下:

private class Monitor implements Runnable {
  private long lastHeartbeatCheck;
  private long lastBlockKeyUpdate;

  @Override
  public void run() {
    while(namesystem.isRunning()) {
      try {
        final long now = Time.now();
        if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
          heartbeatCheck(); // 檢查心跳更新情況(會有對應的故障處理)
          lastHeartbeatCheck = now;
        }
        // ......
      } catch (Exception e) {
        // ......
      }
      try {
        Thread.sleep(5000);  // 5 seconds
      } catch (InterruptedException ie) {
      }
    }
  }
}
           

繼續閱讀