天天看點

spatialhadoop2.3源碼閱讀(四) FileMBR類

edu.umn.cs.spatialHadoop.operations.FileMBR 類主要功能為計算輸入資料的最小包圍矩形。

該類的核心實作為fileMBRMapReduce方法。該方法使用MapReduce Job進行計算。

FileMBR 主要實作了map,combine和reduce方法。接下來分别介紹這三個方法。

1. FileMBRMapper為Map類,該類map方法位

public void map(Rectangle dummy, Text text,
        OutputCollector<Text, Partition> output, Reporter reporter)
            throws IOException {
      if (lastSplit != reporter.getInputSplit()) {
        lastSplit = reporter.getInputSplit();
        value.filename = ((FileSplit)lastSplit).getPath().getName();
        fileName = new Text(value.filename);
      }
      value.size = text.getLength() + 1; // +1 for new line
      shape.fromText(text);
      Rectangle mbr = shape.getMBR();

      if (mbr != null) {
        value.set(mbr);
        output.collect(fileName, value);
      }
    }
  }
           

map方法中傳入的key和value是自定義RecordReader的解析結果。重點在value即map方法中的傳入參數Text。 RecordReader會将原始資料中的每一行解析為spatialhadoop自定義的shape類型,并序列化為Text。map方法就是計算每一個shape的mbr,然後輸出。輸出格式為(檔案名,mbr)

2. Combine為Combine類,Reduce為Reduce類,兩者作用基本相同。reduce代碼如下

public static class Reduce extends MapReduceBase implements
    Reducer<Text, Partition, NullWritable, Partition> {
    @Override
    public void reduce(Text filename, Iterator<Partition> values,
        OutputCollector<NullWritable, Partition> output, Reporter reporter)
            throws IOException {
      if (values.hasNext()) {
        Partition partition = values.next().clone();
        while (values.hasNext()) {
          partition.expand(values.next());
        }
        partition.cellId = Math.abs(filename.hashCode());
        output.collect(NullWritable.get(), partition);
      }
    }
  }
           

reduce方法循環的将相同檔案的兩個shape進行合并,計算兩個shape的mbr,具體計算方法如下,即計算出包圍這兩個shape的最小包圍矩形。

public void expand(final Shape s) {
    Rectangle r = s.getMBR();
    if (r.x1 < this.x1)
      this.x1 = r.x1;
    if (r.x2 > this.x2)
      this.x2 = r.x2;
    if (r.y1 < this.y1)
      this.y1 = r.y1;
    if (r.y2 > this.y2)
      this.y2 = r.y2;
  }
           

同時,pattiton中也儲存了檔案名,資料大小,記錄數這些資訊,在mbr合并的同時,也會将資料大小,記錄數進行相加,用來最後進行輸出。代碼如下:

public void expand(Partition p) {
    super.expand(p);
    // accumulate size
    this.size += p.size;
    this.recordCount += p.recordCount;
  }
           

總結如下:

map方法輸出每一個shape的mbr

combine和reduce循序計算兩個shape的mbr。

接下來介紹FileMBR中的outputCommiter,代碼如下:

public static class MBROutputCommitter extends FileOutputCommitter {
    // If input is a directory, save the MBR to a _master file there
    @Override
    public void commitJob(JobContext context) throws IOException {
      try {
        super.commitJob(context);
        // Store the result back in the input file if it is a directory
        JobConf job = context.getJobConf();

        Path[] inPaths = SpatialInputFormat.getInputPaths(job);
        Path inPath = inPaths[0]; // TODO Handle multiple file input
        FileSystem inFs = inPath.getFileSystem(job);
        if (!inFs.getFileStatus(inPath).isDir())
          return;
        Path gindex_path = new Path(inPath, "_master.heap");
        // Answer has been already cached (may be by another job)
        if (inFs.exists(gindex_path))
          return;
        PrintStream gout = new PrintStream(inFs.create(gindex_path, false));

        // Read job result and concatenate everything to the master file
        Path outPath = TextOutputFormat.getOutputPath(job);
        FileSystem outFs = outPath.getFileSystem(job);
        FileStatus[] results = outFs.listStatus(outPath);
        for (FileStatus fileStatus : results) {
          if (fileStatus.getLen() > 0 && fileStatus.getPath().getName().startsWith("part-")) {
            LineReader lineReader = new LineReader(outFs.open(fileStatus.getPath()));
            Text text = new Text();
            while (lineReader.readLine(text) > 0) {
              gout.println(text);
            }
            lineReader.close();
          }
        }
        gout.close();
      } catch (RuntimeException e) {
        // This might happen of the input directory is read only
        LOG.info("Error caching the output of FileMBR");
      }
    }
  }
           

commitJob會在job成功運作之後執行,commitJob方法第一行是調用父類的該方法,這将會在輸出檔案夾中生成part-00000檔案,該檔案和下文所講的 _master.heap檔案完全相同,自定義的commitJob方法實質上就是将part-00000檔案中的内容複制到_master.heap檔案中。

FileMBR的輸出:

FileMBR會在輸入檔案夾下生成一個全局索引檔案,該檔案由MBROutputCommitter生成。命名為“_master.heap”,内容執行個體如下

488954273,-176.2562062,-54.9019677,178.4890059,78.2138853,193076,59151087,cemetery.bz2
1789326676,-179.8728244,-89.9678417,179.7586087,78.6569788,1767137,618575974,sports.bz2
           

每一行代表一個檔案,各個字段分别代表:cellID,x1,y1,x2,y2,recordcount,filesize,filename

cellID為filename的hashcode

(x1,y1)-(x2,y2)為該檔案的最小包圍矩形

recordcount為該檔案資料記錄數

filesize為該檔案未壓縮的大小

filename為該檔案名