天天看點

Hadoop-0.20.0源代碼分析(08)

這裡,繼續對FsShell類中一些指令進行閱讀分析,主要是看與拷貝檔案有關的幾個指令。

  • cp指令

該指令實作對檔案的拷貝操作,并且支援在不同的檔案系統之間進行檔案的拷貝。拷貝檔案涉及的操作比較複雜,核心拷貝操作還是調用了org.apache.hadoop.fs.FileUtil類的copy方法實作的。 先看該類中定義的其中一個copy方法的實作:

private int copy(String argv[], Configuration conf) throws IOException {

int i = 0;

int exitCode = 0;

String cmd = argv[i++];

String dest = argv[argv.length-1]; // 指令行中最後一個參數

// 若指定了多個輸入源檔案,則最後一個參數必須是一個目錄

if (argv.length > 3) {

Path dst = new Path(dest);

if (!fs.isDirectory(dst)) { // 最後一個參數必須是目錄

throw new IOException("When copying multiple files, " + "destination " + dest + " should be a directory.");

}

}

// 循環對每一個檔案進行拷貝操作

for (; i < argv.length - 1; i++) {

try {

copy(argv[i], dest, conf); // 将檔案argv[i]拷貝到dest目錄中

} catch (RemoteException e) {

// 捕獲指令執行發生的異常資訊

exitCode = -1;

try {

String[] content;

content = e.getLocalizedMessage().split("/n");

System.err.println(cmd.substring(1) + ": " +

content[0]);

} catch (Exception ex) {

System.err.println(cmd.substring(1) + ": " +

ex.getLocalizedMessage());

}

} catch (IOException e) {

// 捕獲IO異常資訊

exitCode = -1;

System.err.println(cmd.substring(1) + ": " + e.getLocalizedMessage());

}

}

return exitCode;

}

該指令的實作與mv指令類似,這裡調用了一個重載的copy指令,實作對一個檔案執行拷貝操作。該重載的拷貝方法如下所示:

void copy(String srcf, String dstf, Configuration conf) throws IOException {

Path srcPath = new Path(srcf); // 構造Path

FileSystem srcFs = srcPath.getFileSystem(getConf()); // 擷取到srcPath所在的檔案系統srcFs

Path dstPath = new Path(dstf);

FileSystem dstFs = dstPath.getFileSystem(getConf()); // 擷取到dstPath所在的檔案系統dstFs

Path [] srcs = FileUtil.stat2Paths(srcFs.globStatus(srcPath), srcPath); // 擷取到srcFs中滿足srcPath模式的FileStatus[]并轉換成為Path[]

if (srcs.length > 1 && !dstFs.isDirectory(dstPath)) {

throw new IOException("When copying multiple files, " + "destination should be a directory.");

}

for(int i=0; i<srcs.length; i++) { // 循環拷貝操作

FileUtil.copy(srcFs, srcs[i], dstFs, dstPath, false, conf); // 調用FileUtil類的拷貝方法copy完成檔案在srcFs與dstFs檔案系統之間拷貝檔案的操作

}

}

現在,我們開始追蹤 org.apache.hadoop.fs.FileUtil類的copy方法,看一看拷貝到底是如何實作的。FileUtil類中定義了多個重載的拷貝方法copy,我們隻從FsShell類中調用的copy方法開始追蹤其實作。上面調用的FileUtil類的copy實作如下所示:

public static boolean copy(FileSystem srcFS, Path src,

FileSystem dstFS, Path dst,

boolean deleteSource,

Configuration conf) throws IOException {

return copy(srcFS, src, dstFS, dst, deleteSource, true, conf); // 調用了一個重載的copy方法實作檔案在srcFS與dstFS之間進行複制

}

看重載的copy方法的實作,如下所示:

public static boolean copy(FileSystem srcFS, Path src,

FileSystem dstFS, Path dst,

boolean deleteSource,

boolean overwrite,

Configuration conf) throws IOException {

dst = checkDest(src.getName(), dstFS, dst, overwrite); // 檢查目的檔案系統dstFS中dst目錄是否合法(參照src)

if (srcFS.getFileStatus(src).isDir()) { // 若源檔案系統srcFS中src是目錄

checkDependencies(srcFS, src, dstFS, dst); // 檢查檔案依賴性:如果srcFS=dstFS,并且dst不是src的子目錄,檢查通過;如果srcFS與dstFS不是同一個檔案系統,依賴性檢查通過

if (!dstFS.mkdirs(dst)) { // 在dstFS中建立dst目錄,準備向其中拷貝資料

return false;

}

FileStatus contents[] = srcFS.listStatus(src); // 擷取srcFS中src目錄下的檔案清單

for (int i = 0; i < contents.length; i++) { // 分治思想:分治後執行遞歸拷貝檔案操作

copy(srcFS, contents[i].getPath(), dstFS,

new Path(dst, contents[i].getPath().getName()),

deleteSource, overwrite, conf); // 遞歸調用

}

} else if (srcFS.isFile(src)) { // 遞歸出口(如果src是一個普通檔案)

InputStream in=null;

OutputStream out = null;

try {

in = srcFS.open(src); // 打開srcFS中的src檔案,并傳回輸入流對象

out = dstFS.create(dst, overwrite); // 在目的檔案系統dstFS中建立dst檔案,并傳回輸出流,等待寫入

IOUtils.copyBytes(in, out, conf, true); // 調用:通過調用IOUtils類的copyBytes方法實作流式拷貝

} catch (IOException e) {

IOUtils.closeStream(out); // 關閉out

IOUtils.closeStream(in); // 關閉in

throw e;

}

} else {

throw new IOException(src.toString() + ": No such file or directory");

}

if (deleteSource) { // 如果設定了拷貝完成後删除源檔案選項

return srcFS.delete(src, true); // 删除源檔案系統srcFS的源檔案src

} else {

return true;

}

}

IOUtils類中實作了Hadoop檔案系統中檔案的 流式拷貝操作,我們追蹤該工具類的copyBytes方法,分析實作的過程。該方法如下所示:

/**

* 從一個流拷貝到另一個流中

*/

public static void copyBytes(InputStream in, OutputStream out, Configuration conf, boolean close)

throws IOException {

copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096), close); // 調用:重載的copyBytes方法實作流式拷貝

}

我們看重載流式拷貝實作方法copyBytes,如下所示:

public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) throws IOException {

PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null; // 使用PrintStream為out流增加便捷功能

byte buf[] = new byte[buffSize]; // 位元組緩沖區

try {

int bytesRead = in.read(buf); // 從輸入流in讀取bytesRead個位元組到buf緩沖區中

while (bytesRead >= 0) { // 确實讀取到了位元組

out.write(buf, 0, bytesRead); // 将從in中讀取到的位元組,通過buf緩沖區寫入到輸出流out中

if ((ps != null) && ps.checkError()) { // 如果ps=(PrintStream)out,測試内部标志,并自動重新整理

throw new IOException("Unable to write to output stream.");

}

bytesRead = in.read(buf); // 繼續從in讀取位元組

}

} finally {

if(close) {

out.close(); // 關閉out

in.close(); // 關閉in

}

}

}

上面在從InputStream in拷貝到OutputStream out中的過程中,使用了更加高效的PrintStream流類,它能夠為OutputStream增加友善列印各種資料值的表示形式,而且,它不會抛出IO異常,而是将流式拷貝過程中發生的異常,設定為通過調用checkError方法來檢測内部的标志。另外,它還可以實作自動重新整理,在向輸出流中寫入位元組(通過位元組緩沖區)之後,自動重新整理。

cp指令的具體實作都在上面進行分析了,應該能夠了解在Hadoop中如何在不同檔案系統之間執行流式拷貝檔案的過程。

  • copyFromLocal指令

該指令實作了從本地檔案系統拷貝檔案的操作。實作方法為,如下所示:

/**

* 從本地檔案系統(srcs在本地檔案系統中)拷貝srcs到目的檔案系統(對應Path為dstf)

*/

void copyFromLocal(Path[] srcs, String dstf) throws IOException {

Path dstPath = new Path(dstf);

FileSystem dstFs = dstPath.getFileSystem(getConf()); // 擷取到目的檔案系統dstFs

if (srcs.length == 1 && srcs[0].toString().equals("-")) // 如果隻指定了一個參數“-”

copyFromStdin(dstPath, dstFs); // 調用:從标準輸入流中進行流式拷貝操作

else // 否則

dstFs.copyFromLocalFile(false, false, srcs, dstPath); // 調用目的檔案系統dstFs的copyFromLocalFile方法執行拷貝操作

}  

我們關注一下copyFromStdin方法拷貝的實作,如下所示:

private void copyFromStdin(Path dst, FileSystem dstFs) throws IOException {

if (dstFs.isDirectory(dst)) { // 如果目的檔案是目錄,不支援源為标準輸入流的情況

throw new IOException("When source is stdin, destination must be a file.");

}

if (dstFs.exists(dst)) { // 如果目的檔案系統dstFs中存在檔案dst,出錯

throw new IOException("Target " + dst.toString() + " already exists.");

}

FSDataOutputStream out = dstFs.create(dst); // 滿足拷貝要求,執行流式拷貝操作

try {

IOUtils.copyBytes(System.in, out, getConf(), false); // 調用IOUtils類的copyBytes方法實作,前面已經分析過拷貝過程

}

finally {

out.close(); // 拷貝完成,關閉輸出流out

}

}

再看一下,如果指定的是待拷貝的檔案源不是标準輸入流的情況,檔案系統FileSystem是如何實作拷貝操作的。實作的方法copyFromLocalFile如下所示:

/**

* 将本地的srcs,拷貝到目的檔案系統中的dst

* delSrc訓示了拷貝檔案完成之後,是否删除源檔案srcs

*/

public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path[] srcs, Path dst)

throws IOException {

Configuration conf = getConf();

FileUtil.copy(getLocal(conf), srcs, this, dst, delSrc, overwrite, conf); // 調用FileUtil工具類實作拷貝操作

}

關于FileUtil的copy方法,前面已經詳細分析過,不再累述。

像moveFromLocal、moveFromLocal、copyToLocal、moveToLocal、copyMergeToLocal等指令的實作都非常類似,也不做過多的解釋了。