
Forcing a DataNode to Report Its Blocks to the NameNode

Normally, the timing of a DataNode's block report is driven by the NameNode, which triggers it through its heartbeat responses.

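During a data center migration, the old data center ran Hadoop 2.7.2 and the new one ran 2.8.0, and the migration was done by first expanding the cluster and then shrinking it. The machines in the two data centers were different models with different numbers of disks, so they could not easily share one hdfs-site.xml, and at some point during the operation the two versions of hdfs-site.xml got mixed up. As a result, the NameNode reported a large number of "missing block" entries.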

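However, following the information reported by the NameNode, the blocks marked as "missing" could still be found on the DataNodes. After the configuration problem was fixed, the "missing block" entries did not disappear. Judging from the DataNode source code, the likely cause was that the DataNodes had not reported their blocks to the NameNode again.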

Reading the DataNode source code also turned up a tool that ships with HDFS, triggerBlockReport, which forces a specified DataNode to report its blocks to the NameNode. Usage:

hdfs dfsadmin -triggerBlockReport datanode_host:ipc_port

For example: hdfs dfsadmin -triggerBlockReport 192.168.31.35:50020
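When many DataNodes need a forced report, the command above can be scripted. The following is a minimal Java sketch, assuming the hdfs binary is on the PATH. The class name TriggerBlockReports and the helpers buildCommand and trigger are hypothetical names chosen for illustration; they are not part of Hadoop.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class TriggerBlockReports {
    // Builds the argument list for one DataNode, given its "host:ipc_port" address.
    static List<String> buildCommand(String datanodeIpcAddress) {
        return Arrays.asList("hdfs", "dfsadmin", "-triggerBlockReport", datanodeIpcAddress);
    }

    // Runs the command for one DataNode and returns the exit code of the hdfs process.
    // Assumption: "hdfs" resolves on the PATH of the user running this program.
    static int trigger(String datanodeIpcAddress) throws IOException, InterruptedException {
        Process process = new ProcessBuilder(buildCommand(datanodeIpcAddress))
                .inheritIO() // show the output of hdfs on this console
                .start();
        return process.waitFor();
    }

    public static void main(String[] args) throws Exception {
        // Each argument is a DataNode IPC address, e.g. 192.168.31.35:50020
        for (String datanode : args) {
            int exitCode = trigger(datanode);
            System.out.println(datanode + " -> exit code " + exitCode);
        }
    }
}
```

Running it with a list of DataNode IPC addresses as arguments issues one triggerBlockReport request per address, in order.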

Normally, when the NameNode starts, it asks each DataNode to report its blocks once (controlled by the fullBlockReportLeaseId value). The relevant source code follows:

DataNode code (BPServiceActor.java):

private void offerService() throws Exception {
    HeartbeatResponse resp = sendHeartBeat(requestBlockReportLease); // send a heartbeat to the NameNode
    long fullBlockReportLeaseId = resp.getFullBlockReportLeaseId(); // lease ID carried in the heartbeat response
    boolean forceFullBr = scheduler.forceFullBlockReport.getAndSet(false); // a triggerBlockReport request is consumed here, so it forces only one report
    if (forceFullBr) {
        LOG.info("Forcing a full block report to " + nnAddr);
    }
    if ((fullBlockReportLeaseId != 0) || forceFullBr) {
        cmds = blockReport(fullBlockReportLeaseId);
        fullBlockReportLeaseId = 0;
    }
}
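The decision logic in the excerpt above can be modeled in isolation. The following is a simplified sketch, not the Hadoop implementation: the class BlockReportDecision and its methods are hypothetical, and only the two fields named in the excerpt (forceFullBlockReport and fullBlockReportLeaseId) are mirrored. It shows that a full report is sent either when the NameNode has handed out a non-zero lease ID or when the force flag is set, and that the flag is cleared as soon as it is read.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class BlockReportDecision {
    // Set by an admin request; read and cleared by the next loop iteration.
    private final AtomicBoolean forceFullBlockReport = new AtomicBoolean(false);
    // Non-zero only after a heartbeat response carried a lease ID.
    private long fullBlockReportLeaseId = 0;

    // Models the effect of "hdfs dfsadmin -triggerBlockReport" on the DataNode.
    void triggerBlockReport() {
        forceFullBlockReport.set(true);
    }

    // Models receiving a heartbeat response from the NameNode.
    void onHeartbeatResponse(long leaseIdFromNameNode) {
        if (leaseIdFromNameNode != 0) {
            fullBlockReportLeaseId = leaseIdFromNameNode;
        }
    }

    // Returns true if this iteration would send a full block report.
    boolean runOneIteration() {
        boolean forceFullBr = forceFullBlockReport.getAndSet(false);
        if (fullBlockReportLeaseId != 0 || forceFullBr) {
            fullBlockReportLeaseId = 0; // the lease is used up by this report
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        BlockReportDecision dn = new BlockReportDecision();
        System.out.println(dn.runOneIteration()); // false: no lease, no force flag
        dn.triggerBlockReport();
        System.out.println(dn.runOneIteration()); // true: forced
        System.out.println(dn.runOneIteration()); // false: the flag was consumed
        dn.onHeartbeatResponse(42L);
        System.out.println(dn.runOneIteration()); // true: lease granted
    }
}
```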

NameNode code (FSNamesystem.java):

/**
 * The given node has reported in.  This method should:
 * 1) Record the heartbeat, so the datanode isn't timed out
 * 2) Adjust usage stats for future block allocation
 * If a substantial amount of time passed since the last datanode
 * heartbeat then request an immediate block report.
 * @return an array of datanode commands
 * @throws IOException
 */
HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] reports, long cacheCapacity, long cacheUsed,
    int xceiverCount, int xmitsInProgress, int failedVolumes,
    VolumeFailureSummary volumeFailureSummary,
    boolean requestFullBlockReportLease) throws IOException {
    readLock();
    try {
        // get datanode commands
        final int maxTransfer = blockManager.getMaxReplicationStreams() - xmitsInProgress;
        DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
            nodeReg, reports, blockPoolId, cacheCapacity, cacheUsed,
            xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary);
        long fullBlockReportLeaseId = 0;
        if (requestFullBlockReportLease) {
            fullBlockReportLeaseId = blockManager.requestBlockReportLeaseId(nodeReg);
        }
        // create ha status
        final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
            haContext.getState().getServiceState(),
            getFSImage().getCorrectLastAppliedOrWrittenTxId());
        return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo, fullBlockReportLeaseId);
    } finally {
        readUnlock("handleHeartbeat");
    }
}
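The NameNode side of this exchange can be sketched as a toy model. This is an illustration under stated assumptions, not Hadoop code: the class HeartbeatLeaseModel and its counter-based ID generator are hypothetical and stand in for blockManager.requestBlockReportLeaseId. The point it shows is the one visible in handleHeartbeat: the response carries a non-zero fullBlockReportLeaseId only when the DataNode asked for a lease in its heartbeat.

```java
public class HeartbeatLeaseModel {
    // Hypothetical stand-in for the lease ID source on the NameNode.
    private long nextLeaseId = 1;

    // Returns the lease ID that would be placed in the heartbeat response.
    long handleHeartbeat(boolean requestFullBlockReportLease) {
        long fullBlockReportLeaseId = 0; // 0 means "no full block report requested"
        if (requestFullBlockReportLease) {
            fullBlockReportLeaseId = nextLeaseId++;
        }
        return fullBlockReportLeaseId;
    }

    public static void main(String[] args) {
        HeartbeatLeaseModel nn = new HeartbeatLeaseModel();
        System.out.println(nn.handleHeartbeat(false)); // 0: ordinary heartbeat
        System.out.println(nn.handleHeartbeat(true));  // 1: lease granted
        System.out.println(nn.handleHeartbeat(false)); // 0: ordinary heartbeat again
    }
}
```

A DataNode that never sets requestFullBlockReportLease therefore never receives a lease, which is why an explicit triggerBlockReport is the way to force a report outside the normal schedule.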
