hadoop-0.20.2以上版本,若在windows下使用cygwin模拟,進行開發和測試。可能導緻
setPermission失敗,報異常導緻tasktracker無法啟動,在https://issues.apache.org/jira/browse/HADOOP-7682上有詳細的描述,但檢視hadoop的relese Note中還未對此作出修改(目前版本已經到了hadoop-1.0.2),是以采用自己修改源碼中相關代碼的方法,來修複此bug。
通過以上文章,或檢視啟動的報錯資訊(可通過指令hadoop tasttracker啟動tasktracker直接輸出到控制台看到報錯資訊,當然這是使用僞分布式的配置方式)可以了解到引起該異常的類為hadoop的common項目中org.apache.hadoop.fs.RawLocalFileSystem.java。
1、下載下傳源碼
可以到http://svn.apache.org/repos/asf/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java下載下傳源碼
[code]
package org.apache.hadoop.fs;
import java.io.BufferedOutputStream;
import java.io.DataOutput;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.StringTokenizer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class RawLocalFileSystem extends FileSystem {
static final URI NAME = URI.create("file:///");
private Path workingDir;
public RawLocalFileSystem() {
workingDir = getInitialWorkingDirectory();
}
private Path makeAbsolute(Path f) {
if (f.isAbsolute()) {
return f;
} else {
return new Path(workingDir, f);
}
}
public File pathToFile(Path path) {
checkPath(path);
if (!path.isAbsolute()) {
path = new Path(getWorkingDirectory(), path);
}
return new File(path.toUri().getPath());
}
public URI getUri() { return NAME; }
public void initialize(URI uri, Configuration conf) throws IOException {
super.initialize(uri, conf);
setConf(conf);
}
class TrackingFileInputStream extends FileInputStream {
public TrackingFileInputStream(File f) throws IOException {
super(f);
}
public int read() throws IOException {
int result = super.read();
if (result != -1) {
statistics.incrementBytesRead(1);
}
return result;
}
public int read(byte[] data) throws IOException {
int result = super.read(data);
if (result != -1) {
statistics.incrementBytesRead(result);
}
return result;
}
public int read(byte[] data, int offset, int length) throws IOException {
int result = super.read(data, offset, length);
if (result != -1) {
statistics.incrementBytesRead(result);
}
return result;
}
}
class LocalFSFileInputStream extends FSInputStream {
private FileInputStream fis;
private long position;
public LocalFSFileInputStream(Path f) throws IOException {
this.fis = new TrackingFileInputStream(pathToFile(f));
}
public void seek(long pos) throws IOException {
fis.getChannel().position(pos);
this.position = pos;
}
public long getPos() throws IOException {
return this.position;
}
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
public int available() throws IOException { return fis.available(); }
public void close() throws IOException { fis.close(); }
@Override
public boolean markSupported() { return false; }
public int read() throws IOException {
try {
int value = fis.read();
if (value >= 0) {
this.position++;
}
return value;
} catch (IOException e) { // unexpected exception
throw new FSError(e); // assume native fs error
}
}
public int read(byte[] b, int off, int len) throws IOException {
try {
int value = fis.read(b, off, len);
if (value > 0) {
this.position += value;
}
return value;
} catch (IOException e) { // unexpected exception
throw new FSError(e); // assume native fs error
}
}
public int read(long position, byte[] b, int off, int len)
throws IOException {
ByteBuffer bb = ByteBuffer.wrap(b, off, len);
try {
return fis.getChannel().read(bb, position);
} catch (IOException e) {
throw new FSError(e);
}
}
public long skip(long n) throws IOException {
long value = fis.skip(n);
if (value > 0) {
this.position += value;
}
return value;
}
}
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
if (!exists(f)) {
throw new FileNotFoundException(f.toString());
}
return new FSDataInputStream(new BufferedFSInputStream(
new LocalFSFileInputStream(f), bufferSize));
}
class LocalFSFileOutputStream extends OutputStream {
private FileOutputStream fos;
private LocalFSFileOutputStream(Path f, boolean append) throws IOException {
this.fos = new FileOutputStream(pathToFile(f), append);
}
public void close() throws IOException { fos.close(); }
public void flush() throws IOException { fos.flush(); }
public void write(byte[] b, int off, int len) throws IOException {
try {
fos.write(b, off, len);
} catch (IOException e) { // unexpected exception
throw new FSError(e); // assume native fs error
}
}
public void write(int b) throws IOException {
try {
fos.write(b);
} catch (IOException e) { // unexpected exception
throw new FSError(e); // assume native fs error
}
}
}
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {
if (!exists(f)) {
throw new FileNotFoundException("File " + f + " not found");
}
if (getFileStatus(f).isDirectory()) {
throw new IOException("Cannot append to a diretory (=" + f + " )");
}
return new FSDataOutputStream(new BufferedOutputStream(
new LocalFSFileOutputStream(f, true), bufferSize), statistics);
}
@Override
public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
short replication, long blockSize, Progressable progress)
throws IOException {
return create(f, overwrite, true, bufferSize, replication, blockSize, progress);
}
private FSDataOutputStream create(Path f, boolean overwrite,
boolean createParent, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
if (exists(f) && !overwrite) {
throw new IOException("File already exists: "+f);
}
Path parent = f.getParent();
if (parent != null && !mkdirs(parent)) {
throw new IOException("Mkdirs failed to create " + parent.toString());
}
return new FSDataOutputStream(new BufferedOutputStream(
new LocalFSFileOutputStream(f, false), bufferSize), statistics);
}
@Override
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
FSDataOutputStream out = create(f,
overwrite, bufferSize, replication, blockSize, progress);
setPermission(f, permission);
return out;
}
@Override
public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
boolean overwrite,
int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
FSDataOutputStream out = create(f,
overwrite, false, bufferSize, replication, blockSize, progress);
setPermission(f, permission);
return out;
}
public boolean rename(Path src, Path dst) throws IOException {
if (pathToFile(src).renameTo(pathToFile(dst))) {
return true;
}
return FileUtil.copy(this, src, this, dst, true, getConf());
}
public boolean delete(Path p, boolean recursive) throws IOException {
File f = pathToFile(p);
if (f.isFile()) {
return f.delete();
} else if (!recursive && f.isDirectory() &&
(FileUtil.listFiles(f).length != 0)) {
throw new IOException("Directory " + f.toString() + " is not empty");
}
return FileUtil.fullyDelete(f);
}
public FileStatus[] listStatus(Path f) throws IOException {
File localf = pathToFile(f);
FileStatus[] results;
if (!localf.exists()) {
throw new FileNotFoundException("File " + f + " does not exist");
}
if (localf.isFile()) {
return new FileStatus[] {
new RawLocalFileStatus(localf, getDefaultBlockSize(), this) };
}
String[] names = localf.list();
if (names == null) {
return null;
}
results = new FileStatus[names.length];
int j = 0;
for (int i = 0; i < names.length; i++) {
try {
results[j] = getFileStatus(new Path(f, names[i]));
j++;
} catch (FileNotFoundException e) {
// ignore the files not found since the dir list may have have changed
// since the names[] list was generated.
}
}
if (j == names.length) {
return results;
}
return Arrays.copyOf(results, j);
}
public boolean mkdirs(Path f) throws IOException {
if(f == null) {
throw new IllegalArgumentException("mkdirs path arg is null");
}
Path parent = f.getParent();
File p2f = pathToFile(f);
if(parent != null) {
File parent2f = pathToFile(parent);
if(parent2f != null && parent2f.exists() && !parent2f.isDirectory()) {
throw new FileAlreadyExistsException("Parent path is not a directory: "
+ parent);
}
}
return (parent == null || mkdirs(parent)) &&
(p2f.mkdir() || p2f.isDirectory());
}
@Override
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
boolean b = mkdirs(f);
if(b) {
setPermission(f, permission);
}
return b;
}
@Override
protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
throws IOException {
boolean b = mkdirs(f);
setPermission(f, absolutePermission);
return b;
}
@Override
public Path getHomeDirectory() {
return this.makeQualified(new Path(System.getProperty("user.home")));
}
@Override
public void setWorkingDirectory(Path newDir) {
workingDir = makeAbsolute(newDir);
checkPath(workingDir);
}
@Override
public Path getWorkingDirectory() {
return workingDir;
}
@Override
protected Path getInitialWorkingDirectory() {
return this.makeQualified(new Path(System.getProperty("user.dir")));
}
@Override
public FsStatus getStatus(Path p) throws IOException {
File partition = pathToFile(p == null ? new Path("/") : p);
//File provides getUsableSpace() and getFreeSpace()
//File provides no API to obtain used space, assume used = total - free
return new FsStatus(partition.getTotalSpace(),
partition.getTotalSpace() - partition.getFreeSpace(),
partition.getFreeSpace());
}
// In the case of the local filesystem, we can just rename the file.
public void moveFromLocalFile(Path src, Path dst) throws IOException {
rename(src, dst);
}
// We can write output directly to the final location
public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
throws IOException {
return fsOutputFile;
}
// It's in the right place - nothing to do.
public void completeLocalOutput(Path fsWorkingFile, Path tmpLocalFile)
throws IOException {
}
public void close() throws IOException {
super.close();
}
public String toString() {
return "LocalFS";
}
public FileStatus getFileStatus(Path f) throws IOException {
File path = pathToFile(f);
if (path.exists()) {
return new RawLocalFileStatus(pathToFile(f), getDefaultBlockSize(), this);
} else {
throw new FileNotFoundException("File " + f + " does not exist");
}
}
static class RawLocalFileStatus extends FileStatus {
private boolean isPermissionLoaded() {
return !super.getOwner().equals("");
}
RawLocalFileStatus(File f, long defaultBlockSize, FileSystem fs) {
super(f.length(), f.isDirectory(), 1, defaultBlockSize,
f.lastModified(), fs.makeQualified(new Path(f.getPath())));
}
@Override
public FsPermission getPermission() {
if (!isPermissionLoaded()) {
loadPermissionInfo();
}
return super.getPermission();
}
@Override
public String getOwner() {
if (!isPermissionLoaded()) {
loadPermissionInfo();
}
return super.getOwner();
}
@Override
public String getGroup() {
if (!isPermissionLoaded()) {
loadPermissionInfo();
}
return super.getGroup();
}
/// loads permissions, owner, and group from `ls -ld`
private void loadPermissionInfo() {
IOException e = null;
try {
StringTokenizer t = new StringTokenizer(
execCommand(new File(getPath().toUri()),
Shell.getGET_PERMISSION_COMMAND()));
//expected format
//-rw------- 1 username groupname ...
String permission = t.nextToken();
if (permission.length() > 10) { //files with ACLs might have a '+'
permission = permission.substring(0, 10);
}
setPermission(FsPermission.valueOf(permission));
t.nextToken();
setOwner(t.nextToken());
setGroup(t.nextToken());
} catch (Shell.ExitCodeException ioe) {
if (ioe.getExitCode() != 1) {
e = ioe;
} else {
setPermission(null);
setOwner(null);
setGroup(null);
}
} catch (IOException ioe) {
e = ioe;
} finally {
if (e != null) {
throw new RuntimeException("Error while running command to get " +
"file permissions : " +
StringUtils.stringifyException(e));
}
}
}
@Override
public void write(DataOutput out) throws IOException {
if (!isPermissionLoaded()) {
loadPermissionInfo();
}
super.write(out);
}
}
@Override
public void setOwner(Path p, String username, String groupname)
throws IOException {
if (username == null && groupname == null) {
throw new IOException("username == null && groupname == null");
}
if (username == null) {
execCommand(pathToFile(p), Shell.SET_GROUP_COMMAND, groupname);
} else {
//OWNER[:[GROUP]]
String s = username + (groupname == null? "": ":" + groupname);
execCommand(pathToFile(p), Shell.SET_OWNER_COMMAND, s);
}
}
@Override
public void setPermission(Path p, FsPermission permission)
throws IOException {
if (NativeIO.isAvailable()) {
NativeIO.chmod(pathToFile(p).getCanonicalPath(),
permission.toShort());
} else {
execCommand(pathToFile(p), Shell.SET_PERMISSION_COMMAND,
String.format("%05o", permission.toShort()));
}
}
private static String execCommand(File f, String... cmd) throws IOException {
String[] args = new String[cmd.length + 1];
System.arraycopy(cmd, 0, args, 0, cmd.length);
args[cmd.length] = FileUtil.makeShellPath(f, true);
String output = Shell.execCommand(args);
return output;
}
}
[/code]
2、修改源碼
(1)在Eclipse任意建立一個測試項目,在項目中建立包org.apache.hadoop.fs
(2)将以前下載下傳到的hadoop相關jar包導入到項目的類路徑中,本例中使用的是hadoop-1.0.1,将解壓後根路徑中的hadoop-core-1.0.1.jar等、lib目錄下所有的jar包都導入到項目的類路徑(classpath)中
(3)在第一步不建立的包org.apache.hadoop.fs下建立檔案RawLocalFileSystem.java檔案,将源碼複制到該檔案中。
(4)可以看到有幾處錯誤,分别修改即可:
a、The type RawLocalFileSystem must implement the inherited abstract method FileSystem.delete(Path),根據提示,重寫方法即可,ctrl+1,你懂的。
b、getFileStatus(f).isDirectory(),提示傳回的FileStatus類沒有方法IsDirectory(),将這裡的修改為new File(f.toString()).isDirectory(),注意需要導入java.io.File類,開發工具會提示你的。
c、primitiveMkdir(Path f, FsPermission absolutePermission)方法上提示The method primitiveMkdir(Path, FsPermission) of type RawLocalFileSystem must override or implement a supertype method,根據提示,去掉該方法前的@override注解,同理還有方法getInitialWorkingDirectory()也做同樣處理
d、getStatus(Path p) throws IOException,該方法需要新增加類FsStatus,目前沒有使用,是以直接将該方式删除
【e】、重要:以上都為去除編譯時候錯誤,這一步才是核心
setPermission(Path p, FsPermission permission) throws IOException方法,我們在内部捕捉IOException,捕捉到後,不再向外抛出該異常。
[code]
@Override
public void setPermission(Path p, FsPermission permission)
throws IOException {
try{
if (NativeIO.isAvailable()) {
NativeIO.chmod(pathToFile(p).getCanonicalPath(),
permission.toShort());
} else {
execCommand(pathToFile(p), Shell.SET_PERMISSION_COMMAND,
String.format("%05o", permission.toShort()));
}
}catch (IOException e) {
e.printStackTrace();
}
}
[/code]
3、編譯以及将編譯後的檔案加入hadoop的核心包中,替換原來的檔案
(1)使用windows壓縮包工具(一般為rar工具)打開hadoop-core-1.0.1.jar檔案,定位到org\apache\hadoop\fs
(2)到項目的編譯後的檔案,一般在項目的bin目錄中,将org\apache\hadoop\fs\檔案夾中的所有檔案拖動到第一步打開的壓縮封包件中,在提示覆寫的時候選擇【是】
ok,修改完成,可以正常使用hadoop(無論是單機、僞分布式)
附件中附上hadoop.apache.org上源碼庫上針對hadoop-1.0.1的RawLocalFileSystem修改、編譯後的檔案,以後若版本變化也可按照上述方法自行修改。
2012-04-06
附上修改之後的源碼
[code]
package org.apache.hadoop.fs;
import java.io.BufferedOutputStream;
import java.io.DataOutput;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.StringTokenizer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class RawLocalFileSystem extends FileSystem {
static final URI NAME = URI.create("file:///");
private Path workingDir;
public RawLocalFileSystem() {
workingDir = getInitialWorkingDirectory();
}
private Path makeAbsolute(Path f) {
if (f.isAbsolute()) {
return f;
} else {
return new Path(workingDir, f);
}
}
public File pathToFile(Path path) {
checkPath(path);
if (!path.isAbsolute()) {
path = new Path(getWorkingDirectory(), path);
}
return new File(path.toUri().getPath());
}
public URI getUri() { return NAME; }
public void initialize(URI uri, Configuration conf) throws IOException {
super.initialize(uri, conf);
setConf(conf);
}
class TrackingFileInputStream extends FileInputStream {
public TrackingFileInputStream(File f) throws IOException {
super(f);
}
public int read() throws IOException {
int result = super.read();
if (result != -1) {
statistics.incrementBytesRead(1);
}
return result;
}
public int read(byte[] data) throws IOException {
int result = super.read(data);
if (result != -1) {
statistics.incrementBytesRead(result);
}
return result;
}
public int read(byte[] data, int offset, int length) throws IOException {
int result = super.read(data, offset, length);
if (result != -1) {
statistics.incrementBytesRead(result);
}
return result;
}
}
class LocalFSFileInputStream extends FSInputStream {
private FileInputStream fis;
private long position;
public LocalFSFileInputStream(Path f) throws IOException {
this.fis = new TrackingFileInputStream(pathToFile(f));
}
public void seek(long pos) throws IOException {
fis.getChannel().position(pos);
this.position = pos;
}
public long getPos() throws IOException {
return this.position;
}
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
public int available() throws IOException { return fis.available(); }
public void close() throws IOException { fis.close(); }
@Override
public boolean markSupported() { return false; }
public int read() throws IOException {
try {
int value = fis.read();
if (value >= 0) {
this.position++;
}
return value;
} catch (IOException e) { // unexpected exception
throw new FSError(e); // assume native fs error
}
}
public int read(byte[] b, int off, int len) throws IOException {
try {
int value = fis.read(b, off, len);
if (value > 0) {
this.position += value;
}
return value;
} catch (IOException e) { // unexpected exception
throw new FSError(e); // assume native fs error
}
}
public int read(long position, byte[] b, int off, int len)
throws IOException {
ByteBuffer bb = ByteBuffer.wrap(b, off, len);
try {
return fis.getChannel().read(bb, position);
} catch (IOException e) {
throw new FSError(e);
}
}
public long skip(long n) throws IOException {
long value = fis.skip(n);
if (value > 0) {
this.position += value;
}
return value;
}
}
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
if (!exists(f)) {
throw new FileNotFoundException(f.toString());
}
return new FSDataInputStream(new BufferedFSInputStream(
new LocalFSFileInputStream(f), bufferSize));
}
class LocalFSFileOutputStream extends OutputStream {
private FileOutputStream fos;
private LocalFSFileOutputStream(Path f, boolean append) throws IOException {
this.fos = new FileOutputStream(pathToFile(f), append);
}
public void close() throws IOException { fos.close(); }
public void flush() throws IOException { fos.flush(); }
public void write(byte[] b, int off, int len) throws IOException {
try {
fos.write(b, off, len);
} catch (IOException e) { // unexpected exception
throw new FSError(e); // assume native fs error
}
}
public void write(int b) throws IOException {
try {
fos.write(b);
} catch (IOException e) { // unexpected exception
throw new FSError(e); // assume native fs error
}
}
}
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {
if (!exists(f)) {
throw new FileNotFoundException("File " + f + " not found");
}
if (new File(f.toString()).isDirectory()) {
throw new IOException("Cannot append to a diretory (=" + f + " )");
}
return new FSDataOutputStream(new BufferedOutputStream(
new LocalFSFileOutputStream(f, true), bufferSize), statistics);
}
@Override
public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
short replication, long blockSize, Progressable progress)
throws IOException {
return create(f, overwrite, true, bufferSize, replication, blockSize, progress);
}
private FSDataOutputStream create(Path f, boolean overwrite,
boolean createParent, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
if (exists(f) && !overwrite) {
throw new IOException("File already exists: "+f);
}
Path parent = f.getParent();
if (parent != null && !mkdirs(parent)) {
throw new IOException("Mkdirs failed to create " + parent.toString());
}
return new FSDataOutputStream(new BufferedOutputStream(
new LocalFSFileOutputStream(f, false), bufferSize), statistics);
}
@Override
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
FSDataOutputStream out = create(f,
overwrite, bufferSize, replication, blockSize, progress);
setPermission(f, permission);
return out;
}
@Override
public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
boolean overwrite,
int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
FSDataOutputStream out = create(f,
overwrite, false, bufferSize, replication, blockSize, progress);
setPermission(f, permission);
return out;
}
public boolean rename(Path src, Path dst) throws IOException {
if (pathToFile(src).renameTo(pathToFile(dst))) {
return true;
}
return FileUtil.copy(this, src, this, dst, true, getConf());
}
public boolean delete(Path p, boolean recursive) throws IOException {
File f = pathToFile(p);
if (f.isFile()) {
return f.delete();
} else if (!recursive && f.isDirectory() &&
(FileUtil.listFiles(f).length != 0)) {
throw new IOException("Directory " + f.toString() + " is not empty");
}
return FileUtil.fullyDelete(f);
}
public FileStatus[] listStatus(Path f) throws IOException {
File localf = pathToFile(f);
FileStatus[] results;
if (!localf.exists()) {
throw new FileNotFoundException("File " + f + " does not exist");
}
if (localf.isFile()) {
return new FileStatus[] {
new RawLocalFileStatus(localf, getDefaultBlockSize(), this) };
}
String[] names = localf.list();
if (names == null) {
return null;
}
results = new FileStatus[names.length];
int j = 0;
for (int i = 0; i < names.length; i++) {
try {
results[j] = getFileStatus(new Path(f, names[i]));
j++;
} catch (FileNotFoundException e) {
// ignore the files not found since the dir list may have have changed
// since the names[] list was generated.
}
}
if (j == names.length) {
return results;
}
return Arrays.copyOf(results, j);
}
public boolean mkdirs(Path f) throws IOException {
if(f == null) {
throw new IllegalArgumentException("mkdirs path arg is null");
}
Path parent = f.getParent();
File p2f = pathToFile(f);
if(parent != null) {
File parent2f = pathToFile(parent);
if(parent2f != null && parent2f.exists() && !parent2f.isDirectory()) {
throw new FileAlreadyExistsException("Parent path is not a directory: "
+ parent);
}
}
return (parent == null || mkdirs(parent)) &&
(p2f.mkdir() || p2f.isDirectory());
}
@Override
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
boolean b = mkdirs(f);
if(b) {
setPermission(f, permission);
}
return b;
}
protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
throws IOException {
boolean b = mkdirs(f);
setPermission(f, absolutePermission);
return b;
}
@Override
public Path getHomeDirectory() {
return this.makeQualified(new Path(System.getProperty("user.home")));
}
@Override
public void setWorkingDirectory(Path newDir) {
workingDir = makeAbsolute(newDir);
checkPath(workingDir);
}
@Override
public Path getWorkingDirectory() {
return workingDir;
}
protected Path getInitialWorkingDirectory() {
return this.makeQualified(new Path(System.getProperty("user.dir")));
}
// In the case of the local filesystem, we can just rename the file.
public void moveFromLocalFile(Path src, Path dst) throws IOException {
rename(src, dst);
}
// We can write output directly to the final location
public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
throws IOException {
return fsOutputFile;
}
// It's in the right place - nothing to do.
public void completeLocalOutput(Path fsWorkingFile, Path tmpLocalFile)
throws IOException {
}
public void close() throws IOException {
super.close();
}
public String toString() {
return "LocalFS";
}
public FileStatus getFileStatus(Path f) throws IOException {
File path = pathToFile(f);
if (path.exists()) {
return new RawLocalFileStatus(pathToFile(f), getDefaultBlockSize(), this);
} else {
throw new FileNotFoundException("File " + f + " does not exist");
}
}
static class RawLocalFileStatus extends FileStatus {
private boolean isPermissionLoaded() {
return !super.getOwner().equals("");
}
RawLocalFileStatus(File f, long defaultBlockSize, FileSystem fs) {
super(f.length(), f.isDirectory(), 1, defaultBlockSize,
f.lastModified(), fs.makeQualified(new Path(f.getPath())));
}
@Override
public FsPermission getPermission() {
if (!isPermissionLoaded()) {
loadPermissionInfo();
}
return super.getPermission();
}
@Override
public String getOwner() {
if (!isPermissionLoaded()) {
loadPermissionInfo();
}
return super.getOwner();
}
@Override
public String getGroup() {
if (!isPermissionLoaded()) {
loadPermissionInfo();
}
return super.getGroup();
}
/// loads permissions, owner, and group from `ls -ld`
private void loadPermissionInfo() {
IOException e = null;
try {
StringTokenizer t = new StringTokenizer(
execCommand(new File(getPath().toUri()),
Shell.getGET_PERMISSION_COMMAND()));
//expected format
//-rw------- 1 username groupname ...
String permission = t.nextToken();
if (permission.length() > 10) { //files with ACLs might have a '+'
permission = permission.substring(0, 10);
}
setPermission(FsPermission.valueOf(permission));
t.nextToken();
setOwner(t.nextToken());
setGroup(t.nextToken());
} catch (Shell.ExitCodeException ioe) {
if (ioe.getExitCode() != 1) {
e = ioe;
} else {
setPermission(null);
setOwner(null);
setGroup(null);
}
} catch (IOException ioe) {
e = ioe;
} finally {
if (e != null) {
throw new RuntimeException("Error while running command to get " +
"file permissions : " +
StringUtils.stringifyException(e));
}
}
}
@Override
public void write(DataOutput out) throws IOException {
if (!isPermissionLoaded()) {
loadPermissionInfo();
}
super.write(out);
}
}
@Override
public void setOwner(Path p, String username, String groupname)
throws IOException {
if (username == null && groupname == null) {
throw new IOException("username == null && groupname == null");
}
if (username == null) {
execCommand(pathToFile(p), Shell.SET_GROUP_COMMAND, groupname);
} else {
//OWNER[:[GROUP]]
String s = username + (groupname == null? "": ":" + groupname);
execCommand(pathToFile(p), Shell.SET_OWNER_COMMAND, s);
}
}
@Override
public void setPermission(Path p, FsPermission permission)
throws IOException {
try{
if (NativeIO.isAvailable()) {
NativeIO.chmod(pathToFile(p).getCanonicalPath(),
permission.toShort());
} else {
execCommand(pathToFile(p), Shell.SET_PERMISSION_COMMAND,
String.format("%05o", permission.toShort()));
}
}catch (IOException e) {
e.printStackTrace();
}
}
private static String execCommand(File f, String... cmd) throws IOException {
String[] args = new String[cmd.length + 1];
System.arraycopy(cmd, 0, args, 0, cmd.length);
args[cmd.length] = FileUtil.makeShellPath(f, true);
String output = Shell.execCommand(args);
return output;
}
@Override
public boolean delete(Path arg0) throws IOException {
// TODO Auto-generated method stub
return false;
}
}
[/code]