這裡,繼續對FsShell類中一些指令進行閱讀分析,主要是看與拷貝檔案有關的幾個指令。
- cp指令
該指令實作對檔案的拷貝操作,并且支援在不同的檔案系統之間進行檔案的拷貝。拷貝檔案涉及的操作比較複雜,核心拷貝操作還是調用了org.apache.hadoop.fs.FileUtil類的copy方法實作的。 先看該類中定義的其中一個copy方法的實作:
private int copy(String argv[], Configuration conf) throws IOException {
int i = 0;
int exitCode = 0;
String cmd = argv[i++];
String dest = argv[argv.length-1]; // 指令行中最後一個參數
// 若指定了多個輸入源檔案,則最後一個參數必須是一個目錄
if (argv.length > 3) {
Path dst = new Path(dest);
if (!fs.isDirectory(dst)) { // 最後一個參數必須是目錄
throw new IOException("When copying multiple files, " + "destination " + dest + " should be a directory.");
}
}
// 循環對每一個檔案進行拷貝操作
for (; i < argv.length - 1; i++) {
try {
copy(argv[i], dest, conf); // 将檔案argv[i]拷貝到dest目錄中
} catch (RemoteException e) {
// 捕獲指令執行發生的異常資訊
exitCode = -1;
try {
String[] content;
content = e.getLocalizedMessage().split("/n");
System.err.println(cmd.substring(1) + ": " +
content[0]);
} catch (Exception ex) {
System.err.println(cmd.substring(1) + ": " +
ex.getLocalizedMessage());
}
} catch (IOException e) {
// 捕獲IO異常資訊
exitCode = -1;
System.err.println(cmd.substring(1) + ": " + e.getLocalizedMessage());
}
}
return exitCode;
}
該指令的實作與mv指令類似,這裡調用了一個重載的copy指令,實作對一個檔案執行拷貝操作。該重載的拷貝方法如下所示:
void copy(String srcf, String dstf, Configuration conf) throws IOException {
Path srcPath = new Path(srcf); // 構造Path
FileSystem srcFs = srcPath.getFileSystem(getConf()); // 擷取到srcPath所在的檔案系統srcFs
Path dstPath = new Path(dstf);
FileSystem dstFs = dstPath.getFileSystem(getConf()); // 擷取到dstPath所在的檔案系統dstFs
Path [] srcs = FileUtil.stat2Paths(srcFs.globStatus(srcPath), srcPath); // 擷取到srcFs中滿足srcPath模式的FileStatus[]并轉換成為Path[]
if (srcs.length > 1 && !dstFs.isDirectory(dstPath)) {
throw new IOException("When copying multiple files, " + "destination should be a directory.");
}
for(int i=0; i<srcs.length; i++) { // 循環拷貝操作
FileUtil.copy(srcFs, srcs[i], dstFs, dstPath, false, conf); // 調用FileUtil類的拷貝方法copy完成檔案在srcFs與dstFs檔案系統之間拷貝檔案的操作
}
}
現在,我們開始追蹤 org.apache.hadoop.fs.FileUtil類的copy方法,看一看拷貝到底是如何實作的。FileUtil類中定義了多個重載的拷貝方法copy,我們隻從FsShell類中調用的copy方法開始追蹤其實作。上面調用的FileUtil類的copy實作如下所示:
public static boolean copy(FileSystem srcFS, Path src,
FileSystem dstFS, Path dst,
boolean deleteSource,
Configuration conf) throws IOException {
return copy(srcFS, src, dstFS, dst, deleteSource, true, conf); // 調用了一個重載的copy方法實作檔案在srcFS與dstFS之間進行複制
}
看重載的copy方法的實作,如下所示:
public static boolean copy(FileSystem srcFS, Path src,
FileSystem dstFS, Path dst,
boolean deleteSource,
boolean overwrite,
Configuration conf) throws IOException {
dst = checkDest(src.getName(), dstFS, dst, overwrite); // 檢查目的檔案系統dstFS中dst目錄是否合法(參照src)
if (srcFS.getFileStatus(src).isDir()) { // 若源檔案系統srcFS中src是目錄
checkDependencies(srcFS, src, dstFS, dst); // 檢查檔案依賴性:如果srcFS=dstFS,并且dst不是src的子目錄,檢查通過;如果srcFS與dstFS不是同一個檔案系統,依賴性檢查通過
if (!dstFS.mkdirs(dst)) { // 在dstFS中建立dst目錄,準備向其中拷貝資料
return false;
}
FileStatus contents[] = srcFS.listStatus(src); // 擷取srcFS中src目錄下的檔案清單
for (int i = 0; i < contents.length; i++) { // 分治思想:分治後執行遞歸拷貝檔案操作
copy(srcFS, contents[i].getPath(), dstFS,
new Path(dst, contents[i].getPath().getName()),
deleteSource, overwrite, conf); // 遞歸調用
}
} else if (srcFS.isFile(src)) { // 遞歸出口(如果src是一個普通檔案)
InputStream in=null;
OutputStream out = null;
try {
in = srcFS.open(src); // 打開srcFS中的src檔案,并傳回輸入流對象
out = dstFS.create(dst, overwrite); // 在目的檔案系統dstFS中建立dst檔案,并傳回輸出流,等待寫入
IOUtils.copyBytes(in, out, conf, true); // 調用:通過調用IOUtils類的copyBytes方法實作流式拷貝
} catch (IOException e) {
IOUtils.closeStream(out); // 關閉out
IOUtils.closeStream(in); // 關閉in
throw e;
}
} else {
throw new IOException(src.toString() + ": No such file or directory");
}
if (deleteSource) { // 如果設定了拷貝完成後删除源檔案選項
return srcFS.delete(src, true); // 删除源檔案系統srcFS的源檔案src
} else {
return true;
}
}
IOUtils類中實作了Hadoop檔案系統中檔案的 流式拷貝操作,我們追蹤該工具類的copyBytes方法,分析實作的過程。該方法如下所示:
/**
* 從一個流拷貝到另一個流中
*/
public static void copyBytes(InputStream in, OutputStream out, Configuration conf, boolean close)
throws IOException {
copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096), close); // 調用:重載的copyBytes方法實作流式拷貝
}
我們看重載流式拷貝實作方法copyBytes,如下所示:
public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) throws IOException {
PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null; // 使用PrintStream為out流增加便捷功能
byte buf[] = new byte[buffSize]; // 位元組緩沖區
try {
int bytesRead = in.read(buf); // 從輸入流in讀取bytesRead個位元組到buf緩沖區中
while (bytesRead >= 0) { // 确實讀取到了位元組
out.write(buf, 0, bytesRead); // 将從in中讀取到的位元組,通過buf緩沖區寫入到輸出流out中
if ((ps != null) && ps.checkError()) { // 如果ps=(PrintStream)out,測試内部标志,并自動重新整理
throw new IOException("Unable to write to output stream.");
}
bytesRead = in.read(buf); // 繼續從in讀取位元組
}
} finally {
if(close) {
out.close(); // 關閉out
in.close(); // 關閉in
}
}
}
上面在從InputStream in拷貝到OutputStream out中的過程中,使用了更加高效的PrintStream流類,它能夠為OutputStream增加友善列印各種資料值的表示形式,而且,它不會抛出IO異常,而是将流式拷貝過程中發生的異常,設定為通過調用checkError方法來檢測内部的标志。另外,它還可以實作自動重新整理,在向輸出流中寫入位元組(通過位元組緩沖區)之後,自動重新整理。
cp指令的具體實作都在上面進行分析了,應該能夠了解在Hadoop中如何在不同檔案系統之間執行流式拷貝檔案的過程。
- copyFromLocal指令
該指令實作了從本地檔案系統拷貝檔案的操作。實作方法為,如下所示:
/**
* 從本地檔案系統(srcs在本地檔案系統中)拷貝srcs到目的檔案系統(對應Path為dstf)
*/
void copyFromLocal(Path[] srcs, String dstf) throws IOException {
Path dstPath = new Path(dstf);
FileSystem dstFs = dstPath.getFileSystem(getConf()); // 擷取到目的檔案系統dstFs
if (srcs.length == 1 && srcs[0].toString().equals("-")) // 如果隻指定了一個參數“-”
copyFromStdin(dstPath, dstFs); // 調用:從标準輸入流中進行流式拷貝操作
else // 否則
dstFs.copyFromLocalFile(false, false, srcs, dstPath); // 調用目的檔案系統dstFs的copyFromLocalFile方法執行拷貝操作
}
我們關注一下copyFromStdin方法拷貝的實作,如下所示:
private void copyFromStdin(Path dst, FileSystem dstFs) throws IOException {
if (dstFs.isDirectory(dst)) { // 如果目的檔案是目錄,不支援源為标準輸入流的情況
throw new IOException("When source is stdin, destination must be a file.");
}
if (dstFs.exists(dst)) { // 如果目的檔案系統dstFs中存在檔案dst,出錯
throw new IOException("Target " + dst.toString() + " already exists.");
}
FSDataOutputStream out = dstFs.create(dst); // 滿足拷貝要求,執行流式拷貝操作
try {
IOUtils.copyBytes(System.in, out, getConf(), false); // 調用IOUtils類的copyBytes方法實作,前面已經分析過拷貝過程
}
finally {
out.close(); // 拷貝完成,關閉輸出流out
}
}
再看一下,如果指定的是待拷貝的檔案源不是标準輸入流的情況,檔案系統FileSystem是如何實作拷貝操作的。實作的方法copyFromLocalFile如下所示:
/**
* 将本地的srcs,拷貝到目的檔案系統中的dst
* delSrc訓示了拷貝檔案完成之後,是否删除源檔案srcs
*/
public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path[] srcs, Path dst)
throws IOException {
Configuration conf = getConf();
FileUtil.copy(getLocal(conf), srcs, this, dst, delSrc, overwrite, conf); // 調用FileUtil工具類實作拷貝操作
}
關于FileUtil的copy方法,前面已經詳細分析過,不再累述。
像moveFromLocal、moveFromLocal、copyToLocal、moveToLocal、copyMergeToLocal等指令的實作都非常類似,也不做過多的解釋了。