天天看點

Leveldb源碼分析--20

12 DB的打開

先分析LevelDB是如何打開db的,萬物始于建立。在打開流程中有幾個輔助函數:DBImpl(),DBImpl::Recover, DBImpl::DeleteObsoleteFiles, DBImpl::RecoverLogFile, DBImpl::MaybeScheduleCompaction。

12.1 DB::Open()

打開一個db,進行PUT、GET操作,就是前面的靜态函數DB::Open的工作。如果操作成功,它就傳回一個db指針。前面說過DB就是一個接口類,其具體實作在DBImp類中,這是一個DB的子類。

函數聲明為:

Status DB::Open(const Options& options, const std::string&dbname, DB** dbptr);

分解來看,Open()函數主要有以下5個執行步驟。

S1 建立DBImpl對象,其後進入DBImpl::Recover()函數執行S2和S3。

S2 從已存在的db檔案恢複db資料,根據CURRENT記錄的MANIFEST檔案讀取db元資訊;這通過調用VersionSet::Recover()完成。

S3 然後過濾出那些最近的更新log,前一個版本可能新加了這些log,但并沒有記錄在MANIFEST中。然後依次根據時間順序,調用DBImpl::RecoverLogFile()從舊到新回放這些操作log。回放log時可能會修改db元資訊,比如dump了新的level 0檔案,是以它将傳回一個VersionEdit對象,記錄db元資訊的變動。

S4 如果DBImpl::Recover()傳回成功,就執行VersionSet::LogAndApply()應用VersionEdit,并儲存目前的DB資訊到新的MANIFEST檔案中。

S5 最後删除一些過期檔案,并檢查是否需要執行compaction,如果需要,就啟動背景線程執行。

下面就來具體分析Open函數的代碼,在Open函數中涉及到上面的3個流程。

S1 首先建立DBImpl對象,鎖定并試圖做Recover操作。Recover操作用來處理建立flag,比如存在就傳回失敗等等,嘗試從已存在的sstable檔案恢複db。并傳回db元資訊的變動資訊,一個VersionEdit對象。

DBImpl* impl = newDBImpl(options, dbname);
  impl->mutex_.Lock(); // 鎖db
  VersionEdit edit;
  Status s =impl->Recover(&edit); // 處理flag&恢複:create_if_missing,error_if_exists
           

S2 如果Recover傳回成功,則調用VersionSet取得新的log檔案編号——實際上是在目前基礎上+1,準備新的log檔案。如果log檔案建立成功,則根據log檔案建立log::Writer。然後執行VersionSet::LogAndApply,根據edit記錄的增量變動生成新的current version,并寫入MANIFEST檔案。

函數NewFileNumber(){returnnext_file_number_++;},直接傳回next_file_number_。

uint64_t new_log_number =impl->versions_->NewFileNumber();
    WritableFile* lfile;
    s =options.env->NewWritableFile(LogFileName(dbname, new_log_number),&lfile);
    if (s.ok()) {
      edit.SetLogNumber(new_log_number);
      impl->logfile_ = lfile;
      impl->logfile_number_ =new_log_number;
      impl->log_ = newlog::Writer(lfile);
      s =impl->versions_->LogAndApply(&edit, &impl->mutex_);
    }
           

 S3 如果VersionSet::LogAndApply傳回成功,則删除過期檔案,檢查是否需要執行compaction,最終傳回建立的DBImpl對象。

if (s.ok()) {
     impl->DeleteObsoleteFiles();
     impl->MaybeScheduleCompaction();
    }
  impl->mutex_.Unlock();
  if (s.ok()) *dbptr = impl;
  return s;
           

以上就是DB::Open的主題邏輯。

12.2 DBImpl::DBImpl()

構造函數做的都是初始化操作,DBImpl::DBImpl(const Options& options, const std::string&dbname)

首先是初始化清單中,直接根據參數指派,或者直接初始化。Comparator和filter policy都是參數傳入的。在傳遞option時會首先将option中的參數合法化,logfile_number_初始化為0,指針初始化為NULL。

建立MemTable,并增加引用計數,建立WriteBatch。

mem_(newMemTable(internal_comparator_)),
      tmp_batch_(new WriteBatch),
      mem_->Ref();
  // 然後在函數體中,建立TableCache和VersionSet。
  // 為其他預留10個檔案,其餘的都給TableCache.
  const int table_cache_size =options.max_open_files - 10;
  table_cache_ = newTableCache(dbname_, &options_, table_cache_size);
  versions_ = newVersionSet(dbname_, &options_, table_cache_, &internal_comparator_);
           

12.3 DBImp::NewDB()

當外部在調用DB::Open()時設定了option指定如果db不存在就建立,如果db不存在leveldb就會調用函數建立新的db。判斷db是否存在的依據是<db name>/CURRENT檔案是否存在。其邏輯很簡單。

// S1首先生産DB元資訊,設定comparator名,以及log檔案編号、檔案編号,以及seq no。
  VersionEdit new_db;
  new_db.SetComparatorName(user_comparator()->Name());
  new_db.SetLogNumber(0);
  new_db.SetNextFile(2);
  new_db.SetLastSequence(0);
// S2 生産MANIFEST檔案,将db元資訊寫入MANIFEST檔案。
  const std::string manifest =DescriptorFileName(dbname_, 1);
  WritableFile* file;
  Status s =env_->NewWritableFile(manifest, &file);
  if (!s.ok()) return s;
  {
    log::Writer log(file);
    std::string record;
    new_db.EncodeTo(&record);
    s = log.AddRecord(record);
    if (s.ok()) s =file->Close();
  }
  delete file;
// S3 如果成功,就把MANIFEST檔案名寫入到CURRENT檔案中
  if (s.ok()) s =SetCurrentFile(env_, dbname_, 1);
  elseenv_->DeleteFile(manifest);
  return s;
           

這就是建立新DB的邏輯,很簡單。

12.4 DBImpl::Recover()

函數聲明為:StatusDBImpl::Recover(VersionEdit* edit),如果調用成功則設定VersionEdit。Recover的基本功能是:首先是處理建立flag,比如存在就傳回失敗等等;然後是嘗試從已存在的sstable檔案恢複db;最後如果發現有大于原資訊記錄的log編号的log檔案,則需要回放log,更新db資料。回放期間db可能會dump新的level 0檔案,是以需要把db元資訊的變動記錄到edit中傳回。函數邏輯如下:

S1 建立目錄,目錄以db name命名,忽略任何建立錯誤,然後嘗試擷取db name/LOCK檔案鎖,失敗則傳回。

    env_->CreateDir(dbname_);

    Status s =env_->LockFile(LockFileName(dbname_), &db_lock_);

    if (!s.ok()) return s;

S2 根據CURRENT檔案是否存在,以及option參數執行檢查。

如果檔案不存在&create_is_missing=true,則調用函數NewDB()建立;否則報錯。

如果檔案存在& error_if_exists=true,則報錯。

S3 調用VersionSet的Recover()函數,就是從檔案中恢複資料。如果出錯則打開失敗,成功則向下執行S4。

    s = versions_->Recover();

S4嘗試從所有比manifest檔案中記錄的log要新的log檔案中恢複(前一個版本可能會添加新的log檔案,卻沒有記錄在manifest中)。另外,函數PrevLogNumber()已經不再用了,僅為了相容老版本。

//  S4.1 這裡先找出所有滿足條件的log檔案:比manifest檔案記錄的log編号更新。
  SequenceNumber max_sequence(0);
  const uint64_t min_log =versions_->LogNumber();
  const uint64_t prev_log =versions_->PrevLogNumber();
  std::vector<std::string>filenames;
  s =env_->GetChildren(dbname_, &filenames); // 列出目錄内的所有檔案
  uint64_t number;
  FileType type;
  std::vector<uint64_t>logs;
  for (size_t i = 0; i <filenames.size(); i++) { // 檢查log檔案是否比min log更新
    if(ParseFileName(filenames[i], &number, &type) && type ==kLogFile
        && ((number >=min_log) || (number == prev_log))) {
      logs.push_back(number);
    }
  }
//  S4.2 找到log檔案後,首先排序,保證按照生成順序,依次回放log。并把DB元資訊的變動(sstable檔案的變動)追加到edit中傳回。
    std::sort(logs.begin(),logs.end());
    for (size_t i = 0; i <logs.size(); i++) {
      s = RecoverLogFile(logs[i],edit, &max_sequence);
      // 前一版可能在生成該log編号後沒有記錄在MANIFEST中,
     //是以這裡我們手動更新VersionSet中的檔案編号計數器
     versions_->MarkFileNumberUsed(logs[i]);
}
//  S4.3 更新VersionSet的sequence
    if (s.ok()) {
      if(versions_->LastSequence() < max_sequence)
         versions_->SetLastSequence(max_sequence);
}
           

上面就是Recover的執行流程。

12.5 DBImpl::DeleteObsoleteFiles()

這個是垃圾回收函數,如前所述,每次compaction和recovery之後都會有檔案被廢棄。DeleteObsoleteFiles就是删除這些垃圾檔案的,它在每次compaction和recovery完成之後被調用。

其調用點包括:DBImpl::CompactMemTable,DBImpl::BackgroundCompaction, 以及DB::Open的recovery步驟之後。

它會删除所有過期的log檔案,沒有被任何level引用到、或不是正在執行的compaction的output的sstable檔案。

該函數沒有參數,其代碼邏輯也很直覺,就是列出db的所有檔案,對不同類型的檔案分别判斷,如果是過期檔案,就删除之,如下:

// S1 首先,確定不會删除pending檔案,将versionset正在使用的所有檔案加入到live中。
  std::set<uint64_t> live =pending_outputs_;
  versions_->AddLiveFiles(&live); //該函數其後分析
// S2 列舉db的所有檔案
  std::vector<std::string>filenames;
  env_->GetChildren(dbname_,&filenames);
// S3 周遊所有列舉的檔案,根據檔案類型,分别處理;
  uint64_t number;
  FileType type;
  for (size_t i = 0; i <filenames.size(); i++) {
     if (ParseFileName(filenames[i], &number,&type)) {
         bool keep = true; //false表明是過期檔案
         // S3.1 kLogFile,log檔案,根據log編号判斷是否過期
              keep = ((number >=versions_->LogNumber()) ||
                  (number ==versions_->PrevLogNumber()));
         // S3.2 kDescriptorFile,MANIFEST檔案,根據versionset記錄的編号判斷
              keep = (number >=versions_->ManifestFileNumber());
         // S3.3 kTableFile,sstable檔案,隻要在live中就不能删除
         // S3.4 kTempFile,如果是正在寫的檔案,隻要在live中就不能删除
              keep = (live.find(number) != live.end());
         // S3.5 kCurrentFile,kDBLockFile, kInfoLogFile,不能删除
              keep = true;
     // S3.6 如果keep為false,表明需要删除檔案,如果是table還要從cache中删除
          if (!keep) {
             if(type == kTableFile) table_cache_->Evict(number);
             Log(options_.info_log, "Delete type=%d #%lld\n",type, number);
             env_->DeleteFile(dbname_ + "/" +filenames[i]);
          }
     }
  }
           

這就是删除過期檔案的邏輯,其中調用到了VersionSet::AddLiveFiles函數,保證不會删除active的檔案。

函數DbImpl::MaybeScheduleCompaction()放在Compaction一節分析,基本邏輯就是如果需要compaction,就啟動背景線程執行compaction操作。

12.6 DBImpl::RecoverLogFile()

函數聲明:StatusRecoverLogFile(uint64_t log_number, VersionEdit* edit,SequenceNumber* max_sequence)

參數說明:

@log_number是指定的log檔案編号

@edit記錄db元資訊的變化——sstable檔案變動

@max_sequence 傳回max{log記錄的最大序号, *max_sequence}

該函數打開指定的log檔案,回放日志。期間可能會執行compaction,生産新的level 0sstable檔案,記錄檔案變動到edit中。

它聲明了一個局部類LogReporter以列印錯誤日志,沒什麼好說的,下面來看代碼邏輯。

// S1 打開log檔案傳回SequentialFile*file,出錯就傳回,否則向下執行S2。
// S2 根據log檔案句柄file建立log::Reader,準備讀取log。
  log::Reader reader(file,&reporter, true/*checksum*/,0/*initial_offset*/);
// S3 依次讀取所有的log記錄,并插入到新生成的memtable中。這裡使用到了批量更新接口WriteBatch,具體後面再分析。
  std::string scratch;
  Slice record;
  WriteBatch batch;
  MemTable* mem = NULL;
  while(reader.ReadRecord(&record, &scratch) && status.ok()) { // 讀取全部log
    if (record.size() < 12) { // log資料錯誤,不滿足最小長度12
     reporter.Corruption(record.size(), Status::Corruption("log recordtoo small"));
      continue;
    }
   WriteBatchInternal::SetContents(&batch, record); // log内容設定到WriteBatch中
   if (mem == NULL) { // 建立memtable
      mem = new MemTable(internal_comparator_);
      mem->Ref();
    }
    status =WriteBatchInternal::InsertInto(&batch, mem); // 插入到memtable中
    MaybeIgnoreError(&status);
    if (!status.ok()) break;
    const SequenceNumber last_seq=
       WriteBatchInternal::Sequence(&batch) + WriteBatchInternal::Count(&batch)- 1;
    if (last_seq >*max_sequence) *max_sequence = last_seq; // 更新max sequence
    // 如果mem的記憶體超過設定值,則執行compaction,如果compaction出錯,
    // 立刻傳回錯誤,DB::Open失敗
    if(mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
      status =WriteLevel0Table(mem, edit, NULL);
      if (!status.ok()) break;
      mem->Unref(); // 釋放目前memtable
      mem = NULL;
    }
  }
// S4 掃尾工作,如果mem != NULL,說明還需要dump到新的sstable檔案中。
  if (status.ok() && mem!= NULL) {// 如果compaction出錯,立刻傳回錯誤
    status = WriteLevel0Table(mem,edit, NULL);
  }
  if (mem != NULL)mem->Unref();
  delete file;
  return status;
           

把MemTabledump到sstable是函數WriteLevel0Table的工作,其實這是compaction的一部分,準備放在compaction一節來分析。

12.7 小結

如上DB打開的邏輯就已經分析完了,打開邏輯參見DB::Open()中描述的5個步驟。此外還有兩個東東:把Memtable dump到sstable的WriteLevel0Table()函數,以及批量修改WriteBatch。第一個放在後面的compaction一節,第二個放在DB更新操作中。接下來就是db的關閉。

13 DB的關閉&銷毀

13.1 DB關閉

外部調用者通過DB::Open()擷取一個DB*對象,如果要關閉打開的DB*db對象,則直接delete db即可,這會調用到DBImpl的析構函數。

析構依次執行如下的5個邏輯:

S1 等待背景compaction任務結束

S2 釋放db檔案鎖,<dbname>/lock檔案

S3 删除VersionSet對象,并釋放MemTable對象

S4 删除log相關以及TableCache對象

S5 删除options的block_cache以及info_log對象

13.2 DB銷毀

函數聲明:StatusDestroyDB(const std::string& dbname, const Options& options)

該函數會删除掉db的資料内容,要謹慎使用。函數邏輯為:

S1 擷取dbname目錄的檔案清單到filenames中,如果為空則直接傳回,否則進入S2。

S2 鎖檔案<dbname>/lock,如果鎖成功就執行S3

S3 周遊filenames檔案清單,過濾掉lock檔案,依次調用DeleteFile删除。

S4 釋放lock檔案,并删除之,然後删除檔案夾。

Destory就執行完了,如果删除檔案出現錯誤,記錄之,依然繼續删除下一個。最後傳回錯誤代碼。

看來這一章很短小。DB的打開關閉分析完畢。

繼續閱讀