天天看点

Hadoop-0.20.0源代码分析(08)

这里,继续对FsShell类中一些命令进行阅读分析,主要是看与拷贝文件有关的几个命令。

  • cp命令

该命令实现对文件的拷贝操作,并且支持在不同的文件系统之间进行文件的拷贝。拷贝文件涉及的操作比较复杂,核心拷贝操作还是调用了org.apache.hadoop.fs.FileUtil类的copy方法实现的。 先看该类中定义的其中一个copy方法的实现:

private int copy(String argv[], Configuration conf) throws IOException {

int i = 0;

int exitCode = 0;

String cmd = argv[i++];

String dest = argv[argv.length-1]; // 命令行中最后一个参数

// 若指定了多个输入源文件,则最后一个参数必须是一个目录

if (argv.length > 3) {

Path dst = new Path(dest);

if (!fs.isDirectory(dst)) { // 最后一个参数必须是目录

throw new IOException("When copying multiple files, " + "destination " + dest + " should be a directory.");

}

}

// 循环对每一个文件进行拷贝操作

for (; i < argv.length - 1; i++) {

try {

copy(argv[i], dest, conf); // 将文件argv[i]拷贝到dest目录中

} catch (RemoteException e) {

// 捕获命令执行发生的异常信息

exitCode = -1;

try {

String[] content;

content = e.getLocalizedMessage().split("/n");

System.err.println(cmd.substring(1) + ": " +

content[0]);

} catch (Exception ex) {

System.err.println(cmd.substring(1) + ": " +

ex.getLocalizedMessage());

}

} catch (IOException e) {

// 捕获IO异常信息

exitCode = -1;

System.err.println(cmd.substring(1) + ": " + e.getLocalizedMessage());

}

}

return exitCode;

}

该命令的实现与mv命令类似,这里调用了一个重载的copy命令,实现对一个文件执行拷贝操作。该重载的拷贝方法如下所示:

void copy(String srcf, String dstf, Configuration conf) throws IOException {

Path srcPath = new Path(srcf); // 构造Path

FileSystem srcFs = srcPath.getFileSystem(getConf()); // 获取到srcPath所在的文件系统srcFs

Path dstPath = new Path(dstf);

FileSystem dstFs = dstPath.getFileSystem(getConf()); // 获取到dstPath所在的文件系统dstFs

Path [] srcs = FileUtil.stat2Paths(srcFs.globStatus(srcPath), srcPath); // 获取到srcFs中满足srcPath模式的FileStatus[]并转换成为Path[]

if (srcs.length > 1 && !dstFs.isDirectory(dstPath)) {

throw new IOException("When copying multiple files, " + "destination should be a directory.");

}

for(int i=0; i<srcs.length; i++) { // 循环拷贝操作

FileUtil.copy(srcFs, srcs[i], dstFs, dstPath, false, conf); // 调用FileUtil类的拷贝方法copy完成文件在srcFs与dstFs文件系统之间拷贝文件的操作

}

}

现在,我们开始追踪 org.apache.hadoop.fs.FileUtil类的copy方法,看一看拷贝到底是如何实现的。FileUtil类中定义了多个重载的拷贝方法copy,我们只从FsShell类中调用的copy方法开始追踪其实现。上面调用的FileUtil类的copy实现如下所示:

public static boolean copy(FileSystem srcFS, Path src,

FileSystem dstFS, Path dst,

boolean deleteSource,

Configuration conf) throws IOException {

return copy(srcFS, src, dstFS, dst, deleteSource, true, conf); // 调用了一个重载的copy方法实现文件在srcFS与dstFS之间进行复制

}

看重载的copy方法的实现,如下所示:

public static boolean copy(FileSystem srcFS, Path src,

FileSystem dstFS, Path dst,

boolean deleteSource,

boolean overwrite,

Configuration conf) throws IOException {

dst = checkDest(src.getName(), dstFS, dst, overwrite); // 检查目的文件系统dstFS中dst目录是否合法(参照src)

if (srcFS.getFileStatus(src).isDir()) { // 若源文件系统srcFS中src是目录

checkDependencies(srcFS, src, dstFS, dst); // 检查文件依赖性:如果srcFS=dstFS,并且dst不是src的子目录,检查通过;如果srcFS与dstFS不是同一个文件系统,依赖性检查通过

if (!dstFS.mkdirs(dst)) { // 在dstFS中创建dst目录,准备向其中拷贝数据

return false;

}

FileStatus contents[] = srcFS.listStatus(src); // 获取srcFS中src目录下的文件列表

for (int i = 0; i < contents.length; i++) { // 分治思想:分治后执行递归拷贝文件操作

copy(srcFS, contents[i].getPath(), dstFS,

new Path(dst, contents[i].getPath().getName()),

deleteSource, overwrite, conf); // 递归调用

}

} else if (srcFS.isFile(src)) { // 递归出口(如果src是一个普通文件)

InputStream in=null;

OutputStream out = null;

try {

in = srcFS.open(src); // 打开srcFS中的src文件,并返回输入流对象

out = dstFS.create(dst, overwrite); // 在目的文件系统dstFS中创建dst文件,并返回输出流,等待写入

IOUtils.copyBytes(in, out, conf, true); // 调用:通过调用IOUtils类的copyBytes方法实现流式拷贝

} catch (IOException e) {

IOUtils.closeStream(out); // 关闭out

IOUtils.closeStream(in); // 关闭in

throw e;

}

} else {

throw new IOException(src.toString() + ": No such file or directory");

}

if (deleteSource) { // 如果设置了拷贝完成后删除源文件选项

return srcFS.delete(src, true); // 删除源文件系统srcFS的源文件src

} else {

return true;

}

}

IOUtils类中实现了Hadoop文件系统中文件的 流式拷贝操作,我们追踪该工具类的copyBytes方法,分析实现的过程。该方法如下所示:

/**

* 从一个流拷贝到另一个流中

*/

public static void copyBytes(InputStream in, OutputStream out, Configuration conf, boolean close)

throws IOException {

copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096), close); // 调用:重载的copyBytes方法实现流式拷贝

}

我们看重载流式拷贝实现方法copyBytes,如下所示:

public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) throws IOException {

PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null; // 使用PrintStream为out流增加便捷功能

byte buf[] = new byte[buffSize]; // 字节缓冲区

try {

int bytesRead = in.read(buf); // 从输入流in读取bytesRead个字节到buf缓冲区中

while (bytesRead >= 0) { // 确实读取到了字节

out.write(buf, 0, bytesRead); // 将从in中读取到的字节,通过buf缓冲区写入到输出流out中

if ((ps != null) && ps.checkError()) { // 如果ps=(PrintStream)out,测试内部标志,并自动刷新

throw new IOException("Unable to write to output stream.");

}

bytesRead = in.read(buf); // 继续从in读取字节

}

} finally {

if(close) {

out.close(); // 关闭out

in.close(); // 关闭in

}

}

}

上面在从InputStream in拷贝到OutputStream out中的过程中,使用了更加高效的PrintStream流类,它能够为OutputStream增加方便打印各种数据值的表示形式,而且,它不会抛出IO异常,而是将流式拷贝过程中发生的异常,设置为通过调用checkError方法来检测内部的标志。另外,它还可以实现自动刷新,在向输出流中写入字节(通过字节缓冲区)之后,自动刷新。

cp命令的具体实现都在上面进行分析了,应该能够理解在Hadoop中如何在不同文件系统之间执行流式拷贝文件的过程。

  • copyFromLocal命令

该命令实现了从本地文件系统拷贝文件的操作。实现方法为,如下所示:

/**

* 从本地文件系统(srcs在本地文件系统中)拷贝srcs到目的文件系统(对应Path为dstf)

*/

void copyFromLocal(Path[] srcs, String dstf) throws IOException {

Path dstPath = new Path(dstf);

FileSystem dstFs = dstPath.getFileSystem(getConf()); // 获取到目的文件系统dstFs

if (srcs.length == 1 && srcs[0].toString().equals("-")) // 如果只指定了一个参数“-”

copyFromStdin(dstPath, dstFs); // 调用:从标准输入流中进行流式拷贝操作

else // 否则

dstFs.copyFromLocalFile(false, false, srcs, dstPath); // 调用目的文件系统dstFs的copyFromLocalFile方法执行拷贝操作

}  

我们关注一下copyFromStdin方法拷贝的实现,如下所示:

private void copyFromStdin(Path dst, FileSystem dstFs) throws IOException {

if (dstFs.isDirectory(dst)) { // 如果目的文件是目录,不支持源为标准输入流的情况

throw new IOException("When source is stdin, destination must be a file.");

}

if (dstFs.exists(dst)) { // 如果目的文件系统dstFs中存在文件dst,出错

throw new IOException("Target " + dst.toString() + " already exists.");

}

FSDataOutputStream out = dstFs.create(dst); // 满足拷贝要求,执行流式拷贝操作

try {

IOUtils.copyBytes(System.in, out, getConf(), false); // 调用IOUtils类的copyBytes方法实现,前面已经分析过拷贝过程

}

finally {

out.close(); // 拷贝完成,关闭输出流out

}

}

再看一下,如果指定的是待拷贝的文件源不是标准输入流的情况,文件系统FileSystem是如何实现拷贝操作的。实现的方法copyFromLocalFile如下所示:

/**

* 将本地的srcs,拷贝到目的文件系统中的dst

* delSrc指示了拷贝文件完成之后,是否删除源文件srcs

*/

public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path[] srcs, Path dst)

throws IOException {

Configuration conf = getConf();

FileUtil.copy(getLocal(conf), srcs, this, dst, delSrc, overwrite, conf); // 调用FileUtil工具类实现拷贝操作

}

关于FileUtil的copy方法,前面已经详细分析过,不再累述。

像moveFromLocal、moveFromLocal、copyToLocal、moveToLocal、copyMergeToLocal等命令的实现都非常类似,也不做过多的解释了。