The edu.umn.cs.spatialHadoop.operations.FileMBR class computes the minimal bounding rectangle (MBR) of the input data.
Its core implementation is the fileMBRMapReduce method, which performs the computation with a MapReduce job.
FileMBR mainly implements the map, combine, and reduce methods; the three are introduced in turn below.
1. FileMBRMapper is the Map class; its map method is as follows:
public void map(Rectangle dummy, Text text,
    OutputCollector<Text, Partition> output, Reporter reporter)
    throws IOException {
  // Cache the file name; it only changes when the input split changes.
  if (lastSplit != reporter.getInputSplit()) {
    lastSplit = reporter.getInputSplit();
    value.filename = ((FileSplit) lastSplit).getPath().getName();
    fileName = new Text(value.filename);
  }
  value.size = text.getLength() + 1; // +1 for new line
  // Deserialize the shape and emit its MBR, keyed by the source file name.
  shape.fromText(text);
  Rectangle mbr = shape.getMBR();
  if (mbr != null) {
    value.set(mbr);
    output.collect(fileName, value);
  }
}
The key and value passed into the map method are the parsing results of a custom RecordReader. The important part is the value, i.e. the Text parameter: the RecordReader parses each line of the raw data into a SpatialHadoop shape type serialized as Text. The map method computes the MBR of each shape and emits it in the form (filename, mbr).
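The per-record work of map can be illustrated with a minimal standalone sketch. Rect and mapOneRecord below are hypothetical simplified stand-ins, not SpatialHadoop's real Rectangle/Shape classes; a line is parsed into a shape and its MBR is emitted together with the file name:

```java
// Simplified sketch of the map step: parse one line of text into a
// rectangle shape and take its MBR. Rect is a hypothetical stand-in
// for SpatialHadoop's Rectangle; the real class has far more machinery.
public class MapSketch {
    static class Rect {
        double x1, y1, x2, y2;
        Rect(double x1, double y1, double x2, double y2) {
            this.x1 = x1; this.y1 = y1; this.x2 = x2; this.y2 = y2;
        }
        // A rectangle is its own minimal bounding rectangle.
        Rect getMBR() { return this; }
        // Parse "x1,y1,x2,y2", mirroring Shape.fromText(Text).
        static Rect fromText(String line) {
            String[] f = line.split(",");
            return new Rect(Double.parseDouble(f[0]), Double.parseDouble(f[1]),
                            Double.parseDouble(f[2]), Double.parseDouble(f[3]));
        }
    }

    // Emulates one map() call: returns the (filename, mbr) output as a string.
    static String mapOneRecord(String filename, String line) {
        Rect mbr = Rect.fromText(line).getMBR();
        return filename + "\t" + mbr.x1 + "," + mbr.y1 + "," + mbr.x2 + "," + mbr.y2;
    }

    public static void main(String[] args) {
        // prints: cemetery.bz2	-1.5,2.0,3.0,4.5
        System.out.println(mapOneRecord("cemetery.bz2", "-1.5,2.0,3.0,4.5"));
    }
}
```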
2. Combine is the combine class and Reduce is the reduce class; the two behave essentially the same. The reduce code is as follows:
public static class Reduce extends MapReduceBase implements
    Reducer<Text, Partition, NullWritable, Partition> {
  @Override
  public void reduce(Text filename, Iterator<Partition> values,
      OutputCollector<NullWritable, Partition> output, Reporter reporter)
      throws IOException {
    if (values.hasNext()) {
      // Start from the first partition and fold all others into it.
      Partition partition = values.next().clone();
      while (values.hasNext()) {
        partition.expand(values.next());
      }
      // Use the filename's hash code as the cell id of the result.
      partition.cellId = Math.abs(filename.hashCode());
      output.collect(NullWritable.get(), partition);
    }
  }
}
The reduce method loops over the shapes belonging to the same file and merges them pairwise: each merge computes the minimal bounding rectangle enclosing the two shapes. The merge is implemented as follows:
public void expand(final Shape s) {
  Rectangle r = s.getMBR();
  if (r.x1 < this.x1)
    this.x1 = r.x1;
  if (r.x2 > this.x2)
    this.x2 = r.x2;
  if (r.y1 < this.y1)
    this.y1 = r.y1;
  if (r.y2 > this.y2)
    this.y2 = r.y2;
}
The Partition also stores the file name, data size, and record count. While MBRs are merged, the data size and record count are summed as well, for use in the final output. The code is as follows:
public void expand(Partition p) {
  super.expand(p);
  // accumulate size
  this.size += p.size;
  this.recordCount += p.recordCount;
}
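Taken together, the two expand methods fold one partition into another: the MBR grows to cover both, while the counters add up. A standalone sketch (Part is a hypothetical simplified stand-in for Partition, not the real SpatialHadoop class):

```java
// Sketch of how combine/reduce fold Partitions together: the MBR grows
// to cover each new partition, while size and record count accumulate.
public class ExpandSketch {
    static class Part {
        double x1, y1, x2, y2;
        long size, recordCount;
        Part(double x1, double y1, double x2, double y2, long size, long recordCount) {
            this.x1 = x1; this.y1 = y1; this.x2 = x2; this.y2 = y2;
            this.size = size; this.recordCount = recordCount;
        }
        // Same logic as Rectangle.expand plus Partition.expand combined.
        void expand(Part p) {
            if (p.x1 < x1) x1 = p.x1;
            if (p.x2 > x2) x2 = p.x2;
            if (p.y1 < y1) y1 = p.y1;
            if (p.y2 > y2) y2 = p.y2;
            size += p.size;
            recordCount += p.recordCount;
        }
    }

    public static void main(String[] args) {
        Part a = new Part(0, 0, 2, 2, 100, 3);
        Part b = new Part(-1, 1, 1, 5, 50, 2);
        a.expand(b);
        // Merged MBR covers both inputs; counters are summed.
        // prints: -1.0,0.0,2.0,5.0 size=150 records=5
        System.out.println(a.x1 + "," + a.y1 + "," + a.x2 + "," + a.y2
            + " size=" + a.size + " records=" + a.recordCount);
    }
}
```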
To summarize:
The map method emits the MBR of each individual shape.
Combine and reduce then successively merge pairs of MBRs until one MBR per file remains.
Next comes the OutputCommitter in FileMBR, whose code is as follows:
public static class MBROutputCommitter extends FileOutputCommitter {
  // If input is a directory, save the MBR to a _master file there
  @Override
  public void commitJob(JobContext context) throws IOException {
    try {
      super.commitJob(context);
      // Store the result back in the input file if it is a directory
      JobConf job = context.getJobConf();
      Path[] inPaths = SpatialInputFormat.getInputPaths(job);
      Path inPath = inPaths[0]; // TODO Handle multiple file input
      FileSystem inFs = inPath.getFileSystem(job);
      if (!inFs.getFileStatus(inPath).isDir())
        return;
      Path gindex_path = new Path(inPath, "_master.heap");
      // Answer has been already cached (may be by another job)
      if (inFs.exists(gindex_path))
        return;
      PrintStream gout = new PrintStream(inFs.create(gindex_path, false));
      // Read job result and concatenate everything to the master file
      Path outPath = TextOutputFormat.getOutputPath(job);
      FileSystem outFs = outPath.getFileSystem(job);
      FileStatus[] results = outFs.listStatus(outPath);
      for (FileStatus fileStatus : results) {
        if (fileStatus.getLen() > 0 && fileStatus.getPath().getName().startsWith("part-")) {
          LineReader lineReader = new LineReader(outFs.open(fileStatus.getPath()));
          Text text = new Text();
          while (lineReader.readLine(text) > 0) {
            gout.println(text);
          }
          lineReader.close();
        }
      }
      gout.close();
    } catch (RuntimeException e) {
      // This might happen if the input directory is read-only
      LOG.info("Error caching the output of FileMBR");
    }
  }
}
commitJob runs after the job completes successfully. Its first statement calls the superclass method, which produces the part-00000 file in the output directory; that file is identical in content to the _master.heap file described below. The custom commitJob essentially copies the contents of part-00000 into _master.heap.
Output of FileMBR:
FileMBR generates a global index file, produced by MBROutputCommitter, in the input directory. The file is named "_master.heap"; example contents:
488954273,-176.2562062,-54.9019677,178.4890059,78.2138853,193076,59151087,cemetery.bz2
1789326676,-179.8728244,-89.9678417,179.7586087,78.6569788,1767137,618575974,sports.bz2
Each line represents one file; the fields are: cellID, x1, y1, x2, y2, recordcount, filesize, filename.
cellID is the hash code of the filename
(x1,y1)-(x2,y2) is the file's minimal bounding rectangle
recordcount is the number of records in the file
filesize is the uncompressed size of the file
filename is the file's name
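Given that field layout, reading one _master.heap line back is a simple comma split. MasterLine below is a hypothetical helper written for illustration, not part of SpatialHadoop:

```java
// Sketch of parsing one _master.heap line into its fields, following
// the layout: cellID,x1,y1,x2,y2,recordcount,filesize,filename.
public class MasterLine {
    long cellId;
    double x1, y1, x2, y2;
    long recordCount, fileSize;
    String filename;

    static MasterLine parse(String line) {
        String[] f = line.split(",");
        MasterLine m = new MasterLine();
        m.cellId = Long.parseLong(f[0]);
        m.x1 = Double.parseDouble(f[1]);
        m.y1 = Double.parseDouble(f[2]);
        m.x2 = Double.parseDouble(f[3]);
        m.y2 = Double.parseDouble(f[4]);
        m.recordCount = Long.parseLong(f[5]);
        m.fileSize = Long.parseLong(f[6]);
        m.filename = f[7];
        return m;
    }

    public static void main(String[] args) {
        MasterLine m = parse("488954273,-176.2562062,-54.9019677,"
            + "178.4890059,78.2138853,193076,59151087,cemetery.bz2");
        // prints: cemetery.bz2 records=193076
        System.out.println(m.filename + " records=" + m.recordCount);
    }
}
```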