天天看點

Hadoop-0.20.0源代碼分析(03)

在Hadoop架構源代碼org.apache.hadoop.fs包中,都是關于Hadoop檔案系統實作的相關類,主要包括檔案系統模型的建立,及其在該檔案系統定義、實作基本的檔案操作。例如給出檔案系統抽象,對檔案系統上存儲的檔案執行基本操作進行抽象,等等。

在該包中,類的繼承關系如下所示:

◦java.lang.Object

◦org.apache.hadoop.fs.BlockLocation (implements org.apache.hadoop.io.Writable)

◦org.apache.hadoop.conf.Configured (implements org.apache.hadoop.conf.Configurable)

◦org.apache.hadoop.fs.FileSystem (implements java.io.Closeable)

◦org.apache.hadoop.fs.FilterFileSystem

◦org.apache.hadoop.fs.ChecksumFileSystem

◦org.apache.hadoop.fs.InMemoryFileSystem

◦org.apache.hadoop.fs.LocalFileSystem

◦org.apache.hadoop.fs.HarFileSystem

◦org.apache.hadoop.fs.RawLocalFileSystem

◦org.apache.hadoop.fs.FsShell (implements org.apache.hadoop.util.Tool)

◦org.apache.hadoop.fs.Trash

◦org.apache.hadoop.fs.ContentSummary (implements org.apache.hadoop.io.Writable)

◦org.apache.hadoop.fs.FileChecksum (implements org.apache.hadoop.io.Writable)

◦org.apache.hadoop.fs.MD5MD5CRC32FileChecksum

◦org.apache.hadoop.fs.FileStatus (implements java.lang.Comparable<T>, org.apache.hadoop.io.Writable)

◦org.apache.hadoop.fs.FileSystem.Statistics

◦org.apache.hadoop.fs.FileUtil

◦org.apache.hadoop.fs.FileUtil.HardLink

◦org.apache.hadoop.fs.FsUrlStreamHandlerFactory (implements java.net.URLStreamHandlerFactory)

◦java.io.InputStream (implements java.io.Closeable)

◦java.io.FilterInputStream

◦java.io.BufferedInputStream

◦org.apache.hadoop.fs.BufferedFSInputStream (implements org.apache.hadoop.fs.PositionedReadable, org.apache.hadoop.fs.Seekable)

◦java.io.DataInputStream (implements java.io.DataInput)

◦org.apache.hadoop.fs.FSDataInputStream (implements org.apache.hadoop.fs.PositionedReadable, org.apache.hadoop.fs.Seekable)

◦org.apache.hadoop.fs.FSInputStream (implements org.apache.hadoop.fs.PositionedReadable, org.apache.hadoop.fs.Seekable)

◦org.apache.hadoop.fs.FSInputChecker

◦org.apache.hadoop.fs.LocalDirAllocator

◦java.io.OutputStream (implements java.io.Closeable, java.io.Flushable)

◦java.io.FilterOutputStream

◦java.io.DataOutputStream (implements java.io.DataOutput)

◦org.apache.hadoop.fs.FSDataOutputStream (implements org.apache.hadoop.fs.Syncable)

◦org.apache.hadoop.fs.FSOutputSummer

◦org.apache.hadoop.fs.Path (implements java.lang.Comparable<T>)

◦org.apache.hadoop.util.Shell

◦org.apache.hadoop.fs.DF

◦org.apache.hadoop.fs.DU

◦java.lang.Throwable (implements java.io.Serializable)

◦java.lang.Error

◦org.apache.hadoop.fs.FSError

◦java.lang.Exception

◦java.io.IOException

◦org.apache.hadoop.fs.ChecksumException

首先對檔案系統最頂層抽象類FileSystem進行源代碼的閱讀分析。

FileSystem抽象類繼承自org.apache.hadoop.conf.Configured配置基類,實作了java.io.Closeable接口,通過這一點,可以了解到,FileSystem抽象類作為一個檔案系統的抽象定義,它是可配置的,也就是說可以通過指定的配置檔案中的一些配置項來描述一個檔案系統,實際上,最重要的配置類是org.apache.hadoop.conf.Configuration,org.apache.hadoop.conf.Configured中定義的方法就是對org.apache.hadoop.conf.Configuration配置類進行設定或擷取,滿足一個基于org.apache.hadoop.conf.Configuration配置類的其它類的需要。

FileSystem抽象類定義了檔案系統所具有的基本特征和基本操作。首先從該抽象類的屬性定義來看,這些屬性描述了檔案系統的靜态特性。該類中定義了如下屬性:

private static final String FS_DEFAULT_NAME_KEY = "fs.default.name";

/** 檔案系統緩存 */

private static final Cache CACHE = new Cache();

/** 該檔案系統(this)在緩存中的鍵執行個體 */

private Cache.Key key;

/** 記錄檔案系統類的統計資訊的Map */

private static final Map<Class<? extends FileSystem>, Statistics> statisticsTable = new IdentityHashMap<Class<? extends FileSystem>, Statistics>();

/**

* 該檔案系統(this)的統計資訊的執行個體

*/

protected Statistics statistics;

/**

* 當檔案系統關閉或者JVM退出以後,需要将緩存中的檔案清空。該Set<Path>中的内容是,對緩存中檔案的Path,并且是排好序的。

*/

private Set<Path> deleteOnExit = new TreeSet<Path>();

Hadoop架構實作的檔案系統,從FileSystem的Cache CACHE的含義可以看出,一個檔案系統可以管理與它相關的并被緩存的多個檔案系統的執行個體,這一組檔案系統協調存儲工作,并為Hadoop實作的MapReduce并行計算架構的機制提供便利的存儲基礎。

檔案系統緩存

 FileSystem抽象類定義了一個檔案系統緩存Cache CACHE,用來緩存檔案系統對象。也就是可能存在多個檔案系統對象,進而可知,每個檔案系統除了管理基于其上的内容之外,還可能要管理緩存的一組檔案系統執行個體,這要看具體的檔案系統是如何實作的。

當然,也可能是在分布式環境中,一個檔案系統管理遠端的和本地的檔案系統執行個體。

為了能夠快速擷取到一個存在于緩存中的檔案系統對象,Hadoop采用了Hash算法,将檔案系統對象以鍵值對的方式存儲到HashMap中,也就是org.apache.hadoop.fs.FileSystem.Cache緩存類定義的map屬性,如下所示:

private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();

  其中,org.apache.hadoop.fs.FileSystem.Cache.Key是org.apache.hadoop.fs.FileSystem.Cache的一個内部靜态類,作為緩存Cache中Map的鍵,一個Key所包含的内容就是一個URI的資訊及其使用者名,下面是Key類的屬性:

final String scheme;

final String authority;

final String username;

緩存org.apache.hadoop.fs.FileSystem.Cache的Map的值是繼承自FileSystem抽象類的子類。可以看出,可以通過一個合法的URI資訊與使用者名快速擷取到緩存中存在的一個檔案系統的對象,進而能夠擷取到指定檔案系統中檔案資訊。該緩存類提供了3個基本的操作,如下所示:

/** 根據URI與Configuration,從緩存中取出一個FileSystem執行個體,要求同步緩存操作。 */

synchronized FileSystem get(URI uri, Configuration conf) throws IOException;

/** 根據指定的緩存Key執行個體,從緩存中删除該Key對應的FileSystem執行個體,要求同步緩存操作。 */

synchronized void remove(Key key, FileSystem fs);

/** 疊代緩存Map,删除緩存中的緩存的全部檔案系統執行個體,要求同步緩存操作。 */

synchronized void closeAll() throws IOException;

檔案系統統計資訊

上面statisticsTable是一個IdentityHashMap<Class<? extends FileSystem>, Statistics>,鍵是繼承自FileSystem的Class,值是統計資訊Statistics類。為了在一個并行計算環境中進行安全的計算,Statistics類使用了java.util.concurrent.atomic包中的原子變量屬性,保證線程安全的原子讀寫操作的同時,提高并行性能。如下所示:

private AtomicLong bytesRead = new AtomicLong();

private AtomicLong bytesWritten = new AtomicLong();

 其中,bytesRead是從統計資料中讀取指定數量的位元組,加到目前讀取位元組數上。同理,bytesRead是基于原子寫操作的。

另外一個統計資料屬性protected Statistics statistics,是對目前(this)的FileSystem的統計資訊執行個體。該屬性是在該檔案系統(this)的執行個體被構造完成之後被初始化的,通過調用initialize方法實作統計資訊初始化:

public void initialize(URI name, Configuration conf) throws IOException {

statistics = getStatistics(name.getScheme(), getClass());

}

 然後又在initialize方法内部調用了getStatistics方法擷取到一個初始化的Statistics執行個體。在該方法中,在執行個體化一個Statistics執行個體以後,需要将它加入到統計資訊執行個體的緩存statisticsTable中,以便能夠通過給定的URI快速擷取到對應的檔案系統的統計資訊。

為了便捷操作檔案系統的統計資訊,Filesystem類實作了幾個非常友善的方法,下面隻列出方法聲明:

public static synchronized Map<String, Statistics> getStatistics();

public static synchronized List<Statistics> getAllStatistics();

public static synchronized Statistics getStatistics(String scheme, Class<? extends FileSystem> cls);

public static synchronized void clearStatistics();

public static synchronized void printStatistics() throws IOException;

這幾個方法,都是從statisticsTable中擷取到檔案系統的統計資訊。

檔案緩存

屬性Set<Path> deleteOnExit是一個檔案緩存,它用來收集目前緩存中的檔案Path。當檔案系統關閉,或者JVM退出的時候,需要将緩存中的檔案全部删除。删除緩存檔案的方法是在processDeleteOnExit方法中,如下所示:

/**

* 删除緩存deleteOnExit中的全部檔案,需要同步deleteOnExit。

*/

protected void processDeleteOnExit() {

synchronized (deleteOnExit) {

for (Iterator<Path> iter = deleteOnExit.iterator(); iter.hasNext();) {

Path path = iter.next();

try {

delete(path, true); // 調用,删除目錄,及其子目錄和檔案

}

catch (IOException e) {

LOG.info("Ignoring failure to deleteOnExit for path " + path);

}

iter.remove();

}

}

}

當一個FileSystem關閉以後,需要将該檔案系統對應的Path加入到檔案緩存deleteOnExit中,以便在檔案系統關閉或JVM退出時,調用processDeleteOnExit方法删除這些檔案。向檔案緩存中加入一個可能在檔案系統關閉或JVM退出時删除的檔案,在deleteOnExit方法中實作的。

檔案系統抽象

下面,從FileSystem抽象類“抽象”的切面橫向了解一個FileSystem定義了哪些基于檔案系統的操作,使我們能夠知道如果實作一個基于檔案系統,需要實作哪些基本操作。如下所示,FileSystem抽象類中定義了12個抽象方法:

/** 擷取能夠唯一辨別一個FileSystem的URI*/

public abstract URI getUri();

/**

* 根據給定的Path f,打開一個檔案的FSDataInputStream輸入流。

* @param f 待打開的檔案

* @param bufferSize 緩沖區大小

*/

public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException;

/**

* 為寫入程序打開一個FSDataOutputStream。

* @param f 待寫入的檔案

* @param permission 權限

* @param overwrite 是否重寫

* @param bufferSize 緩沖區大小

* @param replication 檔案的塊副本數量

* @param blockSize 塊大小

* @param progress 用于報告Hadoop架構工作狀況的程序

* @throws IOException

*/

public abstract FSDataOutputStream create(Path f,

FsPermission permission,

boolean overwrite,

int bufferSize,

short replication,

long blockSize,

Progressable progress) throws IOException;

/**

* 向一個已經存在的檔案中執行追加操作

* @param f 存在的檔案

* @param bufferSize 緩沖區大小

* @param progress 報告程序

* @throws IOException

*/

public abstract FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException;

/**

* 重命名檔案src為dst

*/

public abstract boolean rename(Path src, Path dst) throws IOException;

/**

* 删除檔案

*/

public abstract boolean delete(Path f) throws IOException;

/**

* 删除檔案

*/

public abstract boolean delete(Path f, boolean recursive) throws IOException;

/**

* 如果f是一個目錄,列出該目錄中的檔案

*/

public abstract FileStatus[] listStatus(Path f) throws IOException;

/**

* 為給定的檔案系統設定目前工作目錄

*/

public abstract void setWorkingDirectory(Path new_dir);

/**

* 擷取檔案系統的目前工作目錄

*/

public abstract Path getWorkingDirectory();

/**

* 建立一個目錄f

*/

public abstract boolean mkdirs(Path f, FsPermission permission) throws IOException;

/**

* 擷取與f對應的統計資訊執行個體

*/

public abstract FileStatus getFileStatus(Path f) throws IOException;

上面這些抽象方法應該是一個檔案系統應該具備的基本操作,可能根據不同的需要設計一個基于FileSystem抽象類的子類實作類,這個檔案系統的實作中,對于某些操作的實作細節可能因為檔案系統的特點而不全相同。是以,可以靈活設計你所需要的檔案系統。

檔案操作

在Filesystem檔案系統上,與檔案相關的操作很多,主要包括檔案的建立、讀寫、重命名、拷貝、删除這幾個基本操作。

檔案的建立,包括目錄的建立和非目錄檔案的建立,建立目錄的方法如下:

public boolean mkdirs(Path f) throws IOException {

return mkdirs(f, FsPermission.getDefault());

}

public abstract boolean mkdirs(Path f, FsPermission permission) throws IOException;

Filesystem抽象類沒有實作如何建立目錄的細節。

另外,還有一個跨檔案系統執行建立目錄操作的實作:

public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission) throws IOException {

boolean result = fs.mkdirs(dir); // 基于預設權限建立一個目錄,傳回檔案輸出流對象

fs.setPermission(dir, permission); // 設定fs中建立dir目錄的權限

return result;

}

通過這個方法可以看出,是在目前檔案系統(this)中,在另一個檔案系統fs中根據指定的權限來建立一個目錄,顯然這是在分布式地進行目錄的遠端建立操作。

對于非目錄檔案的建立,主要是為了讀或寫操作而打開一個檔案,傳回檔案的流對象,可以進行流式讀寫與追加。對建立檔案的操作,有10個重載的方法都是基于一個create抽象方法的:

public abstract FSDataOutputStream create(Path f,

FsPermission permission,

boolean overwrite,

int bufferSize,

short replication,

long blockSize,

Progressable progress) throws IOException;

還有一個比較特殊的create方法,如下所示:

public static FSDataOutputStream create(FileSystem fs,

Path file, FsPermission permission) throws IOException {

FSDataOutputStream out = fs.create(file); // 基于預設權限建立一個檔案,傳回檔案輸出流對象

fs.setPermission(file, permission); // 設定fs中建立file檔案的權限

return out;

}

通過這個方法的參數可以看出,是在目前檔案系統(this)中,在另一個檔案系統fs中根據指定的權限來建立一個檔案,顯然這是在分布式地進行檔案的遠端建立操作。隻要該檔案系統的的權限滿足遠端檔案系統fs的建立要求,并滿足必要的通信條件,就可以執行分布式檔案操作。

另外還有兩個open方法是用來打開已經存在的檔案而且傳回檔案流對象;一個createNewFile方法内部實作也是調用了create方法。

檔案的追加操作,是通過三個重載的append方法實作的,追加寫操作成功完成之後,傳回org.apache.hadoop.fs.FSDataOutputStream流對象。

檔案的重命名操作,是通過抽象方法rename(Path, Path)定義的。

檔案的删除操作,是通過delete方法定義的。

本地檔案的拷貝操作,主要是通過兩組重載的方法實作的。一組是重載的copyFromLocalFile方法:拷貝源檔案到目的檔案,保留源檔案(複制操作);另一組是重載的moveFromLocalFile方法:拷貝源檔案到目的檔案,删除源檔案,這是檔案的移動操作(就是剪切操作)。

檔案、塊、副本

關于檔案和塊,可以通過Hadoop的架構設計中了解到一些相關資訊,一些參數的含義及其設定。

關于塊(Block),FileSystem中定義了如下兩個方法:

/**

* 擷取檔案f的塊大小

*/

public long getBlockSize(Path f) throws IOException {

return getFileStatus(f).getBlockSize();

}

/**

* 擷取預設塊大小

*/

public long getDefaultBlockSize() {

// default to 32MB

return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);

}

為了保證Hadoop分布式檔案系統的可靠性與可用性,使用了檔案副本備援存儲和流水線複制技術。那麼對于檔案副本的設定也是有一定要求的。下面是關于副本的一些參數的操作:

/**

* 設定檔案src的replication因子為replication

*/

public boolean setReplication(Path src, short replication) hrows IOException {

return true;

}

/**

* 擷取檔案src的replication因子

*/

@Deprecated

public short getReplication(Path src) throws IOException {

return getFileStatus(src).getReplication();

}

/**

* 擷取檔案的預設副本個數,亦即replication因子

*/

public short getDefaultReplication() { return 1; }

關于檔案的狀态資訊,可以通過一組重載的listStatus方法來擷取,檔案狀态資訊通過org.apache.hadoop.fs.FileStatus實體類來統計,該類實作了org.apache.hadoop.io.Writable接口,是以是可序列化的。它主要包含檔案的下述資訊:

private Path path; // 檔案路徑

private long length; // 檔案長度

private boolean isdir; // 是否是目錄

private short block_replication; // 塊副本因子

private long blocksize; // 塊大小

private long modification_time; // 修改時間

private long access_time; // 通路時間

private FsPermission permission; //在指定檔案系統中的操作權限

private String owner; // 檔案屬主

private String group; // 所屬組

對于塊,塊是組成檔案的基本機關,那麼給定一個檔案,它就應該具有一個塊的清單,可以通過getFileBlockLocations方法擷取到一個檔案對應的塊所在主機的清單、所在檔案中的偏移位置等資訊,如下:

/**

* 傳回一個BlockLocation[],它包含了主機名清單、偏移位置、檔案大小的資訊

*/

public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {

if (file == null) {

return null;

}

if ( (start<0) || (len < 0) ) {

throw new IllegalArgumentException("Invalid start or len parameter");

}

if (file.getLen() < start) {

return new BlockLocation[0];

}

String[] name = { "localhost:50010" };

String[] host = { "localhost" };

return new BlockLocation[] { new BlockLocation(name, host, 0, file.getLen()) };

}

其中,org.apache.hadoop.fs.BlockLocation類具有一個指定檔案的塊的資訊,它實作了org.apache.hadoop.io.Writable接口,是以是可序列化的,它具有的資訊如下所示:

private String[] hosts; // hostnames of datanodes

private String[] names; // hostname:portNumber of datanodes

private String[] topologyPaths; // full path name in network topology

private long offset; // 塊在檔案中的偏移位置

private long length;

另外,Filesystem類中還定義了globStatus方法,用于根據指定的PathFilter來過濾檔案系統中的檔案Path,進而傳回滿足過濾條件的Path的檔案狀态資訊的數組FileStatus[]。

繼續閱讀