天天看點

hadoop未修複bug6287的解決辦法(ttprivate to 0700的bug、setPermission failed)

hadoop-0.20.2以上版本,若在windows下使用cygwin模拟,進行開發和測試。可能導緻

setPermission失敗,報異常導緻tasktracker無法啟動,在https://issues.apache.org/jira/browse/HADOOP-7682上有詳細的描述,但檢視hadoop的Release Notes中還未對此作出修改(目前版本已經到了hadoop-1.0.2),是以采用自己修改源碼中相關代碼的方法,來修複此bug。

通過以上文章,或檢視啟動的報錯資訊(可通過指令hadoop tasktracker啟動tasktracker直接輸出到控制台看到報錯資訊,當然這是使用僞分布式的配置方式)可以了解到引起該異常的類為hadoop的common項目中org.apache.hadoop.fs.RawLocalFileSystem.java。

1、下載下傳源碼

可以到http://svn.apache.org/repos/asf/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java下載下傳源碼

[code]

package org.apache.hadoop.fs;

import java.io.BufferedOutputStream;

import java.io.DataOutput;

import java.io.File;

import java.io.FileInputStream;

import java.io.FileNotFoundException;

import java.io.FileOutputStream;

import java.io.IOException;

import java.io.OutputStream;

import java.net.URI;

import java.nio.ByteBuffer;

import java.util.Arrays;

import java.util.StringTokenizer;

import org.apache.hadoop.classification.InterfaceAudience;

import org.apache.hadoop.classification.InterfaceStability;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.permission.FsPermission;

import org.apache.hadoop.io.nativeio.NativeIO;

import org.apache.hadoop.util.Progressable;

import org.apache.hadoop.util.Shell;

import org.apache.hadoop.util.StringUtils;

@InterfaceAudience.Public

@InterfaceStability.Stable

public class RawLocalFileSystem extends FileSystem {

static final URI NAME = URI.create("file:///");

private Path workingDir;

public RawLocalFileSystem() {

workingDir = getInitialWorkingDirectory();

}

private Path makeAbsolute(Path f) {

if (f.isAbsolute()) {

return f;

} else {

return new Path(workingDir, f);

}

}

public File pathToFile(Path path) {

checkPath(path);

if (!path.isAbsolute()) {

path = new Path(getWorkingDirectory(), path);

}

return new File(path.toUri().getPath());

}

public URI getUri() { return NAME; }

public void initialize(URI uri, Configuration conf) throws IOException {

super.initialize(uri, conf);

setConf(conf);

}

class TrackingFileInputStream extends FileInputStream {

public TrackingFileInputStream(File f) throws IOException {

super(f);

}

public int read() throws IOException {

int result = super.read();

if (result != -1) {

statistics.incrementBytesRead(1);

}

return result;

}

public int read(byte[] data) throws IOException {

int result = super.read(data);

if (result != -1) {

statistics.incrementBytesRead(result);

}

return result;

}

public int read(byte[] data, int offset, int length) throws IOException {

int result = super.read(data, offset, length);

if (result != -1) {

statistics.incrementBytesRead(result);

}

return result;

}

}

class LocalFSFileInputStream extends FSInputStream {

private FileInputStream fis;

private long position;

public LocalFSFileInputStream(Path f) throws IOException {

this.fis = new TrackingFileInputStream(pathToFile(f));

}

public void seek(long pos) throws IOException {

fis.getChannel().position(pos);

this.position = pos;

}

public long getPos() throws IOException {

return this.position;

}

public boolean seekToNewSource(long targetPos) throws IOException {

return false;

}

public int available() throws IOException { return fis.available(); }

public void close() throws IOException { fis.close(); }

@Override

public boolean markSupported() { return false; }

public int read() throws IOException {

try {

int value = fis.read();

if (value >= 0) {

this.position++;

}

return value;

} catch (IOException e) { // unexpected exception

throw new FSError(e); // assume native fs error

}

}

public int read(byte[] b, int off, int len) throws IOException {

try {

int value = fis.read(b, off, len);

if (value > 0) {

this.position += value;

}

return value;

} catch (IOException e) { // unexpected exception

throw new FSError(e); // assume native fs error

}

}

public int read(long position, byte[] b, int off, int len)

throws IOException {

ByteBuffer bb = ByteBuffer.wrap(b, off, len);

try {

return fis.getChannel().read(bb, position);

} catch (IOException e) {

throw new FSError(e);

}

}

public long skip(long n) throws IOException {

long value = fis.skip(n);

if (value > 0) {

this.position += value;

}

return value;

}

}

public FSDataInputStream open(Path f, int bufferSize) throws IOException {

if (!exists(f)) {

throw new FileNotFoundException(f.toString());

}

return new FSDataInputStream(new BufferedFSInputStream(

new LocalFSFileInputStream(f), bufferSize));

}

class LocalFSFileOutputStream extends OutputStream {

private FileOutputStream fos;

private LocalFSFileOutputStream(Path f, boolean append) throws IOException {

this.fos = new FileOutputStream(pathToFile(f), append);

}

public void close() throws IOException { fos.close(); }

public void flush() throws IOException { fos.flush(); }

public void write(byte[] b, int off, int len) throws IOException {

try {

fos.write(b, off, len);

} catch (IOException e) { // unexpected exception

throw new FSError(e); // assume native fs error

}

}

public void write(int b) throws IOException {

try {

fos.write(b);

} catch (IOException e) { // unexpected exception

throw new FSError(e); // assume native fs error

}

}

}

public FSDataOutputStream append(Path f, int bufferSize,

Progressable progress) throws IOException {

if (!exists(f)) {

throw new FileNotFoundException("File " + f + " not found");

}

if (getFileStatus(f).isDirectory()) {

throw new IOException("Cannot append to a diretory (=" + f + " )");

}

return new FSDataOutputStream(new BufferedOutputStream(

new LocalFSFileOutputStream(f, true), bufferSize), statistics);

}

@Override

public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,

short replication, long blockSize, Progressable progress)

throws IOException {

return create(f, overwrite, true, bufferSize, replication, blockSize, progress);

}

private FSDataOutputStream create(Path f, boolean overwrite,

boolean createParent, int bufferSize, short replication, long blockSize,

Progressable progress) throws IOException {

if (exists(f) && !overwrite) {

throw new IOException("File already exists: "+f);

}

Path parent = f.getParent();

if (parent != null && !mkdirs(parent)) {

throw new IOException("Mkdirs failed to create " + parent.toString());

}

return new FSDataOutputStream(new BufferedOutputStream(

new LocalFSFileOutputStream(f, false), bufferSize), statistics);

}

@Override

public FSDataOutputStream create(Path f, FsPermission permission,

boolean overwrite, int bufferSize, short replication, long blockSize,

Progressable progress) throws IOException {

FSDataOutputStream out = create(f,

overwrite, bufferSize, replication, blockSize, progress);

setPermission(f, permission);

return out;

}

@Override

public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,

boolean overwrite,

int bufferSize, short replication, long blockSize,

Progressable progress) throws IOException {

FSDataOutputStream out = create(f,

overwrite, false, bufferSize, replication, blockSize, progress);

setPermission(f, permission);

return out;

}

public boolean rename(Path src, Path dst) throws IOException {

if (pathToFile(src).renameTo(pathToFile(dst))) {

return true;

}

return FileUtil.copy(this, src, this, dst, true, getConf());

}

public boolean delete(Path p, boolean recursive) throws IOException {

File f = pathToFile(p);

if (f.isFile()) {

return f.delete();

} else if (!recursive && f.isDirectory() &&

(FileUtil.listFiles(f).length != 0)) {

throw new IOException("Directory " + f.toString() + " is not empty");

}

return FileUtil.fullyDelete(f);

}

public FileStatus[] listStatus(Path f) throws IOException {

File localf = pathToFile(f);

FileStatus[] results;

if (!localf.exists()) {

throw new FileNotFoundException("File " + f + " does not exist");

}

if (localf.isFile()) {

return new FileStatus[] {

new RawLocalFileStatus(localf, getDefaultBlockSize(), this) };

}

String[] names = localf.list();

if (names == null) {

return null;

}

results = new FileStatus[names.length];

int j = 0;

for (int i = 0; i < names.length; i++) {

try {

results[j] = getFileStatus(new Path(f, names[i]));

j++;

} catch (FileNotFoundException e) {

// ignore the files not found since the dir list may have have changed

// since the names[] list was generated.

}

}

if (j == names.length) {

return results;

}

return Arrays.copyOf(results, j);

}

public boolean mkdirs(Path f) throws IOException {

if(f == null) {

throw new IllegalArgumentException("mkdirs path arg is null");

}

Path parent = f.getParent();

File p2f = pathToFile(f);

if(parent != null) {

File parent2f = pathToFile(parent);

if(parent2f != null && parent2f.exists() && !parent2f.isDirectory()) {

throw new FileAlreadyExistsException("Parent path is not a directory: "

+ parent);

}

}

return (parent == null || mkdirs(parent)) &&

(p2f.mkdir() || p2f.isDirectory());

}

@Override

public boolean mkdirs(Path f, FsPermission permission) throws IOException {

boolean b = mkdirs(f);

if(b) {

setPermission(f, permission);

}

return b;

}

@Override

protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)

throws IOException {

boolean b = mkdirs(f);

setPermission(f, absolutePermission);

return b;

}

@Override

public Path getHomeDirectory() {

return this.makeQualified(new Path(System.getProperty("user.home")));

}

/**
 * Set the working directory to the given directory.
 *
 * <p>The candidate is validated with {@code checkPath} before the field is
 * assigned, so a path rejected by {@code checkPath} leaves the previous
 * working directory in place instead of replacing it with an invalid one.
 *
 * @param newDir the new working directory; resolved against the current
 *     working directory when it is not absolute
 */
@Override
public void setWorkingDirectory(Path newDir) {
  final Path candidate = makeAbsolute(newDir);
  checkPath(candidate);
  workingDir = candidate;
}

@Override

public Path getWorkingDirectory() {

return workingDir;

}

@Override

protected Path getInitialWorkingDirectory() {

return this.makeQualified(new Path(System.getProperty("user.dir")));

}

/**
 * Reports capacity, used and remaining space for the partition holding
 * {@code p}.
 *
 * <p>{@link File} offers no API for used space, so it is derived as
 * {@code total - free}. Total and free space are each read once so that
 * the three reported figures are mutually consistent even if the free
 * space changes while this method runs.
 *
 * @param p a path on the partition of interest; {@code null} means "/"
 * @return the space figures for that partition
 * @throws IOException declared by the overridden method
 */
@Override
public FsStatus getStatus(Path p) throws IOException {
  final File partition = pathToFile(p == null ? new Path("/") : p);
  final long total = partition.getTotalSpace();
  final long free = partition.getFreeSpace();
  return new FsStatus(total, total - free, free);
}

// In the case of the local filesystem, we can just rename the file.

public void moveFromLocalFile(Path src, Path dst) throws IOException {

rename(src, dst);

}

// We can write output directly to the final location

public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)

throws IOException {

return fsOutputFile;

}

// It's in the right place - nothing to do.

public void completeLocalOutput(Path fsWorkingFile, Path tmpLocalFile)

throws IOException {

}

public void close() throws IOException {

super.close();

}

public String toString() {

return "LocalFS";

}

/**
 * Returns the status of the local file that {@code f} maps to.
 *
 * @param f the path to look up
 * @return a {@code RawLocalFileStatus} describing the file
 * @throws FileNotFoundException if no such local file exists
 * @throws IOException declared by the overridden method
 */
public FileStatus getFileStatus(Path f) throws IOException {
  final File local = pathToFile(f);
  if (!local.exists()) {
    throw new FileNotFoundException("File " + f + " does not exist");
  }
  return new RawLocalFileStatus(local, getDefaultBlockSize(), this);
}

static class RawLocalFileStatus extends FileStatus {

private boolean isPermissionLoaded() {

return !super.getOwner().equals("");

}

RawLocalFileStatus(File f, long defaultBlockSize, FileSystem fs) {

super(f.length(), f.isDirectory(), 1, defaultBlockSize,

f.lastModified(), fs.makeQualified(new Path(f.getPath())));

}

@Override

public FsPermission getPermission() {

if (!isPermissionLoaded()) {

loadPermissionInfo();

}

return super.getPermission();

}

@Override

public String getOwner() {

if (!isPermissionLoaded()) {

loadPermissionInfo();

}

return super.getOwner();

}

@Override

public String getGroup() {

if (!isPermissionLoaded()) {

loadPermissionInfo();

}

return super.getGroup();

}

/// loads permissions, owner, and group from `ls -ld`

private void loadPermissionInfo() {

IOException e = null;

try {

StringTokenizer t = new StringTokenizer(

execCommand(new File(getPath().toUri()),

Shell.getGET_PERMISSION_COMMAND()));

//expected format

//-rw------- 1 username groupname ...

String permission = t.nextToken();

if (permission.length() > 10) { //files with ACLs might have a '+'

permission = permission.substring(0, 10);

}

setPermission(FsPermission.valueOf(permission));

t.nextToken();

setOwner(t.nextToken());

setGroup(t.nextToken());

} catch (Shell.ExitCodeException ioe) {

if (ioe.getExitCode() != 1) {

e = ioe;

} else {

setPermission(null);

setOwner(null);

setGroup(null);

}

} catch (IOException ioe) {

e = ioe;

} finally {

if (e != null) {

throw new RuntimeException("Error while running command to get " +

"file permissions : " +

StringUtils.stringifyException(e));

}

}

}

@Override

public void write(DataOutput out) throws IOException {

if (!isPermissionLoaded()) {

loadPermissionInfo();

}

super.write(out);

}

}

@Override

public void setOwner(Path p, String username, String groupname)

throws IOException {

if (username == null && groupname == null) {

throw new IOException("username == null && groupname == null");

}

if (username == null) {

execCommand(pathToFile(p), Shell.SET_GROUP_COMMAND, groupname);

} else {

//OWNER[:[GROUP]]

String s = username + (groupname == null? "": ":" + groupname);

execCommand(pathToFile(p), Shell.SET_OWNER_COMMAND, s);

}

}

@Override

public void setPermission(Path p, FsPermission permission)

throws IOException {

if (NativeIO.isAvailable()) {

NativeIO.chmod(pathToFile(p).getCanonicalPath(),

permission.toShort());

} else {

execCommand(pathToFile(p), Shell.SET_PERMISSION_COMMAND,

String.format("%05o", permission.toShort()));

}

}

/**
 * Runs {@code cmd} through {@code Shell.execCommand} with the shell path
 * of {@code f} appended as the final argument.
 *
 * @param f the file the command operates on
 * @param cmd the command and its leading arguments
 * @return the output returned by {@code Shell.execCommand}
 * @throws IOException if building the shell path or running the command fails
 */
private static String execCommand(File f, String... cmd) throws IOException {
  final String[] commandLine = Arrays.copyOf(cmd, cmd.length + 1);
  commandLine[cmd.length] = FileUtil.makeShellPath(f, true);
  return Shell.execCommand(commandLine);
}

}

[/code]

2、修改源碼

(1)在Eclipse任意建立一個測試項目,在項目中建立包org.apache.hadoop.fs

(2)将以前下載下傳到的hadoop相關jar包導入到項目的類路徑中,本例中使用的是hadoop-1.0.1,将解壓後根路徑中的hadoop-core-1.0.1.jar等、lib目錄下所有的jar包都導入到項目的類路徑(classpath)中

(3)在第一步中建立的包org.apache.hadoop.fs下建立RawLocalFileSystem.java檔案,将源碼複制到該檔案中。

(4)可以看到有幾處錯誤,分别修改即可:

a、The type RawLocalFileSystem must implement the inherited abstract method FileSystem.delete(Path),根據提示,重寫方法即可,ctrl+1,你懂的。

b、getFileStatus(f).isDirectory(),提示傳回的FileStatus類沒有方法isDirectory(),将這裡的修改為new File(f.toString()).isDirectory(),注意需要導入java.io.File類,開發工具會提示你的。

c、primitiveMkdir(Path f, FsPermission absolutePermission)方法上提示The method primitiveMkdir(Path, FsPermission) of type RawLocalFileSystem must override or implement a supertype method,根據提示,去掉該方法前的@Override注解,同理還有方法getInitialWorkingDirectory()也做同樣處理

d、getStatus(Path p) throws IOException,該方法需要新增加類FsStatus,目前沒有使用,是以直接将該方法删除

【e】、重要:以上都為去除編譯時候錯誤,這一步才是核心

setPermission(Path p, FsPermission permission) throws IOException方法,我們在内部捕捉IOException,捕捉到後,不再向外抛出該異常。

[code]

/**
 * Sets the permission of {@code p}: through {@code NativeIO.chmod} when the
 * native library is available, otherwise through the shell permission
 * command.
 *
 * <p>Workaround for HADOOP-7682: on Windows a failure is logged and
 * ignored so that daemons can still start under Cygwin. On every other
 * platform the exception is rethrown, because silently skipping a
 * permission change there would hide a real error.
 *
 * @param p the path whose permission is changed
 * @param permission the permission to apply
 * @throws IOException if the change fails on a non-Windows platform
 */
@Override
public void setPermission(Path p, FsPermission permission)
    throws IOException {
  try {
    if (NativeIO.isAvailable()) {
      NativeIO.chmod(pathToFile(p).getCanonicalPath(),
          permission.toShort());
    } else {
      execCommand(pathToFile(p), Shell.SET_PERMISSION_COMMAND,
          String.format("%05o", permission.toShort()));
    }
  } catch (IOException e) {
    if (!Shell.WINDOWS) {
      throw e;
    }
    // Best effort on Windows only; keep the failure visible in the log.
    LOG.warn("Ignoring failure to set permission of " + p + " to "
        + permission + " on Windows", e);
  }
}

[/code]

3、編譯以及将編譯後的檔案加入hadoop的核心包中,替換原來的檔案

(1)使用windows壓縮包工具(一般為rar工具)打開hadoop-core-1.0.1.jar檔案,定位到org\apache\hadoop\fs

(2)到項目的編譯後的檔案,一般在項目的bin目錄中,将org\apache\hadoop\fs\檔案夾中的所有檔案拖動到第一步打開的壓縮包檔案中,在提示覆寫的時候選擇【是】

ok,修改完成,可以正常使用hadoop(無論是單機、僞分布式)

附件中附上hadoop.apache.org上源碼庫上針對hadoop-1.0.1的RawLocalFileSystem修改、編譯後的檔案,以後若版本變化也可按照上述方法自行修改。

2012-04-06

附上修改之後的源碼

[code]

package org.apache.hadoop.fs;

import java.io.BufferedOutputStream;

import java.io.DataOutput;

import java.io.File;

import java.io.FileInputStream;

import java.io.FileNotFoundException;

import java.io.FileOutputStream;

import java.io.IOException;

import java.io.OutputStream;

import java.net.URI;

import java.nio.ByteBuffer;

import java.util.Arrays;

import java.util.StringTokenizer;

import org.apache.hadoop.classification.InterfaceAudience;

import org.apache.hadoop.classification.InterfaceStability;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.permission.FsPermission;

import org.apache.hadoop.io.nativeio.NativeIO;

import org.apache.hadoop.util.Progressable;

import org.apache.hadoop.util.Shell;

import org.apache.hadoop.util.StringUtils;

@InterfaceAudience.Public

@InterfaceStability.Stable

public class RawLocalFileSystem extends FileSystem {

static final URI NAME = URI.create("file:///");

private Path workingDir;

/**
 * Creates the file system and initialises its working directory with the
 * value returned by {@code getInitialWorkingDirectory()}.
 */
public RawLocalFileSystem() {

workingDir = getInitialWorkingDirectory();

}

/**
 * Returns {@code f} unchanged when it is absolute; otherwise resolves it
 * against the current working directory.
 */
private Path makeAbsolute(Path f) {
  return f.isAbsolute() ? f : new Path(workingDir, f);
}

/**
 * Converts a path to a {@link File}.
 *
 * <p>The path is first validated with {@code checkPath}; a relative path
 * is then resolved against {@code getWorkingDirectory()}.
 *
 * @param path the path to convert
 * @return a {@code File} built from the path component of the path's URI
 */
public File pathToFile(Path path) {
  checkPath(path);
  final Path resolved =
      path.isAbsolute() ? path : new Path(getWorkingDirectory(), path);
  return new File(resolved.toUri().getPath());
}

/** Returns {@code NAME}, the constant {@code file:///} URI of this file system. */
public URI getUri() { return NAME; }

/**
 * Runs the superclass initialisation and then stores {@code conf} on this
 * instance through {@code setConf}.
 *
 * @param uri the URI passed on to the superclass
 * @param conf the configuration to use
 * @throws IOException if the superclass initialisation fails
 */
public void initialize(URI uri, Configuration conf) throws IOException {

super.initialize(uri, conf);

setConf(conf);

}

/**
 * A {@link FileInputStream} that reports every successful read to the
 * enclosing file system's {@code statistics}.
 */
class TrackingFileInputStream extends FileInputStream {

  public TrackingFileInputStream(File f) throws IOException {
    super(f);
  }

  /** Reads one byte and counts it unless end of stream was reached. */
  public int read() throws IOException {
    final int b = super.read();
    if (b == -1) {
      return b;
    }
    statistics.incrementBytesRead(1);
    return b;
  }

  /** Fills {@code data} and counts the bytes actually read. */
  public int read(byte[] data) throws IOException {
    return record(super.read(data));
  }

  /** Reads into a slice of {@code data} and counts the bytes actually read. */
  public int read(byte[] data, int offset, int length) throws IOException {
    return record(super.read(data, offset, length));
  }

  /** Adds {@code count} to the statistics unless it is the EOF marker -1. */
  private int record(int count) {
    if (count != -1) {
      statistics.incrementBytesRead(count);
    }
    return count;
  }
}

/**
 * The {@code FSInputStream} handed out by {@code open()}: wraps a
 * {@code TrackingFileInputStream} and tracks the current offset itself.
 * I/O failures in the read methods are rethrown as {@code FSError}.
 */
class LocalFSFileInputStream extends FSInputStream {

  private FileInputStream fis;

  /** Offset of the next sequential read, maintained by this class. */
  private long position;

  public LocalFSFileInputStream(Path f) throws IOException {
    fis = new TrackingFileInputStream(pathToFile(f));
  }

  public void seek(long pos) throws IOException {
    fis.getChannel().position(pos);
    position = pos;
  }

  public long getPos() throws IOException {
    return position;
  }

  /** Always {@code false}: this implementation never switches source. */
  public boolean seekToNewSource(long targetPos) throws IOException {
    return false;
  }

  public int available() throws IOException {
    return fis.available();
  }

  public void close() throws IOException {
    fis.close();
  }

  @Override
  public boolean markSupported() {
    return false;
  }

  public int read() throws IOException {
    final int b;
    try {
      b = fis.read();
    } catch (IOException e) { // unexpected exception
      throw new FSError(e); // assume native fs error
    }
    if (b >= 0) {
      position++;
    }
    return b;
  }

  public int read(byte[] b, int off, int len) throws IOException {
    final int count;
    try {
      count = fis.read(b, off, len);
    } catch (IOException e) { // unexpected exception
      throw new FSError(e); // assume native fs error
    }
    if (count > 0) {
      position += count;
    }
    return count;
  }

  /** Positional read through the channel; the tracked offset is not changed. */
  public int read(long position, byte[] b, int off, int len)
      throws IOException {
    final ByteBuffer target = ByteBuffer.wrap(b, off, len);
    try {
      return fis.getChannel().read(target, position);
    } catch (IOException e) {
      throw new FSError(e);
    }
  }

  public long skip(long n) throws IOException {
    final long skipped = fis.skip(n);
    if (skipped > 0) {
      position += skipped;
    }
    return skipped;
  }
}

/**
 * Opens {@code f} for reading through a buffer of {@code bufferSize} bytes.
 *
 * @param f the file to open
 * @param bufferSize the size of the read buffer
 * @return a data input stream over the file
 * @throws FileNotFoundException if {@code f} does not exist
 * @throws IOException if the file cannot be opened
 */
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
  if (exists(f)) {
    return new FSDataInputStream(new BufferedFSInputStream(
        new LocalFSFileInputStream(f), bufferSize));
  }
  throw new FileNotFoundException(f.toString());
}

class LocalFSFileOutputStream extends OutputStream {

private FileOutputStream fos;

private LocalFSFileOutputStream(Path f, boolean append) throws IOException {

this.fos = new FileOutputStream(pathToFile(f), append);

}

public void close() throws IOException { fos.close(); }

public void flush() throws IOException { fos.flush(); }

public void write(byte[] b, int off, int len) throws IOException {

try {

fos.write(b, off, len);

} catch (IOException e) { // unexpected exception

throw new FSError(e); // assume native fs error

}

}

public void write(int b) throws IOException {

try {

fos.write(b);

} catch (IOException e) { // unexpected exception

throw new FSError(e); // assume native fs error

}

}

}

/**
 * Opens an existing file so that written data is appended to it.
 *
 * @param f the file to append to; it must exist and must not be a directory
 * @param bufferSize size of the buffer placed in front of the file stream
 * @param progress not used by this implementation
 * @return a data output stream that appends to {@code f}
 * @throws FileNotFoundException if {@code f} does not exist
 * @throws IOException if {@code f} is a directory or cannot be opened
 */
public FSDataOutputStream append(Path f, int bufferSize,
    Progressable progress) throws IOException {
  if (!exists(f)) {
    throw new FileNotFoundException("File " + f + " not found");
  }
  // Resolve through pathToFile() instead of new File(f.toString()): the
  // string form of a Path may carry a scheme and is not resolved against
  // this file system's working directory, so the directory test could
  // look at the wrong location and let a directory through.
  if (pathToFile(f).isDirectory()) {
    throw new IOException("Cannot append to a diretory (=" + f + " )");
  }
  return new FSDataOutputStream(new BufferedOutputStream(
      new LocalFSFileOutputStream(f, true), bufferSize), statistics);
}

/**
 * Creates a file by delegating to the private {@code create} overload with
 * {@code createParent} set to {@code true}.
 */
@Override

public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,

short replication, long blockSize, Progressable progress)

throws IOException {

return create(f, overwrite, true, bufferSize, replication, blockSize, progress);

}

/**
 * Shared implementation behind the public create methods.
 *
 * <p>NOTE(review): {@code createParent}, {@code replication},
 * {@code blockSize} and {@code progress} are not read here; in particular
 * the parent directory is created whatever {@code createParent} says.
 * Confirm whether that is intended for the non-recursive caller.
 *
 * @throws IOException if the file exists and {@code overwrite} is false,
 *     if the parent directory cannot be created, or if the file cannot be
 *     opened for writing
 */
private FSDataOutputStream create(Path f, boolean overwrite,
    boolean createParent, int bufferSize, short replication, long blockSize,
    Progressable progress) throws IOException {
  final boolean alreadyThere = exists(f);
  if (alreadyThere && !overwrite) {
    throw new IOException("File already exists: "+f);
  }

  final Path parentDir = f.getParent();
  if (parentDir != null) {
    if (!mkdirs(parentDir)) {
      throw new IOException("Mkdirs failed to create " + parentDir.toString());
    }
  }

  final OutputStream buffered = new BufferedOutputStream(
      new LocalFSFileOutputStream(f, false), bufferSize);
  return new FSDataOutputStream(buffered, statistics);
}

/**
 * Creates the file through the permission-less {@code create} overload
 * and then applies {@code permission} to it with {@code setPermission}.
 *
 * @return the stream for writing the new file
 * @throws IOException if creating the file or setting the permission fails
 */
@Override
public FSDataOutputStream create(Path f, FsPermission permission,
    boolean overwrite, int bufferSize, short replication, long blockSize,
    Progressable progress) throws IOException {
  final FSDataOutputStream stream =
      create(f, overwrite, bufferSize, replication, blockSize, progress);
  setPermission(f, permission);
  return stream;
}

/**
 * Creates the file through the private {@code create} overload with
 * {@code createParent} set to {@code false}, then applies
 * {@code permission} with {@code setPermission}.
 *
 * @return the stream for writing the new file
 * @throws IOException if creating the file or setting the permission fails
 */
@Override
public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
    boolean overwrite,
    int bufferSize, short replication, long blockSize,
    Progressable progress) throws IOException {
  final boolean createParent = false;
  final FSDataOutputStream stream = create(f, overwrite, createParent,
      bufferSize, replication, blockSize, progress);
  setPermission(f, permission);
  return stream;
}

/**
 * Renames {@code src} to {@code dst}. A plain {@link File#renameTo} is
 * tried first; when that fails the call falls back to
 * {@code FileUtil.copy} with its delete-source argument set to true.
 *
 * @return {@code true} if the rename or the fallback copy succeeded
 * @throws IOException if the fallback copy fails
 */
public boolean rename(Path src, Path dst) throws IOException {
  return pathToFile(src).renameTo(pathToFile(dst))
      || FileUtil.copy(this, src, this, dst, true, getConf());
}

/**
 * Deletes {@code p}. A regular file is removed directly; anything else is
 * handed to {@code FileUtil.fullyDelete}.
 *
 * @param p the path to delete
 * @param recursive when {@code false}, a non-empty directory is refused
 * @return the result of the underlying delete call
 * @throws IOException if {@code p} is a non-empty directory and
 *     {@code recursive} is {@code false}
 */
public boolean delete(Path p, boolean recursive) throws IOException {
  final File target = pathToFile(p);
  if (target.isFile()) {
    return target.delete();
  }
  final boolean refusedNonEmptyDir = !recursive && target.isDirectory()
      && FileUtil.listFiles(target).length != 0;
  if (refusedNonEmptyDir) {
    throw new IOException("Directory " + target.toString() + " is not empty");
  }
  return FileUtil.fullyDelete(target);
}

/**
 * Lists the statuses of the entries directly under {@code f}, or of
 * {@code f} itself when it is a regular file.
 *
 * @param f the file or directory to list
 * @return the statuses found, or {@code null} when {@link File#list()}
 *     returns {@code null} for the directory
 * @throws FileNotFoundException if {@code f} does not exist
 * @throws IOException if a child's status cannot be read
 */
public FileStatus[] listStatus(Path f) throws IOException {
  final File dir = pathToFile(f);
  if (!dir.exists()) {
    throw new FileNotFoundException("File " + f + " does not exist");
  }
  if (dir.isFile()) {
    return new FileStatus[] {
        new RawLocalFileStatus(dir, getDefaultBlockSize(), this) };
  }

  final String[] children = dir.list();
  if (children == null) {
    return null;
  }

  final FileStatus[] statuses = new FileStatus[children.length];
  int filled = 0;
  for (String child : children) {
    try {
      statuses[filled] = getFileStatus(new Path(f, child));
      filled++;
    } catch (FileNotFoundException e) {
      // An entry may vanish between list() and the status lookup;
      // such entries are simply left out of the result.
    }
  }
  // Trim the array only when some entries were skipped.
  return filled == children.length
      ? statuses
      : Arrays.copyOf(statuses, filled);
}

/**
 * Creates the given directory and all non-existent parents.
 *
 * @param f the directory to create
 * @return {@code true} if {@code f} is a directory once the call returns,
 *     {@code false} if it or one of its parents could not be created
 * @throws IllegalArgumentException if {@code f} is {@code null}
 * @throws FileAlreadyExistsException if the parent exists but is not a
 *     directory
 * @throws IOException declared for recursive calls
 */
public boolean mkdirs(Path f) throws IOException {
  if (f == null) {
    throw new IllegalArgumentException("mkdirs path arg is null");
  }
  final Path parentPath = f.getParent();
  final File dir = pathToFile(f);
  if (parentPath == null) {
    return dir.mkdir() || dir.isDirectory();
  }

  final File parentFile = pathToFile(parentPath);
  if (parentFile != null && parentFile.exists() && !parentFile.isDirectory()) {
    throw new FileAlreadyExistsException("Parent path is not a directory: "
        + parentPath);
  }
  if (!mkdirs(parentPath)) {
    return false;
  }
  // mkdir() returns false when the directory already exists, hence the
  // isDirectory() fallback.
  return dir.mkdir() || dir.isDirectory();
}

/**
 * Creates {@code f} with {@code mkdirs(Path)} and, only when that
 * succeeds, applies {@code permission} to it.
 *
 * @return the result of {@code mkdirs(f)}
 * @throws IOException if creating the directory or setting the permission
 *     fails
 */
@Override
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
  if (!mkdirs(f)) {
    return false;
  }
  setPermission(f, permission);
  return true;
}

/**
 * Creates {@code f} with {@code mkdirs(Path)} and then applies
 * {@code absolutePermission}; the permission is set whatever
 * {@code mkdirs} returned.
 *
 * @return the result of {@code mkdirs(f)}
 * @throws IOException if creating the directory or setting the permission
 *     fails
 */
protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
    throws IOException {
  final boolean created = mkdirs(f);
  setPermission(f, absolutePermission);
  return created;
}

/**
 * Returns the qualified path built from the {@code user.home} system
 * property.
 */
@Override
public Path getHomeDirectory() {
  final Path home = new Path(System.getProperty("user.home"));
  return this.makeQualified(home);
}

/**
 * Set the working directory to the given directory.
 *
 * <p>The candidate is validated with {@code checkPath} before the field is
 * assigned, so a path rejected by {@code checkPath} leaves the previous
 * working directory in place instead of replacing it with an invalid one.
 *
 * @param newDir the new working directory; resolved against the current
 *     working directory when it is not absolute
 */
@Override
public void setWorkingDirectory(Path newDir) {
  final Path candidate = makeAbsolute(newDir);
  checkPath(candidate);
  workingDir = candidate;
}

/** Returns the current working directory held in {@code workingDir}. */
@Override

public Path getWorkingDirectory() {

return workingDir;

}

/**
 * Returns the qualified path built from the {@code user.dir} system
 * property; the constructor uses it as the starting working directory.
 */
protected Path getInitialWorkingDirectory() {
  final Path launchDir = new Path(System.getProperty("user.dir"));
  return this.makeQualified(launchDir);
}

// In the case of the local filesystem, we can just rename the file.
// NOTE(review): the boolean returned by rename() is discarded here, so a
// failed move is not reported to the caller — confirm this is intended.

public void moveFromLocalFile(Path src, Path dst) throws IOException {

rename(src, dst);

}

// We can write output directly to the final location, so the final
// path is returned as-is and tmpLocalFile is not used.

public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)

throws IOException {

return fsOutputFile;

}

// It's in the right place - nothing to do. The body is intentionally
// empty; neither argument is used.

public void completeLocalOutput(Path fsWorkingFile, Path tmpLocalFile)

throws IOException {

}

/** Closes this file system by delegating to the superclass. */
public void close() throws IOException {

super.close();

}

/** Returns the fixed label {@code "LocalFS"}. */
public String toString() {

return "LocalFS";

}

/**
 * Returns the status of the local file that {@code f} maps to.
 *
 * @param f the path to look up
 * @return a {@code RawLocalFileStatus} describing the file
 * @throws FileNotFoundException if no such local file exists
 * @throws IOException declared by the overridden method
 */
public FileStatus getFileStatus(Path f) throws IOException {
  final File local = pathToFile(f);
  if (!local.exists()) {
    throw new FileNotFoundException("File " + f + " does not exist");
  }
  return new RawLocalFileStatus(local, getDefaultBlockSize(), this);
}

/**
 * A {@code FileStatus} for a local file. Length, directory flag and
 * modification time are taken from the {@link File} at construction;
 * permission, owner and group are loaded on first access by running the
 * command from {@code Shell.getGET_PERMISSION_COMMAND()}.
 */
static class RawLocalFileStatus extends FileStatus {

  /** An empty owner string is used as the "not loaded yet" marker. */
  private boolean isPermissionLoaded() {
    return !super.getOwner().equals("");
  }

  RawLocalFileStatus(File f, long defaultBlockSize, FileSystem fs) {
    super(f.length(), f.isDirectory(), 1, defaultBlockSize,
        f.lastModified(), fs.makeQualified(new Path(f.getPath())));
  }

  @Override
  public FsPermission getPermission() {
    if (!isPermissionLoaded()) {
      loadPermissionInfo();
    }
    return super.getPermission();
  }

  @Override
  public String getOwner() {
    if (!isPermissionLoaded()) {
      loadPermissionInfo();
    }
    return super.getOwner();
  }

  @Override
  public String getGroup() {
    if (!isPermissionLoaded()) {
      loadPermissionInfo();
    }
    return super.getGroup();
  }

  /**
   * Loads permissions, owner, and group from `ls -ld`.
   *
   * <p>An exit code of 1 from the command clears the three fields by
   * setting them to {@code null}. Any other failure is rethrown as a
   * {@link RuntimeException} that carries the original exception as its
   * cause.
   */
  private void loadPermissionInfo() {
    try {
      StringTokenizer t = new StringTokenizer(
          execCommand(new File(getPath().toUri()),
              Shell.getGET_PERMISSION_COMMAND()));
      //expected format
      //-rw------- 1 username groupname ...
      String permission = t.nextToken();
      if (permission.length() > 10) { //files with ACLs might have a '+'
        permission = permission.substring(0, 10);
      }
      setPermission(FsPermission.valueOf(permission));
      t.nextToken(); // skip the second column
      setOwner(t.nextToken());
      setGroup(t.nextToken());
    } catch (Shell.ExitCodeException ioe) {
      if (ioe.getExitCode() != 1) {
        throw permissionLoadFailure(ioe);
      }
      setPermission(null);
      setOwner(null);
      setGroup(null);
    } catch (IOException ioe) {
      throw permissionLoadFailure(ioe);
    }
  }

  /**
   * Builds the unchecked exception for a failed permission lookup. The
   * message is unchanged from before; the cause is now attached as well.
   */
  private static RuntimeException permissionLoadFailure(IOException cause) {
    return new RuntimeException("Error while running command to get " +
        "file permissions : " +
        StringUtils.stringifyException(cause), cause);
  }

  /** Makes sure the lazily loaded fields are present before serialising. */
  @Override
  public void write(DataOutput out) throws IOException {
    if (!isPermissionLoaded()) {
      loadPermissionInfo();
    }
    super.write(out);
  }
}

/**
 * Changes the owner and/or group of {@code p} by running the matching
 * shell command.
 *
 * @param p the path to change
 * @param username the new owner, or {@code null} to change only the group
 * @param groupname the new group, or {@code null} to change only the owner
 * @throws IOException if both names are {@code null} or the command fails
 */
@Override
public void setOwner(Path p, String username, String groupname)
    throws IOException {
  if (username == null) {
    if (groupname == null) {
      throw new IOException("username == null && groupname == null");
    }
    execCommand(pathToFile(p), Shell.SET_GROUP_COMMAND, groupname);
    return;
  }

  // Argument format: OWNER[:[GROUP]]
  final String ownerSpec =
      groupname == null ? username + "" : username + ":" + groupname;
  execCommand(pathToFile(p), Shell.SET_OWNER_COMMAND, ownerSpec);
}

/**
 * Sets the permission of {@code p}: through {@code NativeIO.chmod} when the
 * native library is available, otherwise through the shell permission
 * command.
 *
 * <p>Workaround for HADOOP-7682: on Windows a failure is logged and
 * ignored so that daemons can still start under Cygwin. On every other
 * platform the exception is rethrown, because silently skipping a
 * permission change there would hide a real error.
 *
 * @param p the path whose permission is changed
 * @param permission the permission to apply
 * @throws IOException if the change fails on a non-Windows platform
 */
@Override
public void setPermission(Path p, FsPermission permission)
    throws IOException {
  try {
    if (NativeIO.isAvailable()) {
      NativeIO.chmod(pathToFile(p).getCanonicalPath(),
          permission.toShort());
    } else {
      execCommand(pathToFile(p), Shell.SET_PERMISSION_COMMAND,
          String.format("%05o", permission.toShort()));
    }
  } catch (IOException e) {
    if (!Shell.WINDOWS) {
      throw e;
    }
    // Best effort on Windows only; keep the failure visible in the log.
    LOG.warn("Ignoring failure to set permission of " + p + " to "
        + permission + " on Windows", e);
  }
}

/**
 * Runs {@code cmd} through {@code Shell.execCommand} with the shell path
 * of {@code f} appended as the final argument.
 *
 * @param f the file the command operates on
 * @param cmd the command and its leading arguments
 * @return the output returned by {@code Shell.execCommand}
 * @throws IOException if building the shell path or running the command fails
 */
private static String execCommand(File f, String... cmd) throws IOException {
  final String[] commandLine = Arrays.copyOf(cmd, cmd.length + 1);
  commandLine[cmd.length] = FileUtil.makeShellPath(f, true);
  return Shell.execCommand(commandLine);
}

/**
 * Deletes {@code arg0}, removing a directory together with its contents.
 *
 * <p>Delegates to {@code delete(Path, boolean)} with {@code recursive}
 * set to {@code true}. The generated stub it replaces returned
 * {@code false} without deleting anything.
 *
 * @param arg0 the path to delete
 * @return the result of {@code delete(arg0, true)}
 * @throws IOException if the deletion fails
 */
@Override
public boolean delete(Path arg0) throws IOException {
  return delete(arg0, true);
}

}

[/code]

繼續閱讀