
Iceberg 源碼分析之 HadoopTableOperations




public class HadoopTableOperations implements TableOperations {
  // Cached table metadata returned by current() and refresh(); starts out null.
  private volatile TableMetadata currentMetadata = null;
  // Cached metadata version number; while it is null, refresh() asks findVersion() for the
  // version to start from.
  private volatile Integer version = null;
  // When true, current() calls refresh() instead of returning the cached metadata.
  // refresh() clears the flag and commit() sets it again.
  private volatile boolean shouldRefresh = true;

  /**
   * Creates table operations bound to a table location.
   *
   * @param location table location path kept on this instance
   * @param fileIO file IO implementation kept on this instance
   * @param conf Hadoop configuration kept on this instance (passed to {@code getFileSystem}
   *     during commit)
   */
  protected HadoopTableOperations(Path location, FileIO fileIO, Configuration conf) {
    this.conf = conf;
    this.location = location;
    this.fileIO = fileIO;
  }


  1. shouldRefresh 預設為 true，因此第一次調用 current() 時一定會進入 refresh
  2. shouldRefresh 為 true 時調用 refresh 方法；否則直接返回已快取的 currentMetadata
  public TableMetadata current() {
    if (this.currentMetadata != null) {
      LOG.info("[shouldRefresh='{}', currentMetadata='{}']", this.shouldRefresh,
          this.currentMetadata.location() + "::" + this.currentMetadata.metadataFileLocation());
    if (shouldRefresh) {
      return refresh();
    return currentMetadata;


  1. 若 version 為 null，先調用 findVersion 根據 metadataRoot/version-hint.text 讀到 ver 值；否則直接使用已有的 version
  2. 自旋獲得 metadataFile 路徑: metadataRoot/v_$ver_$codec_metadata.json（只要 ver + 1 對應的檔案存在就繼續往後找，直到最新版本）
  3. 調用 updateVersionAndMetadata 更新中繼資料: version、currentMetadata，並將 shouldRefresh 設為 false
  public TableMetadata refresh() {
    int ver = version != null ? version : findVersion();
    LOG.info("[ver='{}']", ver);
    try {
      Path metadataFile = getMetadataFile(ver);
      if (version == null && metadataFile == null && ver == 0) {
        // no v0 metadata means the table doesn't exist yet
        return null;
      } else if (metadataFile == null) {
        throw new ValidationException("Metadata file for version %d is missing", ver);

      // spin: nextMetadataFile == null
      // 在 commit 之後 ver 自增
      // 此處 spin 來保障讀最新的 ver
      Path nextMetadataFile = getMetadataFile(ver + 1);
      while (nextMetadataFile != null) {
        ver += 1;
        metadataFile = nextMetadataFile;
        nextMetadataFile = getMetadataFile(ver + 1);

      // 線程安全
      updateVersionAndMetadata(ver, metadataFile.toString());

      this.shouldRefresh = false;
      return currentMetadata;
    } catch (IOException e) {
      throw new RuntimeIOException(e, "Failed to refresh the table");


  1. base metadata 是目前的中繼資料版本; temp metadata 是馬上要寫的中繼資料;
  2. 寫 temp metadata.json 之後 rename 成 $ver+1 版本的最終版中繼資料（rename 是原子性的提交動作）
  3. 寫 version-hint.text 檔案
  4. 若開關開啟則刪除 old 版本的 metadata.json 檔案
  public void commit(TableMetadata base, TableMetadata metadata) {
    LOG.info("start commit.");
    // 此處有個 caffeine 本地 cache
    Pair<Integer, TableMetadata> current = versionAndMetadata();
    if (base != current.second()) {
      throw new CommitFailedException("Cannot commit changes based on stale table metadata");

    if (base == metadata) {
      LOG.info("Nothing to commit.");

    Preconditions.checkArgument(base == null || base.location().equals(metadata.location()),
        "Hadoop path-based tables cannot be relocated");
        "Hadoop path-based tables cannot relocate metadata");

    String codecName = metadata.property(
    TableMetadataParser.Codec codec = TableMetadataParser.Codec.fromName(codecName);
    String fileExtension = TableMetadataParser.getFileExtension(codec);
    Path tempMetadataFile = metadataPath(UUID.randomUUID().toString() + fileExtension);
    LOG.info("[base='{}', metadata='{}', tempMetadataFile='{}']", base.metadataFileLocation(),
        metadata.metadataFileLocation(), tempMetadataFile);
    TableMetadataParser.write(metadata, io().newOutputFile(tempMetadataFile.toString()));
    LOG.info("TableMetadataParser.write(metadata, io().newOutputFile(tempMetadataFile.toString()));");

    int nextVersion = (current.first() != null ? current.first() : 0) + 1;
    Path finalMetadataFile = metadataFilePath(nextVersion, codec);
    LOG.info("nextVer='{}', finalMetaFile='{}'", nextVersion, finalMetadataFile);
    FileSystem fs = getFileSystem(tempMetadataFile, conf);

    try {
      if (fs.exists(finalMetadataFile)) {
        throw new CommitFailedException(
            "Version %d already exists: %s", nextVersion, finalMetadataFile);
    } catch (IOException e) {
      throw new RuntimeIOException(e,
          "Failed to check if next version exists: %s", finalMetadataFile);

    // this rename operation is the atomic commit operation
    LOG.info("start renameToFinal.");
    renameToFinal(fs, tempMetadataFile, finalMetadataFile);

    LOG.info("start writeVersionHint");
    // update the best-effort version pointer

    LOG.info("start deleteRemovedMetadataFiles");
    // 開關:
    // org.apache.iceberg.TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED
    deleteRemovedMetadataFiles(base, metadata);

    this.shouldRefresh = true;
    LOG.info("end commit.");