天天看點

Hadoop-0.20.0源代碼分析(06)

在閱讀Hadoop源代碼過程中,在org.apache.hadoop.security.UnixUserGroupInformation類中,需要擷取到Unix系統的使用者名和所屬組的資訊,就需要通過執行Shell指令得到相應的結果,這裡,通過閱讀Hadoop項目org.apache.hadoop.util包、org.apache.hadoop.fs.shell包、org.apache.hadoop.fs包中檔案來了解,Hadoop基于Shell指令與底層Unix作業系統的互動,以及在MapReduce模型中通過指令行的方式送出管理計算任務的一些詳細情況。

首先看一下,與Unix系統指令行執行有關的一些類的繼承層次關系:

◦org.apache.hadoop.util.Shell

◦org.apache.hadoop.util.Shell.ShellCommandExecutor

◦org.apache.hadoop.fs.DF

◦org.apache.hadoop.fs.DU

◦org.apache.hadoop.fs.FileUtil.CygPathCommand

Shell指令最頂層的抽象類是org.apache.hadoop.util.Shell,它定義了如何在目前檔案系統環境中,與底層的Unix系統通過指令行進行必要的互動。

從org.apache.hadoop.util.Shell類定義的屬性來看,可以分為兩種類型的屬性,一種是static final的字元串指令,另一種是與實作指令的執行相關的屬性。第一種屬性(我把兩個static final的擷取指令的方式也列出,放到這裡的屬性的後面)如下所示:

/** Unix指令whoami :執行指令得到目前使用者名 */

public final static String USER_NAME_COMMAND = "whoami";

/** Unix指令chmod :執行指令設定使用者操作權限 */

public static final String SET_PERMISSION_COMMAND = "chmod";

/** Unix指令chown :執行指令設定屬主 */

public static final String SET_OWNER_COMMAND = "chown";

/** Unix指令chgrp :執行指令設定組 */

public static final String SET_GROUP_COMMAND = "chgrp";

/** Unix指令bash -c groups :執行指令得到目前使用者所屬的組清單 */

public static String[] getGROUPS_COMMAND() {

return new String[]{"bash", "-c", "groups"};

}

/** Unix指令ls -ld :執行指令設定組,不支援Windows系統,但可以支援Cygwin */

public static String[] getGET_PERMISSION_COMMAND() {

return new String[] {(WINDOWS ? "ls" : "/bin/ls"), "-ld"};

}

看到這些Unix的指令,應該非常熟悉。

第二種屬性,都屬于與如何實作定義的上述指令行的執行有關的,如下所示:

private long interval; // 重新整理間隔

private long lastTime; // 最後執行指令的時間

private Map<String, String> environment; // 指令行執行所需要的作業系統環境

private File dir;

private Process process; // 執行指令行的子程序

private int exitCode; // 執行指令行完成後,退出狀态碼

dir屬性表示目前執行指令所在的工作目錄,environment屬性表示目前指令執行的環境,它們在Shell類中都提供了一個受保護的設定操作,可以在Shell抽象類的子類中根據需要設定不同工作目錄和環境,其中,dir預設為系統“user.dir”變量值,environment使用系統預設的環境。

通過interval與lastTime屬性來檢查,是否有必要重新執行一次,如果是就執行,否則重置退出狀态碼exitCode為0,正常退出,可以在Shell類的run方法中看到:

protected void run() throws IOException {

if (lastTime + interval > System.currentTimeMillis())

return; // 不需要重新執行指令行,傳回

exitCode = 0;

runCommand(); // 調用:需要重新執行指令行

}

通過runCommand方法就可以執行指定的Shell指令,它是Shell類的核心。在分析runCommand方法之前,先了解一下與Shell指令執行相關的環境資訊。

當在Windows系統下,打開一個cmd視窗的時候,執行set指令,就能看到目前系統的環境變量的資訊,如下所示:

ALLUSERSPROFILE=C:/Documents and Settings/All Users

APPDATA=C:/Documents and Settings/Administrator/Application Data

CLASSPATH=.;E:/Program Files/Java/jdk1.6.0_14/lib/tools.jar;E:/Program Files/Java/jdk1.6.0_14/lib/dt.jar;E:/Program Files/Java/jdk1.6.0_14/jre/lib/rt.jar;E:/Program Files/Java/jdk1.6.0_14/jre/lib/charsets.jar

CLIENTNAME=Console

CommonProgramFiles=C:/Program Files/Common Files

COMPUTERNAME=SYJ

ComSpec=C:/WINDOWS/system32/cmd.exe

DEVMGR_SHOW_NONPRESENT_DEVICES=1

FP_NO_HOST_CHECK=NO

HERITRIX_HOME=E:/MyEclipse/workspace/heritrix

HOME=C:/Documents and Settings/Administrator

HOMEDRIVE=C:

HOMEPATH=/Documents and Settings/Administrator

JAVA_HOME=E:/Program Files/Java/jdk1.6.0_14

JSERV=D:/oracle/ora92/Apache/Jserv/conf

LOGONSERVER=//SYJ

NUMBER_OF_PROCESSORS=2

NUTSUFFIX=1

NUT_SUFFIXED_SEARCHING=1

OS=Windows_NT

Path=D:/oracle/ora92/bin;C:/Program Files/Oracle/jre/1.3.1/bin;C:/Program Files/Oracle/jre/1.1.8/bin;E:/Program Files/CollabNet Subversion Client;E:/Program Files/Java/jdk1.6.0_14/bin;C:/WINDOWS/system32;C:/WINDOWS;C:/WINDOWS/System32/Wbem;C:/Program Files/Microsoft SQL Server/80/Tools/Binn/;C:/Program Files/Microsoft SQL Server/90/Tools/binn/;C:/Program Files/MyEclipse 7.0M1/jre/bin;E:/Program Files/TortoiseSVN/bin;E:/PROGRA~1/F-Secure/SSHTRI~1;D:/Program Files/MySQL/MySQL Server 5.1/bin;F:/Program Files/Rational/common;C:/Program Files/StormII/Codec;C:/Program Files/StormII;C:/Program Files/SSH Communications Security/SSH Secure Shell;C:/Program Files/IDM Computer Solutions/UltraEdit/

PATHEXT=.COM;.EXE;.BAT;.CMD;.VBS;.VBE;.JS;.JSE;.WSF;.WSH

PROCESSOR_ARCHITECTURE=x86

PROCESSOR_IDENTIFIER=x86 Family 6 Model 23 Stepping 10, GenuineIntel

PROCESSOR_LEVEL=6

PROCESSOR_REVISION=170a

ProgramFiles=C:/Program Files

PROMPT=$P$G

RATL_RTHOME=F:/Program Files/Rational/Rational Test

SESSIONNAME=Console

SystemDrive=C:

SystemRoot=C:/WINDOWS

TEMP=C:/DOCUME~1/ADMINI~1/LOCALS~1/Temp

TMP=C:/DOCUME~1/ADMINI~1/LOCALS~1/Temp

TMPDIR=C:/DOCUME~1/ADMINI~1/LOCALS~1/Temp

USERDOMAIN=SYJ

USERNAME=Administrator

USERPROFILE=C:/Documents and Settings/Administrator

VBOX_INSTALL_PATH=E:/Program Files/Sun/xVM VirtualBox/

VS90COMNTOOLS=d:/Microsoft Visual Studio 9.0/Common7/Tools/

windir=C:/WINDOWS

WV_GATEWAY_CFG=D:/oracle/ora92/Apache/modplsql/cfg/wdbsvr.app

這些環境變量的資訊都是以鍵值對的形式出現的,當在作業系統上運作相關的應用程式的時候,其實就是在上述環境變量所設定的一個上下文中運作,環境是應用程式運作不可缺少的條件。你在Unix系統中執行env指令,同樣也能得到與上面類似的鍵值對的環境變量資訊。

是以,org.apache.hadoop.util.Shell作為Shell指令的抽象,一定要擷取到目前所在作業系統(Unix系統) 的環境變量,在這樣一個上下文中,才能如同從Unix系統中輸入執行Shell指令進行執行一樣。

在Java中,實作了一個java.lang.ProcessBuilder類,該類能夠建立一個作業系統的程序,通過為該程序設定運作環境變量,進而啟動進行執行。預設情況下ProcessBuilder類已經實作了設定目前作業系統環境的功能,可以通過environment()方法檢視它的執行個體所具有的環境資訊,這等價于使用System.getenv()擷取到的環境變量,都是以鍵值對的形式存在于ProcessBuilder類執行個體的上下文中。

下面,分析Shell類實作執行指令的過程,實作方法為runCommand,如下所示:

private void runCommand() throws IOException {

ProcessBuilder builder = new ProcessBuilder(getExecString()); // 方法getExecString()在該類中式抽象的,需要在實作類中實作,擷取到一個指令名稱及其參數,進而基于此 構造一個ProcessBuilder程序執行個體

boolean completed = false; // 辨別執行指令完成情況

if (environment != null) {

builder.environment().putAll(this.environment); // 如果有必要,設定指令行執行環境

}

if (dir != null) {

builder.directory(this.dir); // 如果必要,設定指令行執行所在工作目錄

}

process = builder.start(); // 啟動ProcessBuilder builder程序,傳回一個用來管理指令行執行情況的子程序process

final BufferedReader errReader = new BufferedReader(new InputStreamReader(process.getErrorStream())); // 當builder程序啟動後,檢查送出的指令行是否合法,如果不合法或者執行出錯,将出錯資訊寫入到緩沖流中,可以從其中解析讀取出來

BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream())); // 執行指令傳回執行結果,通過process管理子線程來擷取執行流中的執行結果資訊

final StringBuffer errMsg = new StringBuffer(); // 存放執行指令出錯資訊的String緩沖區

// 定義解析線程,解析指令行執行出錯資訊所在的流,解析完成後釋放流緩沖區

Thread errThread = new Thread() {

@Override

public void run() {

try {

String line = errReader.readLine();

while((line != null) && !isInterrupted()) {

errMsg.append(line);

errMsg.append(System.getProperty("line.separator"));

line = errReader.readLine();

}

} catch(IOException ioe) {

LOG.warn("Error reading the error stream", ioe);

}

}

};

try {

errThread.start(); // 啟動線程,處理出錯資訊

} catch (IllegalStateException ise) { }

try {

parseExecResult(inReader); // 調用:解析執行指令傳回的結果資訊

String line = inReader.readLine();

while(line != null) {

line = inReader.readLine();

}

exitCode = process.waitFor(); // 等待程序process處理完畢,置exitCode狀态碼

try {

errThread.join(); // 等待出錯資訊處理線程執行完成

} catch (InterruptedException ie) {

LOG.warn("Interrupted while reading the error stream", ie);

}

completed = true; // 置指令行執行完成狀态

if (exitCode != 0) {

throw new ExitCodeException(exitCode, errMsg.toString());

}

} catch (InterruptedException ie) {

throw new IOException(ie.toString());

} finally {

try {

inReader.close(); // 最後,需要關閉流對象

} catch (IOException ioe) {

LOG.warn("Error while closing the input stream", ioe);

}

if (!completed) {

errThread.interrupt(); // 可能處理錯誤資訊的線程發生異常,未能置completed=true,最後終止要該線程

}

try {

errReader.close(); // 關閉流對象

} catch (IOException ioe) {

LOG.warn("Error while closing the error stream", ioe);

}

process.destroy(); // 終止子程序process

lastTime = System.currentTimeMillis(); // 設定目前時間為該指令行執行的最後時間,為了判斷一個指令是否需要被重新執行

}

}

上面已經做了詳細的注釋,基本上闡明了一個指令行的執行過程。

在類中,還提供了一個static方法execCommand,為執行指令提供入口:

public static String execCommand(String ... cmd) throws IOException {

return execCommand(null, cmd);

}

執行該方法,調用了另一個重載的execCommand方法,傳回指令執行結果的資訊。

注意,在Shell抽象類中并沒有實作該怎樣擷取一個指令名稱及其參數的方法,需要在實作類中給出,是以,在Shell類内部定義了一個靜态内部類ShellCommandExecutor,該類實作了擷取指令名稱及其參數的方法。在上面方法execCommand中,調用了一個重載的execCommand方法,該方法中通過執行個體化一個ShellCommandExecutor類,來提供擷取指令名稱及其參數,進而構造一個ProcessBuilder執行個體,建立一個作業系統線程來執行指令行。

? extends Shell 

下面看實作Shell抽象類的一些子類的實作。

  • ShellCommandExecutor類

ShellComandExecutor類的實作如下所示:

public static class ShellCommandExecutor extends Shell {

private String[] command; // 指令名稱及其參數

private StringBuffer output; // 存放執行指令行傳回結果的String緩沖區

public ShellCommandExecutor(String[] execString) {

command = execString.clone();

}

public ShellCommandExecutor(String[] execString, File dir) {

this(execString);

this.setWorkingDirectory(dir);

}

public ShellCommandExecutor(String[] execString, File dir, Map<String, String> env) {

this(execString, dir);

this.setEnvironment(env);

}

/** 繼承自Shell基類,執行指令行 */

public void execute() throws IOException {

this.run();

}

protected String[] getExecString() {

return command; // 輸入的就是指令名稱+參數的格式,直接得到

}

/**

* 解析指令行執行後的輸出結果流,存放到String緩沖區中

*/

protected void parseExecResult(BufferedReader lines) throws IOException {

output = new StringBuffer();

char[] buf = new char[512];

int nRead;

while ( (nRead = lines.read(buf, 0, buf.length)) > 0 ) {

output.append(buf, 0, nRead);

}

}

  • DF類

org.apache.hadoop.fs.DF類實作了Unix系統中Shell指令df,用來擷取磁盤使用情況的統計資料。該Shell實作類中定義域df指令操作相關的内容,可以從屬性來看:

public static final long DF_INTERVAL_DEFAULT = 3 * 1000; // 設定df指令重新整理間隔為3s

private String dirPath; // 執行df指令所在工作目錄

private String filesystem; // 磁盤裝置名

private long capacity; // 磁盤總容量

private long used; // 磁盤使用量

private long available; // 磁盤可用量

private int percentUsed; // 磁盤使用率

private String mount; // 磁盤挂載位置

隻需要實作Shell類定義的getExecString與parseExecResult方法即可。比較簡單,getExecString方法實作如下:

protected String[] getExecString() {

return new String[] {"bash","-c","exec 'df' '-k' '" + dirPath + "' 2>/dev/null"};

}

該方法傳回的字元串數組,用來構造一個ProcessBuilder程序執行個體。

parseExecResult方法實作如下所示:

protected void parseExecResult(BufferedReader lines) throws IOException {

lines.readLine(); // 去掉流中的首行

String line = lines.readLine();

if (line == null) {

throw new IOException( "Expecting a line not the end of stream" );

}

StringTokenizer tokens = new StringTokenizer(line, " /t/n/r/f%");

this.filesystem = tokens.nextToken();

if (!tokens.hasMoreTokens()) {

line = lines.readLine();

if (line == null) {

throw new IOException( "Expecting a line not the end of stream" );

}

tokens = new StringTokenizer(line, " /t/n/r/f%");

}

/**

* 下面處理并設定執行df -k指令的結果資訊

*/

this.capacity = Long.parseLong(tokens.nextToken()) * 1024;

this.used = Long.parseLong(tokens.nextToken()) * 1024;

this.available = Long.parseLong(tokens.nextToken()) * 1024;

this.percentUsed = Integer.parseInt(tokens.nextToken());

this.mount = tokens.nextToken();

}

  • DU類

DU類實作了Unix的du指令,顯示目錄或者檔案大小的資訊,具體實作可以參考org.apache.hadoop.fs.DU類,這裡跳過。

  • CygPathCommand類

CygPathCommand類是org.apache.hadoop.fs.FileUtil類的一個内部靜态類,實作了Windows系統上模拟Unix系統的Cygwin系統的cygpath指令,這裡跳過。