This post analyzes Lease Recovery and Block Recovery.
Once HDFS supports hflush, it must guarantee that hflushed data can be read: a datanode that restarts cannot simply drop the last block of a file; it has to keep the hflushed data. Likewise, to support append, an already finalized block must be reopenable so that more data can be added to it. Both features make crash recovery much harder; before hflush/append, HDFS only had to delete the replicas of the last block of any file that had not been closed.
In the HDFS design, the lease exists to ensure that a file is written by only one client at a time. Before writing to or appending to a file, a client must obtain the file's lease from the namenode, and while the client is writing, a background thread keeps renewing the lease, extending the client's exclusive write access. A lease actually has two limits: a soft limit, 60 s by default, and a hard limit, 1 hour by default. They differ as follows:
- Before the soft limit expires, the client has exclusive write access to the file; no other client can take that right away.
- After the soft limit expires, any client may reclaim the lease, acquire it for itself, and thereby gain exclusive write access to the file.
- After the hard limit expires, the namenode forcibly closes the file and revokes the lease.
Consider a client that crashes while writing a file. Before the lease soft limit expires, no other client can write that file. After the soft limit expires, another client may write it; before doing so, it first checks whether the file was left un-closed. If so, the file enters the lease recovery and block recovery phases, whose goal is to bring all replicas of the file's last block to a consistent state: the replicas of a block are written through a pipeline, so it is entirely normal for them to diverge when the writer dies.
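A client does not have to wait passively for these limits: it can explicitly ask the namenode to recover a file's lease through DistributedFileSystem#recoverLease, which goes through the force branch of recoverLeaseInternal shown below. A minimal sketch, assuming the default filesystem is HDFS and that the path passed in points at the file left open by the crashed writer (the class name and argument handling here are illustrative only):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class RecoverLeaseExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path(args[0]);          // file left open by the crashed writer
    DistributedFileSystem dfs =
        (DistributedFileSystem) FileSystem.get(conf);
    // Ask the namenode to recover the lease; returns true once the file is closed.
    boolean closed = dfs.recoverLease(path);
    while (!closed) {
      Thread.sleep(1000);                   // recovery is asynchronous, so poll
      closed = dfs.recoverLease(path);
    }
  }
}
```

Calling recoverLease again before the previous attempt finishes starts a new recovery round with a higher recovery id (which, as described later, preempts the older one), so real clients usually wait a while between retries.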
This post considers the scenario where the writing client crashes and another client later appends to the same file.
A client appends to a file with code like the following:
```java
FileSystem fs = FileSystem.get(configuration);
FSDataOutputStream out = fs.append(path);
out.write(data);   // data is the byte[] to append
out.close();
```
On the namenode side, the main logic of append is handled in FSNamesystem's appendFileInternal, which internally calls
```java
// Opening an existing file for write - may need to recover lease.
recoverLeaseInternal(myFile, src, holder, clientMachine, false);
```
to check whether lease recovery has to be performed on the file first. Let's look closely at this function.
```java
private void recoverLeaseInternal(INodeFile fileInode,
    String src, String holder, String clientMachine, boolean force)
    throws IOException {
  // holder is the client name of the caller appending to this file
  assert hasWriteLock();
  if (fileInode != null && fileInode.isUnderConstruction()) {
    //
    // If the file is under construction , then it must be in our
    // leases. Find the appropriate lease record.
    //
    Lease lease = leaseManager.getLease(holder);
    //
    // We found the lease for this file. And surprisingly the original
    // holder is trying to recreate this file. This should never occur.
    //
    if (!force && lease != null) {
      Lease leaseFile = leaseManager.getLeaseByPath(src);
      if ((leaseFile != null && leaseFile.equals(lease)) ||
          lease.getHolder().equals(holder)) {
        throw new AlreadyBeingCreatedException(
            "failed to create file " + src + " for " + holder +
            " for client " + clientMachine +
            " because current leaseholder is trying to recreate file.");
      }
    }
    //
    // Find the original holder.
    //
    FileUnderConstructionFeature uc = fileInode.getFileUnderConstructionFeature();
    String clientName = uc.getClientName();
    lease = leaseManager.getLease(clientName);
    if (lease == null) {
      throw new AlreadyBeingCreatedException(
          "failed to create file " + src + " for " + holder +
          " for client " + clientMachine +
          " because pendingCreates is non-null but no leases found.");
    }
    if (force) {
      // close now: no need to wait for soft lease expiration and
      // close only the file src
      LOG.info("recoverLease: " + lease + ", src=" + src +
          " from client " + clientName);
      internalReleaseLease(lease, src, holder);
    } else {
      assert lease.getHolder().equals(clientName) :
          "Current lease holder " + lease.getHolder() +
          " does not match file creator " + clientName;
      //
      // If the original holder has not renewed in the last SOFTLIMIT
      // period, then start lease recovery.
      //
      if (lease.expiredSoftLimit()) {
        LOG.info("startFile: recover " + lease + ", src=" + src + " client "
            + clientName);
        boolean isClosed = internalReleaseLease(lease, src, null);
        if(!isClosed)
          throw new RecoveryInProgressException(
              "Failed to close file " + src +
              ". Lease recovery is in progress. Try again later.");
      } else {
        final BlockInfo lastBlock = fileInode.getLastBlock();
        if (lastBlock != null
            && lastBlock.getBlockUCState() == BlockUCState.UNDER_RECOVERY) {
          throw new RecoveryInProgressException("Recovery in progress, file ["
              + src + "], " + "lease owner [" + lease.getHolder() + "]");
        } else {
          throw new AlreadyBeingCreatedException("Failed to create file ["
              + src + "] for [" + holder + "] for client [" + clientMachine
              + "], because this file is already being created by ["
              + clientName + "] on ["
              + uc.getClientMachine() + "]");
        }
      }
    }
  }
}
```
- Check the file's state through its INode: if it is under construction, the file was never closed, so it very likely has to go through lease recovery and block recovery to make the replicas of its last block consistent.
- Look up in the lease manager the lease held under this client name (holder is the client issuing this append). If it is not null, the client still holds a lease; then check whether that lease covers the file being appended. If it does, the current client still holds the lease on this file and the append fails, because append requires the file to be in the closed state. If the lease does not cover this file, the client does not currently hold this file's lease, and we continue.
- Find from the INode the previous lease holder of the file, i.e. the crashed client in our scenario, look up that client's lease in the lease manager, and check whether the lease has passed its soft limit. If it has, call
```java
boolean isClosed = internalReleaseLease(lease, src, null);
```
This function decides whether the block recovery phase, which requires the datanodes' participation, really has to be entered. Its main logic is as follows:
3.1. If every block of the file is in the COMPLETE state, no block recovery is needed: the file is closed, its lease is removed from the lease manager, the INode is marked complete, and a close-file record is written to the edit log.
3.2. If the last block is in the COMMITTED state, look at the file's last two blocks: if both the second-to-last and the last block satisfy the minimal replication requirement (1 by default), close the file; otherwise an exception is thrown back to the client.
3.3. If the last block is in the UNDER_CONSTRUCTION or UNDER_RECOVERY state and no datanode has ever reported a replica of it, the client very likely crashed before the pipeline was even set up; in that case the last block is simply removed from the INode and the file is closed.
3.4. Otherwise, enter the block recovery phase (a rough sketch of this decision flow is given after the heartbeat-handling code below):
- Allocate a block recovery id to identify this recovery attempt; the recovery id is in fact a newly allocated generation stamp.
- Set the block's state to under recovery, pick the datanode of one of the block's replicas as the primary datanode, and put the block into that datanode's recoverBlocks list. Later, when the namenode processes that datanode's periodic heartbeat, it sends all of the datanode's recoverBlocks back in the heartbeat reply, in the form of a BlockRecoveryCommand. The code:
```java
// DatanodeManager::handleHeartbeat
//check lease recovery
BlockInfoUnderConstruction[] blocks = nodeinfo
    .getLeaseRecoveryCommand(Integer.MAX_VALUE);
if (blocks != null) {
  BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
      blocks.length);
  for (BlockInfoUnderConstruction b : blocks) {
    final DatanodeStorageInfo[] storages = b.getExpectedStorageLocations();
    // Skip stale nodes during recovery - not heart beated for some time (30s by default).
    final List<DatanodeStorageInfo> recoveryLocations =
        new ArrayList<DatanodeStorageInfo>(storages.length);
    for (int i = 0; i < storages.length; i++) {
      if (!storages[i].getDatanodeDescriptor().isStale(staleInterval)) {
        recoveryLocations.add(storages[i]);
      }
    }
    // If we only get 1 replica after eliminating stale nodes, then choose all
    // replicas for recovery and let the primary data node handle failures.
    if (recoveryLocations.size() > 1) {
      if (recoveryLocations.size() != storages.length) {
        LOG.info("Skipped stale nodes for recovery : " +
            (storages.length - recoveryLocations.size()));
      }
      brCommand.add(new RecoveringBlock(
          new ExtendedBlock(blockPoolId, b),
          DatanodeStorageInfo.toDatanodeInfos(recoveryLocations),
          b.getBlockRecoveryId()));
    } else {
      // If too many replicas are stale, then choose all replicas to participate
      // in block recovery.
      brCommand.add(new RecoveringBlock(
          new ExtendedBlock(blockPoolId, b),
          DatanodeStorageInfo.toDatanodeInfos(storages),
          b.getBlockRecoveryId()));
    }
  }
  return new DatanodeCommand[] { brCommand };
}
```
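Before moving to the DataNode side, here is the sketch referenced in 3.4: a rough, self-contained model of the branching in 3.1-3.4 above. It is only an illustration; the enum, class, and parameter names are stand-ins, not the actual FSNamesystem code.

```java
// Illustrative stand-ins; not the actual FSNamesystem types.
enum LastBlockState { COMPLETE, COMMITTED, UNDER_CONSTRUCTION, UNDER_RECOVERY }

class InternalReleaseLeaseSketch {
  /**
   * Returns true if the file can be closed right away,
   * false if block recovery has been started and the caller must retry later.
   */
  static boolean releaseLease(LastBlockState lastBlock,
                              boolean allBlocksComplete,
                              boolean lastTwoBlocksMeetMinReplication,
                              int reportedReplicasOfLastBlock) {
    // 3.1: every block is COMPLETE -> remove the lease, mark the INode
    // complete, log a close-file edit, done.
    if (allBlocksComplete) {
      return true;
    }
    switch (lastBlock) {
      case COMMITTED:
        // 3.2: close only if the last two blocks meet the minimal replication
        // requirement; otherwise the caller gets an exception.
        if (lastTwoBlocksMeetMinReplication) {
          return true;
        }
        throw new IllegalStateException("cannot close file yet, try again later");
      case UNDER_CONSTRUCTION:
      case UNDER_RECOVERY:
        // 3.3: no datanode ever reported the last block (pipeline never
        // formed), so drop the empty last block and close the file.
        if (reportedReplicasOfLastBlock == 0) {
          return true;
        }
        // 3.4: start block recovery: allocate a new generation stamp as the
        // recovery id, mark the block under recovery, pick a primary datanode,
        // and let its next heartbeat reply carry a BlockRecoveryCommand.
        return false;
      default:
        // A COMPLETE last block with incomplete earlier blocks is not expected here.
        return true;
    }
  }
}
```

Only the last case leads to the datanode-side work described next.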
Now let's look at the DataNode side.
On the DataNode, BPServiceActor handles the heartbeat reply: in offerService() it takes every DataNodeCommand out of the reply and processes it. processCommandFromActive checks the command type; DNA_RECOVERBLOCK indicates a block recovery command, which is handled by calling DataNode's recoverBlocks:
```java
case DatanodeProtocol.DNA_RECOVERBLOCK:
  String who = "NameNode at " + actor.getNNSocketAddress();
  dn.recoverBlocks(who, ((BlockRecoveryCommand)cmd).getRecoveringBlocks());
  break;
```
dn.recoverBlocks starts a dedicated background thread for this work. For each block that needs recovery:
- Take from the block the datanodes holding its replicas and open connections to the datanodes holding the other two replicas (the inter-datanode interface is defined in InterDatanodeProtocol); call initReplicaRecovery(rBlock) on each DataNode, including itself. The DataNode eventually calls FsDatasetImpl's initReplicaRecovery to initialize the replica to be recovered on that datanode. Let's look at this function:
```java
static ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map,
    Block block, long recoveryId, long xceiverStopTimeout) throws IOException {
  final ReplicaInfo replica = map.get(bpid, block.getBlockId());
  LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId
      + ", replica=" + replica);

  //check replica
  if (replica == null) {
    return null;
  }

  //stop writer if there is any
  if (replica instanceof ReplicaInPipeline) {
    final ReplicaInPipeline rip = (ReplicaInPipeline)replica;
    rip.stopWriter(xceiverStopTimeout);

    //check replica bytes on disk.
    if (rip.getBytesOnDisk() < rip.getVisibleLength()) {
      throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
          + " getBytesOnDisk() < getVisibleLength(), rip=" + rip);
    }

    //check the replica's files
    checkReplicaFiles(rip);
  }

  //check generation stamp
  if (replica.getGenerationStamp() < block.getGenerationStamp()) {
    throw new IOException(
        "replica.getGenerationStamp() < block.getGenerationStamp(), block="
        + block + ", replica=" + replica);
  }

  //check recovery id
  if (replica.getGenerationStamp() >= recoveryId) {
    throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
        + " replica.getGenerationStamp() >= recoveryId = " + recoveryId
        + ", block=" + block + ", replica=" + replica);
  }

  //check RUR
  final ReplicaUnderRecovery rur;
  if (replica.getState() == ReplicaState.RUR) {
    rur = (ReplicaUnderRecovery)replica;
    if (rur.getRecoveryID() >= recoveryId) {
      throw new RecoveryInProgressException(
          "rur.getRecoveryID() >= recoveryId = " + recoveryId
          + ", block=" + block + ", rur=" + rur);
    }
    final long oldRecoveryID = rur.getRecoveryID();
    rur.setRecoveryID(recoveryId);
    LOG.info("initReplicaRecovery: update recovery id for " + block
        + " from " + oldRecoveryID + " to " + recoveryId);
  } else {
    rur = new ReplicaUnderRecovery(replica, recoveryId);
    map.add(bpid, rur);
    LOG.info("initReplicaRecovery: changing replica state for "
        + block + " from " + replica.getState()
        + " to " + rur.getState());
  }
  return rur.createInfo();
}
```
First, the replica's state is checked. If the replica is currently being written, its stopWriter is called to stop the writer thread, which simply interrupts it (when a replica is created for a write pipeline, the datanode stores the writer thread's handle in the replica); this shows how high the priority of block recovery is. Several checks follow, e.g. whether the replica's block file and meta file still exist on disk. The generation stamps are then validated: the generation stamp recorded by the namenode must not be larger than the replica's actual one, and the recovery id must be larger than the replica's generation stamp. Finally a ReplicaUnderRecovery is created and put into the replica map. One more check happens here: if the replica is already under recovery, the current recovery id is compared with the one already recorded, and if the current one is larger it preempts the ongoing recovery.
接着,将三個副本的資訊(包括recovery前的副本的資訊)都加入一個清單,然後開始sync,sync就是根據三個副本的原來的狀态,做一些選擇,規則如下,這是兩個副本的情況:

References
hadoop-hdfs-2.4.1.jar
Append/Hflush/Read Design