本文主要介紹5個典型的HDFS流程,這些流程充分展現了HDFS實體間IPC接口和stream接口之間的配合。
1. Client和NN
Client到NN有大量的中繼資料操作,比如修改檔案名,在給定目錄下建立一個子目錄,這些操作一般隻涉及Client和NN的互動,通過IPC調用ClientProtocol進行。建立子目錄的邏輯流程如下圖:

從圖中可見,建立子目錄這種操作并沒有涉及DN。因為中繼資料會被NN持久化到edits中,是以在持久化結束之後,這個調用就會被成功傳回。複習一下:NN維護了HDFS的檔案系統目錄樹和檔案與資料塊的對應關系,和資料塊與DN的對應關系。是以,建立目錄僅僅是在NN上也就很容易了解了。
一些更為複雜的操作,如使用
DistributedFileSystem.setReplication()
來增加檔案的副本數,再如通過
DistributedFileSystem.delete()
來删除HDFS上的檔案,都需要DN配合執行一些動作。其中DistributedFileSystem源碼在hadoop-hdfs-project\hadoop-hdfs\src\main\java\org\apache\hadoop\hdfs\DistributedFileSystem.java
以用戶端删除HDFS檔案為例,操作在NN上執行完成後,DN存放的檔案内容的資料塊也必須删除。但是,NN在執行delete()方法時,它隻标記需要删除的資料塊(當然,delete的記錄檔也會被持久化),而不會主動聯系DN去立即删除這些資料。當儲存着這些資料塊的DN在向NN發送心跳時,NN會通過心跳應答攜帶DatanodeCommand指令來通知DN删除資料。也就是說,被删除的資料塊在Client接到成功的響應後,會在一段時間後才能真正删除,NN和DN永遠隻維護簡單的主從關系。NN永遠不會主動發起向DN的調用。NN隻能通過DN心跳應答中攜帶DatanodeCommand的指令對DN進行管理。
2. Client讀檔案
使用Java API讀取檔案的源碼如下:
FileSystem hdfs = FileSystem.get(new Configuration());
Path path = new Path("/testfile");
// reading
FSDataInputStream dis = hdfs.open(path);
byte[] writeBuf = new byte[1024];
int len = dis.read(writeBuf);
System.out.println(new String(writeBuf, 0, len, "UTF-8"));
dis.close();
hdfs.close();
下圖顯示了HDFS在讀取檔案時,Client,NN和DN發生的事件和這些事件的順序:
步驟1的源碼:
public FSDataInputStream open(Path f, final int bufferSize) throws IOException {
statistics.incrementReadOps(1);
Path absF = fixRelativePart(f);
return new FileSystemLinkResolver<FSDataInputStream>() {
@Override
public FSDataInputStream doCall(final Path p) throws IOException, UnresolvedLinkException {
return new HdfsDataInputStream(
dfs.open(getPathName(p), bufferSize, verifyChecksum)
);
}
@Override
public FSDataInputStream next(final FileSystem fs, final Path p) throws IOException {
return fs.open(p, bufferSize);
}
}.resolve(this, absF);
}
可見open傳回的是HdfsDataInputStream。dfs為
hadoop-hdfs-project\hadoop-hdfs\src\main\java\org\apache\hadoop\hdfs\DFSClient.java。HdfsDataInputStream繼承自FSDataInputStream。構造是并沒有額外的處理。
public class HdfsDataInputStream extends FSDataInputStream {
public HdfsDataInputStream(DFSInputStream in) throws IOException {
super(in);
}
}
FSDataInputStream繼承自DFSInputStream。
DFSInputStream(DFSClient dfsClient, String src, int buffersize, boolean verifyChecksum ) throws IOException, UnresolvedLinkException {
this.dfsClient = dfsClient;
this.verifyChecksum = verifyChecksum;
this.buffersize = buffersize;
this.src = src;
this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
openInfo();
}
/** * Grab the open-file info from namenode */
synchronized void openInfo() throws IOException, UnresolvedLinkException {
lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength;
while (retriesForLastBlockLength > 0) {
// Getting last block length as -1 is a special case. When cluster
// restarts, DNs may not report immediately. At this time partial block
// locations will not be available with NN for getting the length. Lets
// retry for 3 times to get the length.
if (lastBlockBeingWrittenLength == -1) {
DFSClient.LOG.warn("Last block locations not available. "+ "Datanodes might not have reported blocks completely."
+ " Will retry for " + retriesForLastBlockLength + " times");
waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength);
lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
} else {
break;
}
retriesForLastBlockLength--;
}
if (retriesForLastBlockLength == 0) {
throw new IOException("Could not obtain the last block locations.");
}
}
fetchLocatedBlocksAndGetLastBlockLength通過調用getLocatedBlocks實作了示意圖中的步驟二:
private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("newInfo = " + newInfo);
}
if (newInfo == null) {
throw new IOException("Cannot open filename " + src);
}
if (locatedBlocks != null) {
Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
while (oldIter.hasNext() && newIter.hasNext()) {
if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
throw new IOException("Blocklist for " + src + " has changed!");
}
}
}
locatedBlocks = newInfo;
long lastBlockBeingWrittenLength = 0;
if (!locatedBlocks.isLastBlockComplete()) {
final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
if (last != null) {
if (last.getLocations().length == 0) {
if (last.getBlockSize() == 0) {
// if the length is zero, then no data has been written to
// datanode. So no need to wait for the locations.
return 0;
}
return -1;
}
final long len = readBlockLength(last);
last.getBlock().setNumBytes(len);
lastBlockBeingWrittenLength = len;
}
}
currentNode = null;
return lastBlockBeingWrittenLength;
}
你可能會說步驟二調用的是getBlockLocations。看以下的代碼:
<span style="font-size:14px;">@VisibleForTesting
public LocatedBlocks getLocatedBlocks(String src, long start, long length) throws IOException {
return callGetBlockLocations(namenode, src, start, length);
}
/** * @see ClientProtocol#getBlockLocations(String, long, long) */
static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,String src, long start, long length) throws IOException {
try {
return namenode.getBlockLocations(src, start, length);
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,FileNotFoundException.class,UnresolvedPathException.class);
}
} </span>
然後就可以開始讀檔案的資料了。通過NameNode.getBlockLocations的遠端調用接口獲得了檔案開始部分的資料塊的儲存位置。對于檔案中的每個塊,NN傳回儲存着該副本的DN的位址。注意,這些DN根據它們與Client的距離進行了簡單的排序(利用了網絡的拓撲資訊)。
Client調用HdfsDataInputStream的read方法讀取檔案資料時,DFSInputStream對象會通過和DN間的讀資料stream接口,和最近的DN建立連接配接。Client反複調用read方法,資料會通過DN和Client的連接配接上的資料包傳回Client。當到達塊的末端時,DFSInputStream會關閉和DN的連接配接。并通過getBlockLocations()遠端方法獲得儲存着下一個資料塊的DN資訊,嚴格來說,在對象沒有緩存該資料塊的位置時,才會使用這個遠端方法。這就是上圖中的步驟五。然後重複上述過程。
另外,由于NameNode.getBlockLocations()不會一次傳回檔案的所有的資料塊資訊,DFSInputStream可能需要多次調用該遠端方法,檢索下一組資料塊的位置資訊。對于使用者來說,它讀取的是一個連續的資料流,上面所講的聯系不同的DN,多次定位資料塊的過程,都是透明的。當使用者完成資料讀取任務後,通過FSDataInputStream.close()關系資料流。即圖中的步驟六。
如果DN發生了錯誤,如節點停機或者網絡出現故障,那麼Client會嘗試連接配接下一個Block的位置。同時它會記住出現故障的那個DN,不會再進行徒勞的嘗試。在資料的應答中,不單包含了資料,還包含了資料的校驗和,Client會檢查資料的一緻性,如果發現了校驗錯誤,它會将這個資訊報告給NN;同時,嘗試從别的DN讀取另外一個副本的内容。由Client在讀取資料時進行資料完整性檢查,可以降低DN的負載,均衡各個節點的計算能力。
這樣的設計其實可以給我們一個很好的設計大型分布式系統的例子。通過一些有效的設計,将計算和網絡等分散到各個節點上,這樣可以最大程度的保證scalability。
3. Client寫檔案
即使不考慮出現錯誤的情況,寫檔案也是HDFS最複雜的流程。本節通過建立一個新檔案并向檔案寫入資料,結束後關閉這個檔案為例,分析檔案寫入時各個節點之間的配合。
Client調用DistributedFileSystem.create()建立檔案(上圖中的步驟一),這時DistributedFileSystem建立了DFSOutputStream,并由RPC,讓NN執行同名的方法,在檔案系統的命名空間建立一個新檔案。NN建立新檔案時,需要執行檢查,包括NN是否處理正常工作狀态,被建立的檔案不存在,Client是否有在父目錄中建立檔案的權限等。通過檢查後,NN會建構一個新檔案,記錄建立操作到編輯日志edits中。RPC結束後,DistributedFileSystem将該DFSOutputStream對象包裝到FSDataOutputStream執行個體中,傳回Client。
<span style="font-size:14px;">@Override
public FSDataOutputStream create(final Path f, final FsPermission permission,final EnumSet<CreateFlag> cflags, final int bufferSize,
final short replication, final long blockSize, final Progressable progress,
final ChecksumOpt checksumOpt) throws IOException {
statistics.incrementWriteOps(1);
Path absF = fixRelativePart(f);
return new FileSystemLinkResolver<FSDataOutputStream>() {
@Override
public FSDataOutputStream doCall(final Path p) throws IOException, UnresolvedLinkException {
return new HdfsDataOutputStream(dfs.create(getPathName(p), permission,cflags,
replication, blockSize, progress, bufferSize, checksumOpt),statistics);
}
@Override
public FSDataOutputStream next(final FileSystem fs, final Path p) throws IOException {
return fs.create(p, permission, cflags, bufferSize,replication, blockSize, progress, checksumOpt);
}
}.resolve(this, absF);
}</span>
關鍵的調用點有DFSClient.create:
/**
* Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long,
* Progressable, int, ChecksumOpt)} with the addition of favoredNodes that is
* a hint to where the namenode should place the file blocks.
* The favored nodes hint is not persisted in HDFS. Hence it may be honored
* at the creation time only. HDFS could move the blocks during balancing or
* replication, to move the blocks from favored nodes. A value of null means
* no favored nodes for this create
*/
public DFSOutputStream create(String src,FsPermission permission,EnumSet<CreateFlag> flag,boolean createParent,
short replication,long blockSize,Progressable progress,int buffersize,
ChecksumOpt checksumOpt,InetSocketAddress[] favoredNodes) throws IOException {
checkOpen();
if (permission == null) {
permission = FsPermission.getFileDefault();
}
FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
if(LOG.isDebugEnabled()) {
LOG.debug(src + ": masked=" + masked);
}
String[] favoredNodeStrs = null;
if (favoredNodes != null) {
favoredNodeStrs = new String[favoredNodes.length];
for (int i = 0; i < favoredNodes.length; i++) {
favoredNodeStrs[i] = favoredNodes[i].getHostName() + ":"+ favoredNodes[i].getPort();
}
}
final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,src, masked, flag, createParent, replication, blockSize, progress,
buffersize, dfsClientConf.createChecksum(checksumOpt), favoredNodeStrs);
beginFileLease(src, result);
return result;
}
在步驟三Client寫入資料時,由于create()調用建立了一個空檔案,是以,DFSOutputStream執行個體首先需要想NN申請資料塊,addBlock()方法成功執行後,傳回一個LocatedBlock對象。該對象包含了新資料塊的資料塊辨別和版本好,同時,它的成員變量locs提供了資料流管道的資訊,通過上述資訊,DFSOutputStream就可以和DN連接配接,通過些資料接口建立資料流管道。Client寫入FSDataOutputStream流中的資料,被分成一個一個的檔案包,放入DFSOutputStream對象的内部隊列。該隊列中的檔案包最後打包成資料包,發往資料流管道,流經管道上的各個DN,并持久化,确認包逆流而上,從資料流管道依次發往Client,當Client收到應答時,它将對應的包從内部隊列删除。
public class LocatedBlock {
private ExtendedBlock b;
private long offset;
// offset of the first byte of the block in the file private DatanodeInfo[] locs;
/** Storage ID for each replica */
private String[] storageIDs;
// Storage type for each replica, if reported. private StorageType[] storageTypes;
// corrupt flag is true if all of the replicas of a block are corrupt.
// else false. If block has few corrupt replicas, they are filtered and
// their locations are not part of this object
private boolean corrupt;
private Token<BlockTokenIdentifier> blockToken = new Token<BlockTokenIdentifier>();
/** * List of cached datanode locations */
private DatanodeInfo[] cachedLocs;
// Used when there are no locations
private static final DatanodeInfo[] EMPTY_LOCS = new DatanodeInfo[0];
DFSOutputStream在寫完一個資料塊後,資料流管道上的節點,會通過和NN的DatanodeProtocol遠端接口的blockReceived()方法,向NN送出資料塊。如果資料隊列還有等到輸出的資料,DFSOutputStream會再次調用addBlock(),為檔案添加新的資料塊。
Client完成資料的寫入後,會調用close()方法關閉流,關閉流意味着Client不會再向流中寫入資料。是以,當DFSOutputStream資料隊列的檔案包都收到應答後,就可以使用ClientProtocol.complete()方法通知NN關閉檔案,完成一次正常的檔案寫入。
如果在檔案寫入期間DN發生故障,則會執行下面的操作(注意,這些操作對于寫入資料的Client是透明的):
- 資料流管道會被關閉,已經發送到管道但是還沒有收到确認的檔案包,會被重新添加到DFSOutputStream的輸出隊列,這樣就保證了無路資料流管道的哪個DN發生故障,都不會丢失資料。目前正常工作的DN的資料塊會被賦予一個新的版本号,并通知NN。這樣,失敗的DN在從故障恢複過來以後,上面隻有部分資料的Block會因為版本号和NN儲存的版本号不比對而被删除。
- 在資料流管道中删除錯誤的DN并建立新的管道,繼續寫資料到正常工作的DN。
- 檔案關閉後,NN會發現該Block的副本數沒有達到要求,會選擇一個新的DN并複制Block,建立新的副本。DN的故障隻會影響一個Block的寫操作,後續Block的寫入不會受到影響。
4. DataNode的啟動與心跳機制
本節讨論DN的啟動及其與NN之間的互動。包括DN從啟動到進入正常工作狀态的注冊,Block上報,以及正常工作過程中的心跳等與NN相關的遠端調用。這部分雖然隻涉及DatanodeProtocol的接口,但是有助于進一步了解DN與NN的關系。
正常啟動的DN或者為更新而啟動的DN,都會向NN發送遠端調用versionRequest(),進行必要的版本檢查。這裡的版本檢查,隻涉及建構版本号,保證它們間的HDFS版本是一緻的。
在版本檢查結束後,DN會接着通過遠端調用register(),向NN注冊。DatanodeProtocol.register()的主要工作也是檢查,确認該DN是NN所管理叢集的成員。也就是說,使用者不能把某一個叢集中的某個node直接注冊到另外一個叢集中去,保證了整個系統的資料一緻性。
注冊成功後,DN會将它所管理的所有Block的資訊,通過blockRequest()方法上報到NN(步驟三),以幫助NN建立HDFS檔案資料塊到DN的映射關系。在此後,DN才正式的提供服務。
由于NN和DN是簡單的主從關系,DN需要每隔一段時間發送心跳到NN(步驟四和步驟五)。如果NN長時間收不到DN的心跳,它會認為DN已經失效。如果NN需要一些DN需要配合的動作,則會通過sendHeartbeat()的方法傳回。該傳回值是一個DatanodeCommand數組,它是NN的指令。
應該說,DN和NN的互動邏輯非常簡單。大部分是通過DN到NN的心跳來完成的。但是考慮到一定規模的HDFS叢集,一個NN會管理上千個DN,這樣的設計也就非常自然了。
5. SNN節點的中繼資料合并
當Client對HDFS的檔案目錄進行修改時,NN都會在edits中留下記錄,以保證在系統出現問題時,通過日志可以進行恢複。
fsimage是某一個時刻的檢查點(checkpoint)。由于fsimage很大,是以不會在每次的中繼資料修改都寫入到它裡邊,而隻是存在到edits中。在系統啟動時,會首先狀态最近時刻的fsimage,然後在通過edits,恢複系統的最新狀态。
當時如果edits太大,那麼節點啟動時将用很長的時間來執行日志的每一個操作,使得系統恢複最近的狀态。在啟動恢複的這段時間,服務是不可用的。為了避免edits多大,增加叢集的可用時間,HDFS引入了第二名位元組點,即SNN(Secondary NameNode)。SNN不是NN的standby,它隻是輔助NN完成合并(merge)fsimage和edits。過程涉及NamenodeProtocol和NN與SNN之間的流式接口。
該過程由SNN發起,首先通過遠端方法NamenodeProtocol.getEditLogSize()獲得NN上edits的大小。如果日志很小,SNN就會在指定的時間後重新檢查。否則,繼續通過遠端接口rollEditLog(),啟動一次檢查點的過程。這時,NN需要建立一個新的編輯日志edits.new,後續對中繼資料的改動,都會記錄到這個新日志中。而原有的fsimage和edits,會由SNN通過HTTP下載下傳到本地(步驟三和步驟四),在記憶體中進行merge。合并的結果就是fsimage.ckpt。然後SNN通過HTTP接口通知NN fsimage已經準備好。NN會通過HTTP get擷取merge好的fsimage。在NN下載下傳完成後,SNN會通過NamenodeProtocol.rollFsImage(),完成這次檢查點。NN在處理這個遠端方法時,會用fsimage.ckpt 覆寫原來的fsimage,并且将新的edits.new改名為edit。