在閱讀Hadoop源代碼過程中,在org.apache.hadoop.security.UnixUserGroupInformation類中,需要擷取到Unix系統的使用者名和所屬組的資訊,就需要通過執行Shell指令得到相應的結果,這裡,通過閱讀Hadoop項目org.apache.hadoop.util包、org.apache.hadoop.fs.shell包、org.apache.hadoop.fs包中檔案來了解,Hadoop基于Shell指令與底層Unix作業系統的互動,以及在MapReduce模型中通過指令行的方式送出管理計算任務的一些詳細情況。
首先看一下,與Unix系統指令行執行有關的一些類的繼承層次關系:
◦org.apache.hadoop.util.Shell
◦org.apache.hadoop.util.Shell.ShellCommandExecutor
◦org.apache.hadoop.fs.DF
◦org.apache.hadoop.fs.DU
◦org.apache.hadoop.fs.FileUtil.CygPathCommand
Shell指令最頂層的抽象類是org.apache.hadoop.util.Shell,它定義了如何在目前檔案系統環境中,與底層的Unix系統通過指令行進行必要的互動。
從org.apache.hadoop.util.Shell類定義的屬性來看,可以分為兩種類型的屬性,一種是static final的字元串指令,另一種是與實作指令的執行相關的屬性。第一種屬性(我把兩個static final的擷取指令的方式也列出,放到這裡的屬性的後面)如下所示:
/** Unix指令whoami :執行指令得到目前使用者名 */
public final static String USER_NAME_COMMAND = "whoami";
/** Unix指令chmod :執行指令設定使用者操作權限 */
public static final String SET_PERMISSION_COMMAND = "chmod";
/** Unix指令chown :執行指令設定屬主 */
public static final String SET_OWNER_COMMAND = "chown";
/** Unix指令chgrp :執行指令設定組 */
public static final String SET_GROUP_COMMAND = "chgrp";
/** Unix指令bash -c groups :執行指令得到目前使用者所屬的組清單 */
public static String[] getGROUPS_COMMAND() {
return new String[]{"bash", "-c", "groups"};
}
/** Unix指令ls -ld :執行指令設定組,不支援Windows系統,但可以支援Cygwin */
public static String[] getGET_PERMISSION_COMMAND() {
return new String[] {(WINDOWS ? "ls" : "/bin/ls"), "-ld"};
}
看到這些Unix的指令,應該非常熟悉。
第二種屬性,都屬于與如何實作定義的上述指令行的執行有關的,如下所示:
private long interval; // 重新整理間隔
private long lastTime; // 最後執行指令的時間
private Map<String, String> environment; // 指令行執行所需要的作業系統環境
private File dir;
private Process process; // 執行指令行的子程序
private int exitCode; // 執行指令行完成後,退出狀态碼
dir屬性表示目前執行指令所在的工作目錄,environment屬性表示目前指令執行的環境,它們在Shell類中都提供了一個受保護的設定操作,可以在Shell抽象類的子類中根據需要設定不同工作目錄和環境,其中,dir預設為系統“user.dir”變量值,environment使用系統預設的環境。
通過interval與lastTime屬性來檢查,是否有必要重新執行一次,如果是就執行,否則重置退出狀态碼exitCode為0,正常退出,可以在Shell類的run方法中看到:
protected void run() throws IOException {
if (lastTime + interval > System.currentTimeMillis())
return; // 不需要重新執行指令行,傳回
exitCode = 0;
runCommand(); // 調用:需要重新執行指令行
}
通過runCommand方法就可以執行指定的Shell指令,它是Shell類的核心。在分析runCommand方法之前,先了解一下與Shell指令執行相關的環境資訊。
當在Windows系統下,打開一個cmd視窗的時候,執行set指令,就能看到目前系統的環境變量的資訊,如下所示:
ALLUSERSPROFILE=C:/Documents and Settings/All Users
APPDATA=C:/Documents and Settings/Administrator/Application Data
CLASSPATH=.;E:/Program Files/Java/jdk1.6.0_14/lib/tools.jar;E:/Program Files/Java/jdk1.6.0_14/lib/dt.jar;E:/Program Files/Java/jdk1.6.0_14/jre/lib/rt.jar;E:/Program Files/Java/jdk1.6.0_14/jre/lib/charsets.jar
CLIENTNAME=Console
CommonProgramFiles=C:/Program Files/Common Files
COMPUTERNAME=SYJ
ComSpec=C:/WINDOWS/system32/cmd.exe
DEVMGR_SHOW_NONPRESENT_DEVICES=1
FP_NO_HOST_CHECK=NO
HERITRIX_HOME=E:/MyEclipse/workspace/heritrix
HOME=C:/Documents and Settings/Administrator
HOMEDRIVE=C:
HOMEPATH=/Documents and Settings/Administrator
JAVA_HOME=E:/Program Files/Java/jdk1.6.0_14
JSERV=D:/oracle/ora92/Apache/Jserv/conf
LOGONSERVER=//SYJ
NUMBER_OF_PROCESSORS=2
NUTSUFFIX=1
NUT_SUFFIXED_SEARCHING=1
OS=Windows_NT
Path=D:/oracle/ora92/bin;C:/Program Files/Oracle/jre/1.3.1/bin;C:/Program Files/Oracle/jre/1.1.8/bin;E:/Program Files/CollabNet Subversion Client;E:/Program Files/Java/jdk1.6.0_14/bin;C:/WINDOWS/system32;C:/WINDOWS;C:/WINDOWS/System32/Wbem;C:/Program Files/Microsoft SQL Server/80/Tools/Binn/;C:/Program Files/Microsoft SQL Server/90/Tools/binn/;C:/Program Files/MyEclipse 7.0M1/jre/bin;E:/Program Files/TortoiseSVN/bin;E:/PROGRA~1/F-Secure/SSHTRI~1;D:/Program Files/MySQL/MySQL Server 5.1/bin;F:/Program Files/Rational/common;C:/Program Files/StormII/Codec;C:/Program Files/StormII;C:/Program Files/SSH Communications Security/SSH Secure Shell;C:/Program Files/IDM Computer Solutions/UltraEdit/
PATHEXT=.COM;.EXE;.BAT;.CMD;.VBS;.VBE;.JS;.JSE;.WSF;.WSH
PROCESSOR_ARCHITECTURE=x86
PROCESSOR_IDENTIFIER=x86 Family 6 Model 23 Stepping 10, GenuineIntel
PROCESSOR_LEVEL=6
PROCESSOR_REVISION=170a
ProgramFiles=C:/Program Files
PROMPT=$P$G
RATL_RTHOME=F:/Program Files/Rational/Rational Test
SESSIONNAME=Console
SystemDrive=C:
SystemRoot=C:/WINDOWS
TEMP=C:/DOCUME~1/ADMINI~1/LOCALS~1/Temp
TMP=C:/DOCUME~1/ADMINI~1/LOCALS~1/Temp
TMPDIR=C:/DOCUME~1/ADMINI~1/LOCALS~1/Temp
USERDOMAIN=SYJ
USERNAME=Administrator
USERPROFILE=C:/Documents and Settings/Administrator
VBOX_INSTALL_PATH=E:/Program Files/Sun/xVM VirtualBox/
VS90COMNTOOLS=d:/Microsoft Visual Studio 9.0/Common7/Tools/
windir=C:/WINDOWS
WV_GATEWAY_CFG=D:/oracle/ora92/Apache/modplsql/cfg/wdbsvr.app
這些環境變量的資訊都是以鍵值對的形式出現的,當在作業系統上運作相關的應用程式的時候,其實就是在上述環境變量所設定的一個上下文中運作,環境是應用程式運作不可缺少的條件。你在Unix系統中執行env指令,同樣也能得到與上面類似的鍵值對的環境變量資訊。
是以,org.apache.hadoop.util.Shell作為Shell指令的抽象,一定要擷取到目前所在作業系統(Unix系統) 的環境變量,在這樣一個上下文中,才能如同從Unix系統中輸入執行Shell指令進行執行一樣。
在Java中,實作了一個java.lang.ProcessBuilder類,該類能夠建立一個作業系統的程序,通過為該程序設定運作環境變量,進而啟動進行執行。預設情況下ProcessBuilder類已經實作了設定目前作業系統環境的功能,可以通過environment()方法檢視它的執行個體所具有的環境資訊,這等價于使用System.getenv()擷取到的環境變量,都是以鍵值對的形式存在于ProcessBuilder類執行個體的上下文中。
下面,分析Shell類實作執行指令的過程,實作方法為runCommand,如下所示:
private void runCommand() throws IOException {
ProcessBuilder builder = new ProcessBuilder(getExecString()); // 方法getExecString()在該類中式抽象的,需要在實作類中實作,擷取到一個指令名稱及其參數,進而基于此 構造一個ProcessBuilder程序執行個體
boolean completed = false; // 辨別執行指令完成情況
if (environment != null) {
builder.environment().putAll(this.environment); // 如果有必要,設定指令行執行環境
}
if (dir != null) {
builder.directory(this.dir); // 如果必要,設定指令行執行所在工作目錄
}
process = builder.start(); // 啟動ProcessBuilder builder程序,傳回一個用來管理指令行執行情況的子程序process
final BufferedReader errReader = new BufferedReader(new InputStreamReader(process.getErrorStream())); // 當builder程序啟動後,檢查送出的指令行是否合法,如果不合法或者執行出錯,将出錯資訊寫入到緩沖流中,可以從其中解析讀取出來
BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream())); // 執行指令傳回執行結果,通過process管理子線程來擷取執行流中的執行結果資訊
final StringBuffer errMsg = new StringBuffer(); // 存放執行指令出錯資訊的String緩沖區
// 定義解析線程,解析指令行執行出錯資訊所在的流,解析完成後釋放流緩沖區
Thread errThread = new Thread() {
@Override
public void run() {
try {
String line = errReader.readLine();
while((line != null) && !isInterrupted()) {
errMsg.append(line);
errMsg.append(System.getProperty("line.separator"));
line = errReader.readLine();
}
} catch(IOException ioe) {
LOG.warn("Error reading the error stream", ioe);
}
}
};
try {
errThread.start(); // 啟動線程,處理出錯資訊
} catch (IllegalStateException ise) { }
try {
parseExecResult(inReader); // 調用:解析執行指令傳回的結果資訊
String line = inReader.readLine();
while(line != null) {
line = inReader.readLine();
}
exitCode = process.waitFor(); // 等待程序process處理完畢,置exitCode狀态碼
try {
errThread.join(); // 等待出錯資訊處理線程執行完成
} catch (InterruptedException ie) {
LOG.warn("Interrupted while reading the error stream", ie);
}
completed = true; // 置指令行執行完成狀态
if (exitCode != 0) {
throw new ExitCodeException(exitCode, errMsg.toString());
}
} catch (InterruptedException ie) {
throw new IOException(ie.toString());
} finally {
try {
inReader.close(); // 最後,需要關閉流對象
} catch (IOException ioe) {
LOG.warn("Error while closing the input stream", ioe);
}
if (!completed) {
errThread.interrupt(); // 可能處理錯誤資訊的線程發生異常,未能置completed=true,最後終止要該線程
}
try {
errReader.close(); // 關閉流對象
} catch (IOException ioe) {
LOG.warn("Error while closing the error stream", ioe);
}
process.destroy(); // 終止子程序process
lastTime = System.currentTimeMillis(); // 設定目前時間為該指令行執行的最後時間,為了判斷一個指令是否需要被重新執行
}
}
上面已經做了詳細的注釋,基本上闡明了一個指令行的執行過程。
在類中,還提供了一個static方法execCommand,為執行指令提供入口:
public static String execCommand(String ... cmd) throws IOException {
return execCommand(null, cmd);
}
執行該方法,調用了另一個重載的execCommand方法,傳回指令執行結果的資訊。
注意,在Shell抽象類中并沒有實作該怎樣擷取一個指令名稱及其參數的方法,需要在實作類中給出,是以,在Shell類内部定義了一個靜态内部類ShellCommandExecutor,該類實作了擷取指令名稱及其參數的方法。在上面方法execCommand中,調用了一個重載的execCommand方法,該方法中通過執行個體化一個ShellCommandExecutor類,來提供擷取指令名稱及其參數,進而構造一個ProcessBuilder執行個體,建立一個作業系統線程來執行指令行。
? extends Shell
下面看實作Shell抽象類的一些子類的實作。
- ShellCommandExecutor類
ShellComandExecutor類的實作如下所示:
public static class ShellCommandExecutor extends Shell {
private String[] command; // 指令名稱及其參數
private StringBuffer output; // 存放執行指令行傳回結果的String緩沖區
public ShellCommandExecutor(String[] execString) {
command = execString.clone();
}
public ShellCommandExecutor(String[] execString, File dir) {
this(execString);
this.setWorkingDirectory(dir);
}
public ShellCommandExecutor(String[] execString, File dir, Map<String, String> env) {
this(execString, dir);
this.setEnvironment(env);
}
/** 繼承自Shell基類,執行指令行 */
public void execute() throws IOException {
this.run();
}
protected String[] getExecString() {
return command; // 輸入的就是指令名稱+參數的格式,直接得到
}
/**
* 解析指令行執行後的輸出結果流,存放到String緩沖區中
*/
protected void parseExecResult(BufferedReader lines) throws IOException {
output = new StringBuffer();
char[] buf = new char[512];
int nRead;
while ( (nRead = lines.read(buf, 0, buf.length)) > 0 ) {
output.append(buf, 0, nRead);
}
}
- DF類
org.apache.hadoop.fs.DF類實作了Unix系統中Shell指令df,用來擷取磁盤使用情況的統計資料。該Shell實作類中定義域df指令操作相關的内容,可以從屬性來看:
public static final long DF_INTERVAL_DEFAULT = 3 * 1000; // 設定df指令重新整理間隔為3s
private String dirPath; // 執行df指令所在工作目錄
private String filesystem; // 磁盤裝置名
private long capacity; // 磁盤總容量
private long used; // 磁盤使用量
private long available; // 磁盤可用量
private int percentUsed; // 磁盤使用率
private String mount; // 磁盤挂載位置
隻需要實作Shell類定義的getExecString與parseExecResult方法即可。比較簡單,getExecString方法實作如下:
protected String[] getExecString() {
return new String[] {"bash","-c","exec 'df' '-k' '" + dirPath + "' 2>/dev/null"};
}
該方法傳回的字元串數組,用來構造一個ProcessBuilder程序執行個體。
parseExecResult方法實作如下所示:
protected void parseExecResult(BufferedReader lines) throws IOException {
lines.readLine(); // 去掉流中的首行
String line = lines.readLine();
if (line == null) {
throw new IOException( "Expecting a line not the end of stream" );
}
StringTokenizer tokens = new StringTokenizer(line, " /t/n/r/f%");
this.filesystem = tokens.nextToken();
if (!tokens.hasMoreTokens()) {
line = lines.readLine();
if (line == null) {
throw new IOException( "Expecting a line not the end of stream" );
}
tokens = new StringTokenizer(line, " /t/n/r/f%");
}
/**
* 下面處理并設定執行df -k指令的結果資訊
*/
this.capacity = Long.parseLong(tokens.nextToken()) * 1024;
this.used = Long.parseLong(tokens.nextToken()) * 1024;
this.available = Long.parseLong(tokens.nextToken()) * 1024;
this.percentUsed = Integer.parseInt(tokens.nextToken());
this.mount = tokens.nextToken();
}
- DU類
DU類實作了Unix的du指令,顯示目錄或者檔案大小的資訊,具體實作可以參考org.apache.hadoop.fs.DU類,這裡跳過。
- CygPathCommand類
CygPathCommand類是org.apache.hadoop.fs.FileUtil類的一個内部靜态類,實作了Windows系統上模拟Unix系統的Cygwin系統的cygpath指令,這裡跳過。