天天看點

Hadoop-0.20.0源代碼分析(13)

通過對org.apache.hadoop.ipc包中,Hadoop實作了基于IPC模型的RPC機制,可以不需要像Java中實作的RMI機制一樣,在RPC調用的C/S兩端分别建立Stub和Skeleton,而是通過一組協定來進行RPC調用就可以實作通信。這主要是由于Hadoop所采用的序列化機制簡化了RPC調用的複雜性。Hadoop定義了自己的通信協定,這些協定都是建立在TCP/IP協定之上的,規範了通信兩端的約定。

為了閱讀分析org.apache.hadoop.hdfs.DFSClient類的實作,我們還需要對Hadoop定義的通信協定簇來進行了解,因為DFSClient所執行的操作都是基于協定的規定來實作的,了解協定的内容可能會對DFSClient執行個體連接配接到HDFS執行的任務更好了解一點。

首先,了解一下Hadoop定義的通信雙方需要遵循的一組協定,下面是協定接口的繼承層次關系,并作簡單介紹:

。org.apache.hadoop.ipc.VersionedProtocol

。org.apache.hadoop.hdfs.protocol.ClientProtocol

。org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol

。org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol

。org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol

。org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol

上述協定接口的基本含義簡述如下:

VersionedProtocol協定是Hadoop的最頂層協定接口的抽象;

ClientProtocol協定是使用者程序(包括用戶端程序與Datanode程序)與Namenode程序之間進行通信所使用的協定,例如,(1)用戶端程序需要向Datanode資料結點複制資料塊,需要與Namenode程序通信,擷取Datanode結點清單;(2)Datanode程序向Namenode程序發送心跳狀态報告和塊狀态報告需要與Namenode程序互動;

ClientDatanodeProtocol協定是用戶端程序與Datanode程序之間進行通信所使用的協定;

DatanodeProtocol協定是當Datanode程序需要與NameNode程序進行通信是需要基于此協定,例如發送心跳報告和塊狀态報告;

InterDatanodeProtocol協定是Datanode程序之間進行通信的協定,例如用戶端程序啟動複制資料塊,此時可能需要在Datanode結點之間進行塊副本的流水線複制操作。

下面我們一個一個來看:

  • VersionedProtocol協定

該接口的定義如下:

package org.apache.hadoop.ipc;

import java.io.IOException;

/**

* 使用Hadoop RPC機制的所有協定的超類

* 該接口的子類同樣支援具有一個static final long的版本屬性字段

*/

public interface VersionedProtocol {

/**

* 傳回與指定協定protocol相關的協定版本

* @param protocol 協定接口的類名

* @param clientVersion 用戶端欲與伺服器進行互動,它所使用的協定版本

* @return 傳回伺服器将要與用戶端進行互動,所需要使用的協定版本

*/

public long getProtocolVersion(String protocol, long clientVersion) throws IOException;

}

該接口是所有與Hadoop RPC調用相關的協定的最高層抽象。

  • ClientProtocol協定

當用戶端程序想要與Namenode程序進行通信的時候,需要通過org.apache.hadoop.hdfs.DistributedFileSystem類,基于ClientProtocol協定來實作互動過程。使用者代碼通過ClientProtocol協定,可以操縱HDFS的目錄命名空間、打開與關閉檔案流等。

該接口中定義了一個static final的versionID字段,如下所示:

/**

* 與先前的版本進行比較

* 41: saveNamespace introduced.

*/

public static final long versionID = 41L;

下面介紹,該接口協定中定義的與檔案内容相關的操作:

1、擷取塊位置資訊

定義的方法為getBlockLocations,該方法的參數如下:

/**

* @param src 檔案名稱

* @param offset 範圍的起始偏移位置

* @param length 範圍大小,即offset+length

*/

public LocatedBlocks getBlockLocations(String src, long offset, long length) throws IOException;

getBlockLocations方法的主要功能說明如下:

擷取到指定範圍内,指定檔案的所有塊的位置資訊。 其中,調用該方法傳回的org.apache.hadoop.hdfs.protocol.LocatedBlocks對象包含的内容為:檔案長度、組成檔案的塊及其存儲位置(所在的Datanode資料結點)。對于每個塊所在DataNode結點的位置,市基于“到檔案系統用戶端位址的距離最近”的原則進行了排序。檔案系統用戶端必須與制定的Datanode進行互動,才能擷取到所需要的實際的塊據塊。

2、在命名空間中建立一個檔案入口(entry)

定義的方法為create,該方法的參數如下所示:

/**

* @param src 被建立檔案的路徑

* @param masked 權限

* @param clientName 目前用戶端名稱

* @param overwrite 如果待建立的檔案已經存在,指定是否重寫該檔案

* @param replication 塊副本因子

* @param blockSize 塊的最大長度

*/

public void create(

String src,

FsPermission masked,

String clientName,

boolean overwrite,

short replication,

long blockSize) throws IOException;

create方法的主要功能說明如下:

在命名空間中建立一個檔案入口。該方法将建立一個由src路徑指定的空檔案,該路徑src應該反映了從root目錄開始的一個完整路徑名稱。從用戶端的角度,Namenode并沒有“目前”目錄的概念。一旦檔案建立成功,該檔案就是可見的,并可以被其它用戶端來執行讀操作。但是,其它用戶端不能夠對該檔案進行删除、重命名、重寫,而這些操作隻有在該檔案被完全或明确指定為租約到期,才可以執行。

每個塊都具有最大長度限制,如果用戶端想要建立多個塊,可以調用addBlock(String, String)方法來實作。

3、追加寫檔案操作

定義的方法為append,該方法 的參數如下所示:

/**

* @param src 被建立檔案的路徑

* @param clientName 目前用戶端的名稱

*/

public LocatedBlock append(String src, String clientName) throws IOException;

append方法的主要功能說明如下:

向檔案src中追加寫入内容,傳回一個org.apache.hadoop.hdfs.protocol.LocatedBlock對象,該對象封裝了Block(Hadoop基本檔案塊)和DatanodeInfo[](Datanode的狀态資訊),通過追加寫操作後的傳回資訊,可以定位到追加寫入最後部分塊的資訊。

4、設定副本因子

定義的方法為setReplication,該方法 的參數如下所示:

/**

* Set replication for an existing file.

* <p>

* The NameNode sets replication to the new value and returns.

* The actual block replication is not expected to be performed during

* this method call. The blocks will be populated or removed in the

* background as the result of the routine block maintenance procedures.

*

* @param src 檔案名稱

* @param replication 新的副本因子

*/

public boolean setReplication(String src, short replication) throws IOException;

setReplication方法的主要功能說明如下:

該方法為一個指定的檔案修改塊副本因子。Namenode會為指定檔案設定副本因子,但是,不期望在調用該方法的過程修改實際塊的副本因子,而是由背景塊維護程序來執行:如果目前副本因子小于設定的新副本因子,需要增加一些塊副本,如果目前副本因子大于設定的新副本因子,就會删除一些塊副本。

另外,基于檔案操作的方法,還有如下一些:

/**

* 為已經存在的目錄或者檔案,設定給定的操作權限

*/

public void setPermission(String src, FsPermission permission) throws IOException;

/**

* 設定檔案或目錄屬主

*/

public void setOwner(String src, String username, String groupname) throws IOException;

/**

* 用戶端放棄對指定塊的操作

*/

public void abandonBlock(Block b, String src, String holder) throws IOException;

/**

* 用戶端向一個目前為寫操作打開的檔案寫入資料塊

*/

public LocatedBlock addBlock(String src, String clientName) throws IOException;

/**

* 用戶端完成對指定檔案的寫操作,并期望能夠寫完,在寫完以後關閉檔案

*/

public boolean complete(String src, String clientName) throws IOException;

/**

* 用戶端向Namenode報告corrupted塊的資訊(塊在Datanode上的位置資訊)

* @param blocks 待報告的塊資訊的數組

*/

public void reportBadBlocks(LocatedBlock[] blocks) throws IOException;

/**

* 在檔案系統指令空間中重命名一個檔案或目錄

*/

public boolean rename(String src, String dst) throws IOException;

/**

* 删除檔案或目錄src

*/

public boolean delete(String src) throws IOException;

/**

* 删除檔案或目錄src,根據recursive選項來執行

*/

public boolean delete(String src, boolean recursive) throws IOException;

/**

* 建立目錄src,并賦予目錄src指定的nasked權限

*/

public boolean mkdirs(String src, FsPermission masked) throws IOException;

/**

* 擷取指定目錄src中的檔案清單

*/

public FileStatus[] getListing(String src) throws IOException;

下面是接口ClientProtocol定義的與系統的管理相關的方法的定義:

1、監聽用戶端

方法renewLease定義為:

public void renewLease(String clientName) throws IOException;

該方法主要是Namenode監聽到某個用戶端發送的心跳狀态,如果再一段時間内無法擷取到某個用戶端的心跳狀态,很可能是該用戶端因為某些異常崩潰掉了,被加上了不能繼續正常工作的狀态鎖,Namenode程序會周期地調用該方法來确定指定的用戶端clientName是否确實挂掉了,如果又重新接收到該用戶端發送的心跳報告,則為該用戶端進行解鎖操作,恢複其正常的工作。

2、擷取檔案系統的狀态統計資料

定義的方法為getStats,如下所示:

/**

* 傳回一組辨別檔案系統不同資訊的索引數組:

* <ul>

* <li> [0] 包含檔案系統總存儲容量(按位元組計算)</li>

* <li> [1] 包含檔案系統已使用空間(按位元組計算)</li>

* <li> [2] 包含檔案系統可使用空間(按位元組計算)</li>

* <li> [3] 包含檔案系統中不滿足副本因子數量的塊的數量</li>

* <li> [4] 包含corrupt副本的塊的數量 </li>

* <li> [5] 包含沒有任何可以用的塊副本的塊的數量</li>

* </ul>

*/

public long[] getStats() throws IOException;

上面傳回的long[]對應定義的如下幾個常量:

public int GET_STATS_CAPACITY_IDX = 0;

public int GET_STATS_USED_IDX = 1;

public int GET_STATS_REMAINING_IDX = 2;

public int GET_STATS_UNDER_REPLICATED_IDX = 3;

public int GET_STATS_CORRUPT_BLOCKS_IDX = 4;

public int GET_STATS_MISSING_BLOCKS_IDX = 5;

3、安全模式開關操作

定義的方法為setSafeMode,如下所示:

public boolean setSafeMode(FSConstants.SafeModeAction action) throws IOException;

通過調用該方法可以執行如下操作:進入安全模式、退出安全模式、擷取安全模式。

當一個Namenode啟動時候,首先會進入到安全模式這種特殊狀态,在該狀态下不能夠進行資料塊的複制操作。Namenode接收HDFS叢集中所有的Datanode結點的心跳狀态報告和資料塊狀态報告,根據狀态報告來決定是否開始進入工作狀态。如果某些Datanode結點發送的心跳狀态報告不正常或者根本無從接收到,Namenode會将這些Datanode視為故障結點,在進入工作狀态的時候,将這些故障結點排除在工作叢集之外。如果某些Datanode結點上的資料塊狀态報告存在問題,會根據要求進行處理,比如某Datanode據結點上資料塊的塊副本未達到副本因子,則會在退出安全模式之後,進行塊複制操作,滿足副本要求。

4、儲存命名空間映像

定義的方法為saveNamespace,如下所示:

/**

* 儲存命名空間映像

*

* 儲存FsImage映像,同時将更新同步到EditLog中。

* 要求具有超級權限,并且在安全模式下進行。

*/

public void saveNamespace() throws IOException;

5、持久化檔案系統中繼資料

定義的方法為metaSave,如下所示:

/**

* 将Namenode結點上的資料結構寫入到指定的檔案中,如果指定檔案已經存在,則追加到該檔案中

*/

public void metaSave(String filename) throws IOException;

這裡,引用Hadoop架構設計要點中的一段文字來描述一下,檔案系統中繼資料的持久化:

“Namenode上儲存着HDFS的名字空間。對于任何對檔案系統中繼資料産生修改的操作,Namenode都會使用一種稱為EditLog的事務日志記錄下來。例如,在HDFS中建立一個檔案,Namenode就會在Editlog中插入一條記錄來表示;同樣地,修改檔案的副本系數也将往Editlog插入一條記錄。Namenode在本地作業系統的檔案系統中存儲這個Editlog。整個檔案系統的名字空間,包括資料塊到檔案的映射、檔案的屬性等,都存儲在一個稱為FsImage的檔案中,這個檔案也是放在Namenode所在的本地檔案系統上。

Namenode在記憶體中儲存着整個檔案系統的名字空間和檔案資料塊映射(Blockmap)的映像。這個關鍵的中繼資料結構設計得很緊湊,因而一個有4G記憶體的Namenode足夠支撐大量的檔案和目錄。當Namenode啟動時,它從硬碟中讀取Editlog和FsImage,将所有Editlog中的事務作用在記憶體中的FsImage上,并将這個新版本的FsImage從記憶體中儲存到本地磁盤上,然後删除舊的Editlog,因為這個舊的Editlog的事務都已經作用在FsImage上了。這個過程稱為一個檢查點(checkpoint)。在目前實作中,檢查點隻發生在Namenode啟動時,在不久的将來将實作支援周期性的檢查點。

Datanode将HDFS資料以檔案的形式存儲在本地的檔案系統中,它并不知道有關HDFS檔案的資訊。它把每個HDFS資料塊存儲在本地檔案系統的一個單獨的檔案中。Datanode并不在同一個目錄建立所有的檔案,實際上,它用試探的方法來确定每個目錄的最佳檔案數目,并且在适當的時候建立子目錄。在同一個目錄中建立所有的本地檔案并不是最優的選擇,這是因為本地檔案系統可能無法高效地在單個目錄中支援大量的檔案。當一個Datanode啟動時,它會掃描本地檔案系統,産生一個這些本地檔案對應的所有HDFS資料塊的清單,然後作為報告發送到Namenode,這個報告就是塊狀态報告。”

另外,ClientProtocol協定接口還定義了其它一些方法,如下所示:

/**

* 擷取叢集系統中目前的Datanode的狀态報告

* 每個Datanode傳回一個DatanodeInfo對象,包括Datanode的類型LIVE、DEAD或ALL.

*/

public DatanodeInfo[] getDatanodeReport(FSConstants.DatanodeReportType type) throws IOException;

/**

* 擷取指定檔案的塊大小

*/

public long getPreferredBlockSize(String filename) throws IOException;

/**

* 告訴Namenode重新讀取叢集結點清單

*/

public void refreshNodes() throws IOException;

/**

* 完成之前的更新操作

* 删除在更新期間儲存的檔案系統的狀态,一旦執行該方法,改變将不可逆轉。

*/

public void finalizeUpgrade() throws IOException;

/**

* 報告分布式更新程序,或強制使目前更新執行

*/

public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action) throws IOException;

/**

* 擷取指定檔案或目錄的狀态

*/

public FileStatus getFileInfo(String src) throws IOException;

/**

* 擷取某個目錄的ContentSummary資訊

*/

public ContentSummary getContentSummary(String path) throws IOException;

/**

* 将指定檔案的全部中繼資料寫入到持久存儲中

* 其中,必須保證檔案為寫操作而打開。

*/

public void fsync(String src, String client) throws IOException;

/**

* 設定指定檔案的修改和通路時間

*/

public void setTimes(String src, long mtime, long atime) throws IOException;

/**

* 為指定目錄設定配置設定配額資訊

*/

public void setQuota(String path, long namespaceQuota, long diskspaceQuota) throws IOException;

  • NamenodeProtocol協定

該協定接口定義了次級Namenode(Secondary NameNode)與Namenode進行通信所需進行的操作。其中,Secondary NameNode是一個用來輔助Namenode的伺服器端程序,主要是對映像檔案執行特定的操作,另外還包括擷取指定Datanode上塊的操作。

該接口定義的操作如下所示:

/**

* 擷取datanode上大小為size的塊

*/

public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size) throws IOException;

/**

* 擷取目前EditLog檔案的大小(in bytes)

*/

public long getEditLogSize() throws IOException;

/**

* 關閉目前EditLog檔案,并重新打開一個新的

* 當系統處于安全模式下,執行該方法會失敗

*/

public CheckpointSignature rollEditLog() throws IOException;

/**

* 復原FsImage日志:删除舊的FsImage,拷貝新的映像到FsImage檔案中,删除舊的EditLog檔案并重命名edits.new為edits。

*/

public void rollFsImage() throws IOException;

  • ClientDatanodeProtocol協定

當用戶端程序需要與Datanode程序進行通信的時候,需要基于該協定。該協定接口定義資料塊恢複的方法,如下所示:

public interface ClientDatanodeProtocol extends VersionedProtocol {

public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);

/**

* 3: add keepLength parameter.

*/

public static final long versionID = 3L;

/**

* @param block 指定的資料塊

* @param keepLength 保持資料塊的長度

* @param targets 指定的塊的可能位置清單

* @return 如果恢複成功,傳回塊ID和時間戳

*/

LocatedBlock recoverBlock(Block block, boolean keepLength, DatanodeInfo[] targets) throws IOException;

}

  • DatanodeProtocol協定

該協定用于一個DFS Datanode使用者與Namenode進行通信的協定。該接口定義如下所示:

package org.apache.hadoop.hdfs.server.protocol;

import java.io.*;

import org.apache.hadoop.hdfs.protocol.Block;

import org.apache.hadoop.hdfs.protocol.DatanodeID;

import org.apache.hadoop.hdfs.protocol.LocatedBlock;

import org.apache.hadoop.ipc.VersionedProtocol;

public interface DatanodeProtocol extends VersionedProtocol {

/**

* 19: 發送心跳,傳回一個DatanodeCommand對象數組

*/

public static final long versionID = 19L;

// 定義錯誤代碼

final static int NOTIFY = 0;

final static int DISK_ERROR = 1;

final static int INVALID_BLOCK = 2;

/**

* 當接收到Datanode的指令的時候,根據下述狀态碼确定Datanode應該執行何種操作,

*/

final static int DNA_UNKNOWN = 0; // 未知

final static int DNA_TRANSFER = 1; // 将資料塊從一個Datanode轉移到另一個Datanode

final static int DNA_INVALIDATE = 2; // 未驗證資料塊

final static int DNA_SHUTDOWN = 3; // 關閉Datanode

final static int DNA_REGISTER = 4; // 重新注冊

final static int DNA_FINALIZE = 5; // 完成先前執行的更新操作

final static int DNA_RECOVERBLOCK = 6; // 資料塊恢複操作請求

/**

* 注冊Datanode

*

* @return 更新後的{@link org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration},包含:一個新的storageID(如果Datanode沒有storageID)、新的registration ID

*/

public DatanodeRegistration register(DatanodeRegistration registration) throws IOException;

/**

* Datanode向Namenode發送心跳狀态報告

*/

public DatanodeCommand[] sendHeartbeat(DatanodeRegistration registration, long capacity, long dfsUsed, long remaining, int xmitsInProgress, int xceiverCount) throws IOException;

/**

* Datanode向Namenode發送塊狀态報告

*/

public DatanodeCommand blockReport(DatanodeRegistration registration, long[] blocks) throws IOException;

/**

* Datanode向Namenode報告最近接收到的資料塊、删除的多餘塊副本

*/

public void blockReceived(DatanodeRegistration registration, Block blocks[], String[] delHints) throws IOException;

/**

* 向Namenode報告錯誤資訊

*/

public void errorReport(DatanodeRegistration registration, int errorCode, String msg) throws IOException;

public NamespaceInfo versionRequest() throws IOException;

/**

* Datanode向Namenode發送一個更新指令

*/

UpgradeCommand processUpgradeCommand(UpgradeCommand comm) throws IOException;

/**

* Datanode向Namenode報告Bad Blocks

*/

public void reportBadBlocks(LocatedBlock[] blocks) throws IOException;

/**

* 為資料塊生成新的時間戳

*/

public long nextGenerationStamp(Block block) throws IOException;

/**

* 在恢複資料塊期間,送出事務:資料塊同步

*/

public void commitBlockSynchronization(Block block, long newgenerationstamp, long newlength, boolean closeFile, boolean deleteblock, DatanodeID[] newtargets) throws IOException;

}

一般來說Namenode不直接對Datanode進行RPC調用,如果一個Namenode需要與Datanode通信,唯一的方式就通過調用該協定接口定義的方法。

  • InterDatanodeProtocol協定

該協定用于Datanode程序之間進行通信。

該接口的定義如下所示:

package org.apache.hadoop.hdfs.server.protocol;

import java.io.IOException;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.hdfs.protocol.Block;

import org.apache.hadoop.ipc.VersionedProtocol;

public interface InterDatanodeProtocol extends VersionedProtocol {

public static final Log LOG = LogFactory.getLog(InterDatanodeProtocol.class);

/**

* 3: 向更新塊中增加一個表示完成的參數

*/

public static final long versionID = 3L;

/**

* 擷取指定塊的中繼資料

*/

BlockMetaDataInfo getBlockMetaDataInfo(Block block) throws IOException;

/**

* 更新資料塊

*/

void updateBlock(Block oldblock, Block newblock, boolean finalize) throws IOException;

}

上面,對不同程序之間通信所使用的協定的接口進行了閱讀分析,應該能夠了解每種協定應用的場景,如果想要基于某種場景實作端端通信,可以選擇合适的協定接口來實作它。

繼續閱讀