天天看点

hadoop未修复bug6287的解决办法(ttprivate to 0700的bug、setPermission failed)

hadoop-0.20.2以上版本,若在windows下使用cygwin模拟环境进行开发和测试,可能导致setPermission失败并抛出异常,使tasktracker无法启动。https://issues.apache.org/jira/browse/HADOOP-7682 上有详细的描述,但查看hadoop的Release Notes,该问题尚未被修复(目前版本已经到了hadoop-1.0.2),因此采用自己修改源码中相关代码的方法来修复此bug。

通过以上文章,或查看启动时的报错信息(可通过命令hadoop tasktracker启动tasktracker,将报错信息直接输出到控制台,当然这是使用伪分布式的配置方式),可以了解到引起该异常的类为hadoop的common项目中的org.apache.hadoop.fs.RawLocalFileSystem(源文件RawLocalFileSystem.java)。

1、下载源码

可以到http://svn.apache.org/repos/asf/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java下载源码

[code]

package org.apache.hadoop.fs;

import java.io.BufferedOutputStream;

import java.io.DataOutput;

import java.io.File;

import java.io.FileInputStream;

import java.io.FileNotFoundException;

import java.io.FileOutputStream;

import java.io.IOException;

import java.io.OutputStream;

import java.net.URI;

import java.nio.ByteBuffer;

import java.util.Arrays;

import java.util.StringTokenizer;

import org.apache.hadoop.classification.InterfaceAudience;

import org.apache.hadoop.classification.InterfaceStability;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.permission.FsPermission;

import org.apache.hadoop.io.nativeio.NativeIO;

import org.apache.hadoop.util.Progressable;

import org.apache.hadoop.util.Shell;

import org.apache.hadoop.util.StringUtils;

@InterfaceAudience.Public

@InterfaceStability.Stable

public class RawLocalFileSystem extends FileSystem {

static final URI NAME = URI.create("file:///");

private Path workingDir;

public RawLocalFileSystem() {

workingDir = getInitialWorkingDirectory();

}

/**
 * Returns {@code f} unchanged when it is already absolute; otherwise
 * resolves it against the current working directory.
 */
private Path makeAbsolute(Path f) {
  return f.isAbsolute() ? f : new Path(workingDir, f);
}

/**
 * Converts a {@link Path} into a local {@link File}.
 *
 * The path is first validated with {@code checkPath}; a relative path is
 * resolved against {@link #getWorkingDirectory()} before the path component
 * of its URI is handed to {@code java.io.File}.
 */
public File pathToFile(Path path) {
  checkPath(path);
  final Path resolved =
      path.isAbsolute() ? path : new Path(getWorkingDirectory(), path);
  return new File(resolved.toUri().getPath());
}

public URI getUri() { return NAME; }

public void initialize(URI uri, Configuration conf) throws IOException {

super.initialize(uri, conf);

setConf(conf);

}

class TrackingFileInputStream extends FileInputStream {

public TrackingFileInputStream(File f) throws IOException {

super(f);

}

public int read() throws IOException {

int result = super.read();

if (result != -1) {

statistics.incrementBytesRead(1);

}

return result;

}

public int read(byte[] data) throws IOException {

int result = super.read(data);

if (result != -1) {

statistics.incrementBytesRead(result);

}

return result;

}

public int read(byte[] data, int offset, int length) throws IOException {

int result = super.read(data, offset, length);

if (result != -1) {

statistics.incrementBytesRead(result);

}

return result;

}

}

class LocalFSFileInputStream extends FSInputStream {

private FileInputStream fis;

private long position;

public LocalFSFileInputStream(Path f) throws IOException {

this.fis = new TrackingFileInputStream(pathToFile(f));

}

public void seek(long pos) throws IOException {

fis.getChannel().position(pos);

this.position = pos;

}

public long getPos() throws IOException {

return this.position;

}

public boolean seekToNewSource(long targetPos) throws IOException {

return false;

}

public int available() throws IOException { return fis.available(); }

public void close() throws IOException { fis.close(); }

@Override

public boolean markSupported() { return false; }

public int read() throws IOException {

try {

int value = fis.read();

if (value >= 0) {

this.position++;

}

return value;

} catch (IOException e) { // unexpected exception

throw new FSError(e); // assume native fs error

}

}

public int read(byte[] b, int off, int len) throws IOException {

try {

int value = fis.read(b, off, len);

if (value > 0) {

this.position += value;

}

return value;

} catch (IOException e) { // unexpected exception

throw new FSError(e); // assume native fs error

}

}

public int read(long position, byte[] b, int off, int len)

throws IOException {

ByteBuffer bb = ByteBuffer.wrap(b, off, len);

try {

return fis.getChannel().read(bb, position);

} catch (IOException e) {

throw new FSError(e);

}

}

public long skip(long n) throws IOException {

long value = fis.skip(n);

if (value > 0) {

this.position += value;

}

return value;

}

}

/**
 * Opens {@code f} for reading through a buffer of {@code bufferSize} bytes.
 *
 * @throws FileNotFoundException if {@code f} does not exist
 */
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
  if (!exists(f)) {
    throw new FileNotFoundException(f.toString());
  }
  final LocalFSFileInputStream raw = new LocalFSFileInputStream(f);
  final BufferedFSInputStream buffered =
      new BufferedFSInputStream(raw, bufferSize);
  return new FSDataInputStream(buffered);
}

class LocalFSFileOutputStream extends OutputStream {

private FileOutputStream fos;

private LocalFSFileOutputStream(Path f, boolean append) throws IOException {

this.fos = new FileOutputStream(pathToFile(f), append);

}

public void close() throws IOException { fos.close(); }

public void flush() throws IOException { fos.flush(); }

public void write(byte[] b, int off, int len) throws IOException {

try {

fos.write(b, off, len);

} catch (IOException e) { // unexpected exception

throw new FSError(e); // assume native fs error

}

}

public void write(int b) throws IOException {

try {

fos.write(b);

} catch (IOException e) { // unexpected exception

throw new FSError(e); // assume native fs error

}

}

}

/**
 * Opens an existing regular file for appending.
 *
 * @throws FileNotFoundException if {@code f} does not exist
 * @throws IOException if {@code f} is a directory
 */
public FSDataOutputStream append(Path f, int bufferSize,
    Progressable progress) throws IOException {
  if (!exists(f)) {
    throw new FileNotFoundException("File " + f + " not found");
  }
  final boolean targetIsDirectory = getFileStatus(f).isDirectory();
  if (targetIsDirectory) {
    throw new IOException("Cannot append to a diretory (=" + f + " )");
  }
  // append = true: new bytes go after the existing content.
  final OutputStream raw = new LocalFSFileOutputStream(f, true);
  return new FSDataOutputStream(
      new BufferedOutputStream(raw, bufferSize), statistics);
}

@Override

public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,

short replication, long blockSize, Progressable progress)

throws IOException {

return create(f, overwrite, true, bufferSize, replication, blockSize, progress);

}

/**
 * Opens {@code f} for writing, discarding any previous content
 * (the underlying stream is opened with append = false).
 *
 * @param f file to create
 * @param overwrite whether an already existing file may be replaced
 * @param createParent when true, missing parent directories are created;
 *        when false, the parent must already exist as a directory
 * @param bufferSize size of the write buffer in bytes
 * @param replication not used by this method
 * @param blockSize not used by this method
 * @param progress not used by this method
 * @return a buffered stream writing to {@code f}
 * @throws FileNotFoundException if {@code createParent} is false and the
 *         parent directory does not exist
 * @throws IOException if the file exists and {@code overwrite} is false, or
 *         the parent directories could not be created
 */
private FSDataOutputStream create(Path f, boolean overwrite,
    boolean createParent, int bufferSize, short replication, long blockSize,
    Progressable progress) throws IOException {
  if (exists(f) && !overwrite) {
    throw new IOException("File already exists: "+f);
  }
  Path parent = f.getParent();
  if (parent != null) {
    if (createParent) {
      if (!mkdirs(parent)) {
        throw new IOException("Mkdirs failed to create " + parent.toString());
      }
    } else if (!pathToFile(parent).isDirectory()) {
      // Previously the flag was ignored and parents were always created,
      // which defeated the purpose of the non-recursive create variant.
      throw new FileNotFoundException(
          "Parent directory doesn't exist: " + parent);
    }
  }
  return new FSDataOutputStream(new BufferedOutputStream(
      new LocalFSFileOutputStream(f, false), bufferSize), statistics);
}

/**
 * Creates {@code f} (creating parent directories as needed) and then applies
 * {@code permission} to it.
 *
 * If applying the permission fails, the freshly opened stream is closed
 * before the failure propagates so that no file handle is leaked.
 *
 * @return the open output stream for {@code f}
 * @throws IOException if the file cannot be created or the permission
 *         cannot be applied
 */
@Override
public FSDataOutputStream create(Path f, FsPermission permission,
    boolean overwrite, int bufferSize, short replication, long blockSize,
    Progressable progress) throws IOException {
  FSDataOutputStream out = create(f,
      overwrite, bufferSize, replication, blockSize, progress);
  boolean permissionApplied = false;
  try {
    setPermission(f, permission);
    permissionApplied = true;
  } finally {
    if (!permissionApplied) {
      try {
        out.close();
      } catch (IOException ignored) {
        // The setPermission failure is the one worth reporting.
      }
    }
  }
  return out;
}

/**
 * Creates {@code f} through the private {@code create} overload with
 * {@code createParent = false}, then applies {@code permission} to it.
 *
 * If applying the permission fails, the freshly opened stream is closed
 * before the failure propagates so that no file handle is leaked.
 *
 * @return the open output stream for {@code f}
 * @throws IOException if the file cannot be created or the permission
 *         cannot be applied
 */
@Override
public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
    boolean overwrite,
    int bufferSize, short replication, long blockSize,
    Progressable progress) throws IOException {
  FSDataOutputStream out = create(f,
      overwrite, false, bufferSize, replication, blockSize, progress);
  boolean permissionApplied = false;
  try {
    setPermission(f, permission);
    permissionApplied = true;
  } finally {
    if (!permissionApplied) {
      try {
        out.close();
      } catch (IOException ignored) {
        // The setPermission failure is the one worth reporting.
      }
    }
  }
  return out;
}

/**
 * Renames {@code src} to {@code dst}.
 *
 * A direct {@code File.renameTo} is attempted first; when that reports
 * failure the work is delegated to {@code FileUtil.copy}.
 *
 * @return true if either the rename or the fallback copy succeeded
 */
public boolean rename(Path src, Path dst) throws IOException {
  final File from = pathToFile(src);
  final File to = pathToFile(dst);
  return from.renameTo(to)
      || FileUtil.copy(this, src, this, dst, true, getConf());
}

/**
 * Deletes the file or directory at {@code p}.
 *
 * @param recursive when false, a non-empty directory is refused
 * @return the result of {@code File.delete} for a regular file, otherwise
 *         the result of {@code FileUtil.fullyDelete}
 * @throws IOException if {@code p} is a non-empty directory and
 *         {@code recursive} is false
 */
public boolean delete(Path p, boolean recursive) throws IOException {
  final File target = pathToFile(p);
  if (target.isFile()) {
    return target.delete();
  }
  if (!recursive) {
    // Only inspect the directory contents when the caller forbade recursion.
    final boolean nonEmptyDirectory =
        target.isDirectory() && FileUtil.listFiles(target).length != 0;
    if (nonEmptyDirectory) {
      throw new IOException("Directory " + target.toString() + " is not empty");
    }
  }
  return FileUtil.fullyDelete(target);
}

/**
 * Lists the status of {@code f}: a single-element array for a regular file,
 * or one entry per child for a directory.
 *
 * @return the statuses, or null when {@code File.list()} returns null
 * @throws FileNotFoundException if {@code f} does not exist
 */
public FileStatus[] listStatus(Path f) throws IOException {
  final File local = pathToFile(f);
  if (!local.exists()) {
    throw new FileNotFoundException("File " + f + " does not exist");
  }
  if (local.isFile()) {
    final FileStatus self =
        new RawLocalFileStatus(local, getDefaultBlockSize(), this);
    return new FileStatus[] { self };
  }

  final String[] names = local.list();
  if (names == null) {
    return null;
  }

  final FileStatus[] collected = new FileStatus[names.length];
  int found = 0;
  for (String name : names) {
    try {
      collected[found] = getFileStatus(new Path(f, name));
      found++; // advance only after the lookup succeeded
    } catch (FileNotFoundException ignored) {
      // The entry vanished after names[] was generated; leave it out.
    }
  }
  // Trim the array when some entries disappeared during the scan.
  return found == names.length ? collected : Arrays.copyOf(collected, found);
}

public boolean mkdirs(Path f) throws IOException {

if(f == null) {

throw new IllegalArgumentException("mkdirs path arg is null");

}

Path parent = f.getParent();

File p2f = pathToFile(f);

if(parent != null) {

File parent2f = pathToFile(parent);

if(parent2f != null && parent2f.exists() && !parent2f.isDirectory()) {

throw new FileAlreadyExistsException("Parent path is not a directory: "

+ parent);

}

}

return (parent == null || mkdirs(parent)) &&

(p2f.mkdir() || p2f.isDirectory());

}

@Override

public boolean mkdirs(Path f, FsPermission permission) throws IOException {

boolean b = mkdirs(f);

if(b) {

setPermission(f, permission);

}

return b;

}

@Override

protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)

throws IOException {

boolean b = mkdirs(f);

setPermission(f, absolutePermission);

return b;

}

@Override

public Path getHomeDirectory() {

return this.makeQualified(new Path(System.getProperty("user.home")));

}

@Override

public void setWorkingDirectory(Path newDir) {

workingDir = makeAbsolute(newDir);

checkPath(workingDir);

}

@Override

public Path getWorkingDirectory() {

return workingDir;

}

@Override

protected Path getInitialWorkingDirectory() {

return this.makeQualified(new Path(System.getProperty("user.dir")));

}

@Override

public FsStatus getStatus(Path p) throws IOException {

File partition = pathToFile(p == null ? new Path("/") : p);

//File provides getUsableSpace() and getFreeSpace()

//File provides no API to obtain used space, assume used = total - free

return new FsStatus(partition.getTotalSpace(),

partition.getTotalSpace() - partition.getFreeSpace(),

partition.getFreeSpace());

}

// In the case of the local filesystem, we can just rename the file.

public void moveFromLocalFile(Path src, Path dst) throws IOException {

rename(src, dst);

}

// We can write output directly to the final location

public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)

throws IOException {

return fsOutputFile;

}

// It's in the right place - nothing to do.

public void completeLocalOutput(Path fsWorkingFile, Path tmpLocalFile)

throws IOException {

}

public void close() throws IOException {

super.close();

}

public String toString() {

return "LocalFS";

}

/**
 * Returns a {@code RawLocalFileStatus} describing {@code f}.
 *
 * @throws FileNotFoundException if {@code f} does not exist
 */
public FileStatus getFileStatus(Path f) throws IOException {
  final File local = pathToFile(f);
  if (!local.exists()) {
    throw new FileNotFoundException("File " + f + " does not exist");
  }
  return new RawLocalFileStatus(local, getDefaultBlockSize(), this);
}

static class RawLocalFileStatus extends FileStatus {

private boolean isPermissionLoaded() {

return !super.getOwner().equals("");

}

RawLocalFileStatus(File f, long defaultBlockSize, FileSystem fs) {

super(f.length(), f.isDirectory(), 1, defaultBlockSize,

f.lastModified(), fs.makeQualified(new Path(f.getPath())));

}

@Override

public FsPermission getPermission() {

if (!isPermissionLoaded()) {

loadPermissionInfo();

}

return super.getPermission();

}

@Override

public String getOwner() {

if (!isPermissionLoaded()) {

loadPermissionInfo();

}

return super.getOwner();

}

@Override

public String getGroup() {

if (!isPermissionLoaded()) {

loadPermissionInfo();

}

return super.getGroup();

}

/**
 * Loads permission, owner and group by running the command returned by
 * {@code Shell.getGET_PERMISSION_COMMAND()} on this file and parsing its
 * whitespace-separated output.
 *
 * When the command exits with code 1 the three attributes are set to null
 * instead; any other failure surfaces as a RuntimeException.
 */
private void loadPermissionInfo() {
  try {
    final String listing = execCommand(new File(getPath().toUri()),
        Shell.getGET_PERMISSION_COMMAND());
    final StringTokenizer fields = new StringTokenizer(listing);
    // expected format
    // -rw------- 1 username groupname ...
    String mode = fields.nextToken();
    if (mode.length() > 10) { // files with ACLs might have a '+'
      mode = mode.substring(0, 10);
    }
    setPermission(FsPermission.valueOf(mode));
    fields.nextToken(); // skip the second column ("1" in the format above)
    setOwner(fields.nextToken());
    setGroup(fields.nextToken());
  } catch (Shell.ExitCodeException ioe) {
    if (ioe.getExitCode() != 1) {
      throw permissionLookupFailure(ioe);
    }
    setPermission(null);
    setOwner(null);
    setGroup(null);
  } catch (IOException ioe) {
    throw permissionLookupFailure(ioe);
  }
}

/** Wraps an I/O failure of the permission command in a RuntimeException. */
private static RuntimeException permissionLookupFailure(IOException cause) {
  return new RuntimeException("Error while running command to get " +
      "file permissions : " +
      StringUtils.stringifyException(cause));
}

@Override

public void write(DataOutput out) throws IOException {

if (!isPermissionLoaded()) {

loadPermissionInfo();

}

super.write(out);

}

}

/**
 * Changes owner and/or group of {@code p} by running an external command.
 *
 * With only a group given, {@code Shell.SET_GROUP_COMMAND} is used;
 * otherwise {@code Shell.SET_OWNER_COMMAND} receives {@code OWNER[:GROUP]}.
 *
 * @throws IOException if both {@code username} and {@code groupname} are
 *         null, or the command fails
 */
@Override
public void setOwner(Path p, String username, String groupname)
    throws IOException {
  if (username == null && groupname == null) {
    throw new IOException("username == null && groupname == null");
  }
  if (username == null) {
    execCommand(pathToFile(p), Shell.SET_GROUP_COMMAND, groupname);
    return;
  }
  // OWNER[:[GROUP]]
  final String ownerSpec =
      (groupname == null) ? username : username + ":" + groupname;
  execCommand(pathToFile(p), Shell.SET_OWNER_COMMAND, ownerSpec);
}

/**
 * Applies {@code permission} to {@code p}.
 *
 * Uses {@code NativeIO.chmod} when the native library is available and
 * otherwise runs {@code Shell.SET_PERMISSION_COMMAND} with the mode
 * formatted as a five-digit octal string.
 *
 * @throws IOException if the permission cannot be changed
 */
@Override
public void setPermission(Path p, FsPermission permission)
    throws IOException {
  if (!NativeIO.isAvailable()) {
    final File target = pathToFile(p);
    final String octalMode = String.format("%05o", permission.toShort());
    execCommand(target, Shell.SET_PERMISSION_COMMAND, octalMode);
    return;
  }
  final String canonicalPath = pathToFile(p).getCanonicalPath();
  NativeIO.chmod(canonicalPath, permission.toShort());
}

/**
 * Runs {@code cmd} with the shell path of {@code f} appended as the last
 * argument and returns the output produced by {@code Shell.execCommand}.
 */
private static String execCommand(File f, String... cmd) throws IOException {
  // Copy the command into an array with one extra slot for the file path.
  final String[] argv = Arrays.copyOf(cmd, cmd.length + 1);
  argv[cmd.length] = FileUtil.makeShellPath(f, true);
  return Shell.execCommand(argv);
}

}

[/code]

2、修改源码

(1)在Eclipse任意建立一个测试项目,在项目中建立包org.apache.hadoop.fs

(2)将以前下载到的hadoop相关jar包导入到项目的类路径中,本例中使用的是hadoop-1.0.1,将解压后根路径中的hadoop-core-1.0.1.jar等、lib目录下所有的jar包都导入到项目的类路径(classpath)中

(3)在第(1)步建立的包org.apache.hadoop.fs下新建文件RawLocalFileSystem.java,将源码复制到该文件中。

(4)可以看到有几处错误,分别修改即可:

a、The type RawLocalFileSystem must implement the inherited abstract method FileSystem.delete(Path),根据提示,重写方法即可,ctrl+1,你懂的。

b、getFileStatus(f).isDirectory(),提示返回的FileStatus类没有方法isDirectory(),将这里修改为new File(f.toString()).isDirectory(),注意需要导入java.io.File类,开发工具会提示你的。(更稳妥的写法是pathToFile(f).isDirectory(),它能正确处理相对路径以及带有file:前缀的Path。)

c、primitiveMkdir(Path f, FsPermission absolutePermission)方法上提示The method primitiveMkdir(Path, FsPermission) of type RawLocalFileSystem must override or implement a supertype method,根据提示,去掉该方法前的@Override注解,同理,方法getInitialWorkingDirectory()也做同样处理

d、getStatus(Path p) throws IOException,该方法需要新增加类FsStatus,目前没有使用,因此直接将该方法删除

【e】、重要:以上都为去除编译时候错误,这一步才是核心

setPermission(Path p, FsPermission permission) throws IOException方法,我们在内部捕捉IOException,捕捉到后,不再向外抛出该异常。

[code]

/**
 * Best-effort variant of setPermission used to work around HADOOP-7682.
 *
 * Uses {@code NativeIO.chmod} when the native library is available and
 * otherwise runs {@code Shell.SET_PERMISSION_COMMAND}. An IOException from
 * either route is logged and NOT rethrown, so a failing chmod (as seen
 * under Windows/cygwin) no longer prevents the tasktracker from starting.
 *
 * NOTE: this also hides genuine permission failures on platforms where
 * chmod works, so it is only suitable for development setups.
 *
 * @param p path whose permission should be changed
 * @param permission permission to apply
 * @throws IOException declared for interface compatibility only
 */
@Override
public void setPermission(Path p, FsPermission permission)
    throws IOException {
  try {
    if (NativeIO.isAvailable()) {
      NativeIO.chmod(pathToFile(p).getCanonicalPath(),
          permission.toShort());
    } else {
      execCommand(pathToFile(p), Shell.SET_PERMISSION_COMMAND,
          String.format("%05o", permission.toShort()));
    }
  } catch (IOException e) {
    // Deliberately swallowed (see above). Log through the logger inherited
    // from FileSystem, with the path and mode, instead of a bare
    // printStackTrace() that ends up on stderr without any context.
    LOG.warn("Failed to set permissions of path: " + p + " to " + permission
        + "; ignoring", e);
  }
}

[/code]

3、编译以及将编译后的文件加入hadoop的核心包中,替换原来的文件

(1)使用windows压缩包工具(一般为rar工具)打开hadoop-core-1.0.1.jar文件,定位到org\apache\hadoop\fs

(2)到项目的编译后的文件,一般在项目的bin目录中,将org\apache\hadoop\fs\文件夹中的所有文件拖动到第一步打开的压缩包文件中,在提示覆盖的时候选择【是】

ok,修改完成,可以正常使用hadoop(无论是单机、伪分布式)

附件中附上hadoop.apache.org上源码库上针对hadoop-1.0.1的RawLocalFileSystem修改、编译后的文件,以后若版本变化也可按照上述方法自行修改。

2012-04-06

附上修改之后的源码

[code]

package org.apache.hadoop.fs;

import java.io.BufferedOutputStream;

import java.io.DataOutput;

import java.io.File;

import java.io.FileInputStream;

import java.io.FileNotFoundException;

import java.io.FileOutputStream;

import java.io.IOException;

import java.io.OutputStream;

import java.net.URI;

import java.nio.ByteBuffer;

import java.util.Arrays;

import java.util.StringTokenizer;

import org.apache.hadoop.classification.InterfaceAudience;

import org.apache.hadoop.classification.InterfaceStability;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.permission.FsPermission;

import org.apache.hadoop.io.nativeio.NativeIO;

import org.apache.hadoop.util.Progressable;

import org.apache.hadoop.util.Shell;

import org.apache.hadoop.util.StringUtils;

@InterfaceAudience.Public

@InterfaceStability.Stable

public class RawLocalFileSystem extends FileSystem {

static final URI NAME = URI.create("file:///");

private Path workingDir;

public RawLocalFileSystem() {

workingDir = getInitialWorkingDirectory();

}

private Path makeAbsolute(Path f) {

if (f.isAbsolute()) {

return f;

} else {

return new Path(workingDir, f);

}

}

public File pathToFile(Path path) {

checkPath(path);

if (!path.isAbsolute()) {

path = new Path(getWorkingDirectory(), path);

}

return new File(path.toUri().getPath());

}

public URI getUri() { return NAME; }

public void initialize(URI uri, Configuration conf) throws IOException {

super.initialize(uri, conf);

setConf(conf);

}

class TrackingFileInputStream extends FileInputStream {

public TrackingFileInputStream(File f) throws IOException {

super(f);

}

public int read() throws IOException {

int result = super.read();

if (result != -1) {

statistics.incrementBytesRead(1);

}

return result;

}

public int read(byte[] data) throws IOException {

int result = super.read(data);

if (result != -1) {

statistics.incrementBytesRead(result);

}

return result;

}

public int read(byte[] data, int offset, int length) throws IOException {

int result = super.read(data, offset, length);

if (result != -1) {

statistics.incrementBytesRead(result);

}

return result;

}

}

class LocalFSFileInputStream extends FSInputStream {

private FileInputStream fis;

private long position;

public LocalFSFileInputStream(Path f) throws IOException {

this.fis = new TrackingFileInputStream(pathToFile(f));

}

public void seek(long pos) throws IOException {

fis.getChannel().position(pos);

this.position = pos;

}

public long getPos() throws IOException {

return this.position;

}

public boolean seekToNewSource(long targetPos) throws IOException {

return false;

}

public int available() throws IOException { return fis.available(); }

public void close() throws IOException { fis.close(); }

@Override

public boolean markSupported() { return false; }

public int read() throws IOException {

try {

int value = fis.read();

if (value >= 0) {

this.position++;

}

return value;

} catch (IOException e) { // unexpected exception

throw new FSError(e); // assume native fs error

}

}

public int read(byte[] b, int off, int len) throws IOException {

try {

int value = fis.read(b, off, len);

if (value > 0) {

this.position += value;

}

return value;

} catch (IOException e) { // unexpected exception

throw new FSError(e); // assume native fs error

}

}

public int read(long position, byte[] b, int off, int len)

throws IOException {

ByteBuffer bb = ByteBuffer.wrap(b, off, len);

try {

return fis.getChannel().read(bb, position);

} catch (IOException e) {

throw new FSError(e);

}

}

public long skip(long n) throws IOException {

long value = fis.skip(n);

if (value > 0) {

this.position += value;

}

return value;

}

}

public FSDataInputStream open(Path f, int bufferSize) throws IOException {

if (!exists(f)) {

throw new FileNotFoundException(f.toString());

}

return new FSDataInputStream(new BufferedFSInputStream(

new LocalFSFileInputStream(f), bufferSize));

}

class LocalFSFileOutputStream extends OutputStream {

private FileOutputStream fos;

private LocalFSFileOutputStream(Path f, boolean append) throws IOException {

this.fos = new FileOutputStream(pathToFile(f), append);

}

public void close() throws IOException { fos.close(); }

public void flush() throws IOException { fos.flush(); }

public void write(byte[] b, int off, int len) throws IOException {

try {

fos.write(b, off, len);

} catch (IOException e) { // unexpected exception

throw new FSError(e); // assume native fs error

}

}

public void write(int b) throws IOException {

try {

fos.write(b);

} catch (IOException e) { // unexpected exception

throw new FSError(e); // assume native fs error

}

}

}

/**
 * Opens an existing regular file for appending.
 *
 * @param f file to append to
 * @param bufferSize size of the write buffer in bytes
 * @param progress not used by this method
 * @return a buffered stream that appends to {@code f}
 * @throws FileNotFoundException if {@code f} does not exist
 * @throws IOException if {@code f} is a directory
 */
public FSDataOutputStream append(Path f, int bufferSize,
    Progressable progress) throws IOException {
  if (!exists(f)) {
    throw new FileNotFoundException("File " + f + " not found");
  }
  // Resolve through pathToFile rather than new File(f.toString()): the
  // string form of a Path may carry a scheme or be relative to the working
  // directory, in which case the directory check would look at the wrong
  // location and always pass.
  if (pathToFile(f).isDirectory()) {
    throw new IOException("Cannot append to a diretory (=" + f + " )");
  }
  return new FSDataOutputStream(new BufferedOutputStream(
      new LocalFSFileOutputStream(f, true), bufferSize), statistics);
}

@Override

public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,

short replication, long blockSize, Progressable progress)

throws IOException {

return create(f, overwrite, true, bufferSize, replication, blockSize, progress);

}

/**
 * Opens {@code f} for writing, discarding any previous content
 * (the underlying stream is opened with append = false).
 *
 * @param f file to create
 * @param overwrite whether an already existing file may be replaced
 * @param createParent when true, missing parent directories are created;
 *        when false, the parent must already exist as a directory
 * @param bufferSize size of the write buffer in bytes
 * @param replication not used by this method
 * @param blockSize not used by this method
 * @param progress not used by this method
 * @return a buffered stream writing to {@code f}
 * @throws FileNotFoundException if {@code createParent} is false and the
 *         parent directory does not exist
 * @throws IOException if the file exists and {@code overwrite} is false, or
 *         the parent directories could not be created
 */
private FSDataOutputStream create(Path f, boolean overwrite,
    boolean createParent, int bufferSize, short replication, long blockSize,
    Progressable progress) throws IOException {
  if (exists(f) && !overwrite) {
    throw new IOException("File already exists: "+f);
  }
  Path parent = f.getParent();
  if (parent != null) {
    if (createParent) {
      if (!mkdirs(parent)) {
        throw new IOException("Mkdirs failed to create " + parent.toString());
      }
    } else if (!pathToFile(parent).isDirectory()) {
      // Previously the flag was ignored and parents were always created,
      // which defeated the purpose of the non-recursive create variant.
      throw new FileNotFoundException(
          "Parent directory doesn't exist: " + parent);
    }
  }
  return new FSDataOutputStream(new BufferedOutputStream(
      new LocalFSFileOutputStream(f, false), bufferSize), statistics);
}

@Override

public FSDataOutputStream create(Path f, FsPermission permission,

boolean overwrite, int bufferSize, short replication, long blockSize,

Progressable progress) throws IOException {

FSDataOutputStream out = create(f,

overwrite, bufferSize, replication, blockSize, progress);

setPermission(f, permission);

return out;

}

@Override

public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,

boolean overwrite,

int bufferSize, short replication, long blockSize,

Progressable progress) throws IOException {

FSDataOutputStream out = create(f,

overwrite, false, bufferSize, replication, blockSize, progress);

setPermission(f, permission);

return out;

}

public boolean rename(Path src, Path dst) throws IOException {

if (pathToFile(src).renameTo(pathToFile(dst))) {

return true;

}

return FileUtil.copy(this, src, this, dst, true, getConf());

}

public boolean delete(Path p, boolean recursive) throws IOException {

File f = pathToFile(p);

if (f.isFile()) {

return f.delete();

} else if (!recursive && f.isDirectory() &&

(FileUtil.listFiles(f).length != 0)) {

throw new IOException("Directory " + f.toString() + " is not empty");

}

return FileUtil.fullyDelete(f);

}

public FileStatus[] listStatus(Path f) throws IOException {

File localf = pathToFile(f);

FileStatus[] results;

if (!localf.exists()) {

throw new FileNotFoundException("File " + f + " does not exist");

}

if (localf.isFile()) {

return new FileStatus[] {

new RawLocalFileStatus(localf, getDefaultBlockSize(), this) };

}

String[] names = localf.list();

if (names == null) {

return null;

}

results = new FileStatus[names.length];

int j = 0;

for (int i = 0; i < names.length; i++) {

try {

results[j] = getFileStatus(new Path(f, names[i]));

j++;

} catch (FileNotFoundException e) {

// ignore the files not found since the dir list may have have changed

// since the names[] list was generated.

}

}

if (j == names.length) {

return results;

}

return Arrays.copyOf(results, j);

}

public boolean mkdirs(Path f) throws IOException {

if(f == null) {

throw new IllegalArgumentException("mkdirs path arg is null");

}

Path parent = f.getParent();

File p2f = pathToFile(f);

if(parent != null) {

File parent2f = pathToFile(parent);

if(parent2f != null && parent2f.exists() && !parent2f.isDirectory()) {

throw new FileAlreadyExistsException("Parent path is not a directory: "

+ parent);

}

}

return (parent == null || mkdirs(parent)) &&

(p2f.mkdir() || p2f.isDirectory());

}

@Override

public boolean mkdirs(Path f, FsPermission permission) throws IOException {

boolean b = mkdirs(f);

if(b) {

setPermission(f, permission);

}

return b;

}

protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)

throws IOException {

boolean b = mkdirs(f);

setPermission(f, absolutePermission);

return b;

}

@Override

public Path getHomeDirectory() {

return this.makeQualified(new Path(System.getProperty("user.home")));

}

@Override

public void setWorkingDirectory(Path newDir) {

workingDir = makeAbsolute(newDir);

checkPath(workingDir);

}

@Override

public Path getWorkingDirectory() {

return workingDir;

}

protected Path getInitialWorkingDirectory() {

return this.makeQualified(new Path(System.getProperty("user.dir")));

}

// In the case of the local filesystem, we can just rename the file.

public void moveFromLocalFile(Path src, Path dst) throws IOException {

rename(src, dst);

}

// We can write output directly to the final location

public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)

throws IOException {

return fsOutputFile;

}

// It's in the right place - nothing to do.

public void completeLocalOutput(Path fsWorkingFile, Path tmpLocalFile)

throws IOException {

}

public void close() throws IOException {

super.close();

}

public String toString() {

return "LocalFS";

}

public FileStatus getFileStatus(Path f) throws IOException {

File path = pathToFile(f);

if (path.exists()) {

return new RawLocalFileStatus(pathToFile(f), getDefaultBlockSize(), this);

} else {

throw new FileNotFoundException("File " + f + " does not exist");

}

}

static class RawLocalFileStatus extends FileStatus {

private boolean isPermissionLoaded() {

return !super.getOwner().equals("");

}

RawLocalFileStatus(File f, long defaultBlockSize, FileSystem fs) {

super(f.length(), f.isDirectory(), 1, defaultBlockSize,

f.lastModified(), fs.makeQualified(new Path(f.getPath())));

}

@Override

public FsPermission getPermission() {

if (!isPermissionLoaded()) {

loadPermissionInfo();

}

return super.getPermission();

}

@Override

public String getOwner() {

if (!isPermissionLoaded()) {

loadPermissionInfo();

}

return super.getOwner();

}

@Override

public String getGroup() {

if (!isPermissionLoaded()) {

loadPermissionInfo();

}

return super.getGroup();

}

/// loads permissions, owner, and group from `ls -ld`

private void loadPermissionInfo() {

IOException e = null;

try {

StringTokenizer t = new StringTokenizer(

execCommand(new File(getPath().toUri()),

Shell.getGET_PERMISSION_COMMAND()));

//expected format

//-rw------- 1 username groupname ...

String permission = t.nextToken();

if (permission.length() > 10) { //files with ACLs might have a '+'

permission = permission.substring(0, 10);

}

setPermission(FsPermission.valueOf(permission));

t.nextToken();

setOwner(t.nextToken());

setGroup(t.nextToken());

} catch (Shell.ExitCodeException ioe) {

if (ioe.getExitCode() != 1) {

e = ioe;

} else {

setPermission(null);

setOwner(null);

setGroup(null);

}

} catch (IOException ioe) {

e = ioe;

} finally {

if (e != null) {

throw new RuntimeException("Error while running command to get " +

"file permissions : " +

StringUtils.stringifyException(e));

}

}

}

@Override

public void write(DataOutput out) throws IOException {

if (!isPermissionLoaded()) {

loadPermissionInfo();

}

super.write(out);

}

}

@Override

public void setOwner(Path p, String username, String groupname)

throws IOException {

if (username == null && groupname == null) {

throw new IOException("username == null && groupname == null");

}

if (username == null) {

execCommand(pathToFile(p), Shell.SET_GROUP_COMMAND, groupname);

} else {

//OWNER[:[GROUP]]

String s = username + (groupname == null? "": ":" + groupname);

execCommand(pathToFile(p), Shell.SET_OWNER_COMMAND, s);

}

}

/**
 * Best-effort variant of setPermission used to work around HADOOP-7682.
 *
 * Uses {@code NativeIO.chmod} when the native library is available and
 * otherwise runs {@code Shell.SET_PERMISSION_COMMAND}. An IOException from
 * either route is logged and NOT rethrown, so a failing chmod (as seen
 * under Windows/cygwin) no longer prevents the tasktracker from starting.
 *
 * NOTE: this also hides genuine permission failures on platforms where
 * chmod works, so it is only suitable for development setups.
 *
 * @param p path whose permission should be changed
 * @param permission permission to apply
 * @throws IOException declared for interface compatibility only
 */
@Override
public void setPermission(Path p, FsPermission permission)
    throws IOException {
  try {
    if (NativeIO.isAvailable()) {
      NativeIO.chmod(pathToFile(p).getCanonicalPath(),
          permission.toShort());
    } else {
      execCommand(pathToFile(p), Shell.SET_PERMISSION_COMMAND,
          String.format("%05o", permission.toShort()));
    }
  } catch (IOException e) {
    // Deliberately swallowed (see above). Log through the logger inherited
    // from FileSystem, with the path and mode, instead of a bare
    // printStackTrace() that ends up on stderr without any context.
    LOG.warn("Failed to set permissions of path: " + p + " to " + permission
        + "; ignoring", e);
  }
}

private static String execCommand(File f, String... cmd) throws IOException {

String[] args = new String[cmd.length + 1];

System.arraycopy(cmd, 0, args, 0, cmd.length);

args[cmd.length] = FileUtil.makeShellPath(f, true);

String output = Shell.execCommand(args);

return output;

}

/**
 * Deletes {@code arg0}; a directory is removed together with its contents.
 *
 * The IDE-generated stub returned false without touching the file system,
 * which silently broke every caller of the single-argument delete. It now
 * delegates to {@link #delete(Path, boolean)} with recursion enabled.
 *
 * @param arg0 path to delete
 * @return true if the deletion succeeded
 * @throws IOException if the underlying delete fails
 * @deprecated use {@link #delete(Path, boolean)} instead
 */
@Override
@Deprecated
public boolean delete(Path arg0) throws IOException {
  return delete(arg0, true);
}

}

[/code]

继续阅读