天天看點

Rocksdb 通過ingestfile 來支援高效的離線資料導入

文章目錄

  • ​​前言​​
  • ​​使用方式​​
  • ​​實作原理​​
  • ​​總結​​

前言

很多時候,我們使用資料庫時會有離線向資料庫導入資料的需求。比如大量使用者在本地的一些離線資料,想要将這一些資料導入到已有的資料庫中;或者說NewSQL場景中部分機器離線,重新上線之後的資料增量/全量同步 等場景。這個時候 我們并不想要讓這一些資料占用過多的系統資源,更不希望他們對正常的線上業務有影響,是以盡可能高效得完成這一些資料的同步就需要深入設計一番。

而如果底層引擎使用的是rocksdb,那就非常省事了,隻需要組織好你們的資料調用接口就完事了,剩下的導入過程由引擎完成。 tikv便是通過 rocksdb的這個功能完成叢集異常恢複之後 region之間的全量增量同步的。回到今天我們要讨論的主題,便是rocksdb的這個資料導入過程是如何盡可能快、盡可能高效得完成的。

使用方式

講解實作原理之前我們先看看如何使用這個功能,功能的易用性也很重要,使用者還是希望盡可能得少寫代碼來完成這個工作。使用上主要是兩部分:建立SST檔案 和 導入SST檔案。

  • 建立sst檔案:這一步主要是通過一個sst_filter_writer,将需要導入的 k/v 資料轉換成sst檔案
需要注意的是:
  1. 使用者k/v 資料需要按照options.comparator 嚴格有序,預設是按照key的字典序
  2. 這裡的options 建議和db寫入的options用一套(壓縮選項,sst檔案相關選項等)
Options options;

SstFileWriter sst_file_writer(EnvOptions(), options);
// 指定形成的sst檔案的路徑
std::string file_path = "/home/usr/file1.sst";

// open file_path
Status s = sst_file_writer.Open(file_path);
for (...) {
  // 寫入sst,使用者保證k/v 的順序
  s = sst_file_writer.Put(key, value);
  if (!s.ok()) {
    printf("Error while adding Key: %s, Error: %s\n", key.c_str(),
           s.ToString().c_str());
    return 1;
  }
}

// 完成寫入
s = sst_file_writer.Finish();      
  • 導入sst檔案:這個步驟就是将建立好的一個或者多個sst檔案導入到db中,也允許向多個cf中導入
IngestExternalFileOptions ifo;
// Ingest the 2 passed SST files into the DB
// 導入資料
Status s = db_->IngestExternalFile({"/home/usr/file1.sst", "/home/usr/file2.sst"}, ifo);      

使用還是比較簡單的,整體的使用過程如下:

#include <iostream>
#include <vector>

#include <gflags/gflags.h>

#include <rocksdb/db.h>
#include <rocksdb/env.h>
#include <rocksdb/sst_file_writer.h>


#define DATA_SIZE 10
#define VALUE_SIZE 1024

using namespace std;

// 比較函數
bool cmp(pair<string, string> str1,
         pair<string, string> str2) {
  if(str1.first < str2.first) {
    return true;
  } else if (str1.first == str2.first 
            && str1.second < str2.second) {
    return true;
  } else {
    return false;
  }
}

// 随機字元串
static string rand_data(long data_range) {
    char buff[30];
    unsigned long long num = 1;
    for (int i = 0;i < 4; ++i) {
        num *= (unsigned long long )rand();
    }

    sprintf(buff, "%llu", num % (unsigned long long)data_range );
    string data(buff);

    return data;
}

// 構造有序資料
void construct_data(vector<pair<string,
                    string>> &input) {
  int i;
  string key;
  string value;

  for (i = 0;i < DATA_SIZE; i++) {
    if(key == "0") {
      continue;
    }
    key = rand_data(VALUE_SIZE);
    value = rand_data(VALUE_SIZE);

    input.push_back(make_pair(key, value));
  }
}

void traverse_data(vector<pair<string,string>> input) {
  int i;
  for(auto data : input) {
    cout << data.first << " " << data.second << endl;
  }
}

// 建立sst檔案
int create_sst(string file_path) {
  vector<pair<string,string>> input;
  vector<pair<string,string>>::iterator input_itr;
  rocksdb::Options option;

  /* open statistics and disable compression */
  option.create_if_missing = true;
  option.compression = rocksdb::CompressionType::kNoCompression;

  rocksdb::SstFileWriter sst_file_writer(rocksdb::EnvOptions(), option);

  rocksdb::Status s = sst_file_writer.Open(file_path);
  if (!s.ok()) {
    printf("Error while opening file %s, Error: %s\n", 
        file_path.c_str(),s.ToString().c_str());
    return 1;
  }

  // 需要保證資料有序後再寫入
  construct_data(input);
  sort(input.begin(), input.end(), cmp);
  traverse_data(input);
  
  // Insert rows into the SST file, note that inserted keys must be 
  // strictly increasing (based on options.comparator)
  for (input_itr = input.begin(); input_itr != input.end();
                                  input_itr ++) {
    rocksdb::Slice key(input_itr->first);
    rocksdb::Slice value(input_itr->second);

    s = sst_file_writer.Put(key, value);
    if (!s.ok()) {
      printf("Error while adding Key: %s, Error: %s\n",
          key.ToString().c_str(),s.ToString().c_str());
      return 1;
    }
  }

  // Close the file
  s = sst_file_writer.Finish();
  if (!s.ok()) {
    printf("Error while finishing file %s, Error: %s\n", 
        file_path.c_str(),s.ToString().c_str());
    return 1;
  }

  return 0;
}

static rocksdb::DB *db;

void create_db() {
  rocksdb::Options option;

  /* open statistics and disable compression */
  option.create_if_missing = true;
  option.compression = rocksdb::CompressionType::kNoCompression;

  rocksdb::Status s = rocksdb::DB::Open( option,"./db", &db);
  if (!s.ok()) {
    printf("Open db failed : %s\n", s.ToString().c_str());
    return;
  }
}

void db_write(int num_keys) {
  rocksdb::WriteOptions write_option;
  write_option.sync = true;

  rocksdb::Slice key;
  rocksdb::Slice value;
  rocksdb::Status s;
  int i;

  printf("begin write \n");
  for (i = 0;i < num_keys; i++) {
    key = rand_data(VALUE_SIZE);
    value = rand_data(VALUE_SIZE);

    s = db->Put(write_option, key, value);
    if (!s.ok()) {
      printf("Put db failed : %s\n", s.ToString().c_str());
      return;
    }
  }

  db->Flush(rocksdb::FlushOptions());
  printf("finish write \n");
}

int main() {

  // 先寫入一批資料
  create_db();
  db_write(100000);
  
  // 建立sst檔案
  if (create_sst("./test.sst") == 0) {
    printf("creates sst success !\n");
  } else {
    printf("creates sst failed !\n");
  }
  
  // 導入資料
  rocksdb::IngestExternalFileOptions ifo;
  // Ingest the 2 passed SST files into the DB
  printf("Ingest sst !\n");
  rocksdb::Status s = db->IngestExternalFile({"test.sst"}, ifo);
  if (!s.ok()) {
    printf("Error while adding file test.sst , Error %s\n",
           s.ToString().c_str());
    return 1;
  }

  return 0;
}      

運作輸出如下:

begin write 
finish write
# consturct data,需按照字典序,如果沒有按照字典序構造的話會報錯
1008 232
240 880
288 63
410 768
506 56
534 256
640 180
72 248
800 672
944 217
creates sst success !      

通過db日志可以看到我們建立的sst檔案​

​test.sst​

​​被成功導入到db,形成了​

​./db/000020.sst​

​,且在db目錄中。

╰─$ cat db/LOG |grep ingested
[AddFile] External SST file test.sst was ingested in L0 with path ./db/000020.sst (global_seqno=200012)
  
╰─$ ls db
000017.log               000020.sst               IDENTITY                 LOG                      LOG.old.1618643738564935 OPTIONS-000008
000019.sst               CURRENT                  LOCK                     LOG.old.1618123487361092 MANIFEST-000013          OPTIONS-000016      

實作原理

從如何使用這個功能上我們能夠感覺到這一些資料并不是通過rocksdb正常的I/O流程寫入的。如果使用正常的接口,那我們使用者不需要排序,而是直接通過​

​db->Put​

​​接口将k/v寫入,凡事都有但是,但是這樣來導入離線資料在rocksdb内部後續的flush/compaction 都會消耗大量的系統資源,而這并不是我們想要的高效。是以,rocksdb提供的​

​ingest​

​接口肯定不會讓這一些要導入的資料消耗過多的資源,接下來我們一起看看底層的詳細實作。

為了更形象得告訴大家在rocksdb作為存儲引擎的場景,如果通過傳統的put接口導入資料會多出哪一些I/O,如下圖

Rocksdb 通過ingestfile 來支援高效的離線資料導入

其中紅色的尖頭 是ingest file 相比于傳統的put接口 少的I/O部分,可以說ingest方式導入資料極大得節約了整個系統資源的開銷(包括但不限于I/O , CPU 資源的開銷)。

下面主要介紹的是有了sst檔案,接下來如何導入到db中的過程。關于通過​

​sst_file_writer​

​建立具體的sst檔案的過程就不多說了,也就是按照sst檔案的格式(datablock,index block…footer)等将有序的資料一個個添加進去而已。

主要有如下幾步:

  1. 為待插入的sst檔案建立file link到db目錄,或者直接拷貝進去
  2. 停止寫入,需要保證即将導入的sst檔案在db中擁有一個安全合理的seqno,如果持續寫入,那這個seqno可能不會全局遞增了。
  3. 檢查導入的sst檔案是否和memtable中的key-range有重疊,有的話需要flush memtable
  4. 為這個sst檔案 按照其key-range挑選一個合适的level放進去
  5. 為這個問天添加一個全局的seqno
  6. 恢複db的寫入

其中停止寫入到恢複寫入這段時間對于使用者來說越小越好,是以ingest的性能很重要。

接下來看看詳細的源代碼實作:

導入資料的函數入口是​

​DBImpl::IngestExternalFiles​

導入的sst檔案最後都需要形成一個db内部的sst檔案,因為這個時候已經停止寫入了,是以會從最新的sst檔案編号之後取一個檔案編号,後續的其他要導入的sst檔案會不斷追加。

Status DBImpl::IngestExternalFiles(
    const std::vector<IngestExternalFileArg>& args) {
  ...
    
  // 構造檔案編号到next_file_number中
  Status status = ReserveFileNumbersBeforeIngestion(
      static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd(), total,
      pending_output_elem, &next_file_number);
  if (!status.ok()) {
    InstrumentedMutexLock l(&mutex_);
    ReleaseFileNumberFromPendingOutputs(pending_output_elem);
    return status;
  }
  ...
}      

有了在db内部的合法檔案編号,我們就可以進行檔案遷移了,将待導入的sst檔案遷移到db内部已經構造好的sst檔案編号之中。

會為每一個cf構造一個ingest_job, 将待導入檔案拷貝/移動到 db内部的sst檔案中,這個過程是在接下來的​

​Prepare​

​函數中。

uint64_t start_file_number = next_file_number;
  for (size_t i = 1; i != num_cfs; ++i) {
    start_file_number += args[i - 1].external_files.size();
    auto* cfd =
        static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
    SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
    // prepare 函數
    exec_results[i].second = ingestion_jobs[i].Prepare(
        args[i].external_files, start_file_number, super_version);
    exec_results[i].first = true;
    CleanupSuperVersion(super_version);
  }      

看看​

​Prepare​

​的函數實作:

  1. 拿着輸入的多個sst檔案,如果有多個,則需要檢查這一些檔案之間是否有重疊key,有的話就不支援了(rocksdb除了l0,其他層不允許有重疊key)。
  2. 根據使用者指定的ingest option: move_files 是否為true,來将待導入檔案move到db中, 如果move失敗了就拷貝檔案。
Status ExternalSstFileIngestionJob::Prepare(
    const std::vector<std::string>& external_files_paths,
    uint64_t next_file_number, SuperVersion* sv) {
  
  // 解析檔案資訊
  for (const std::string& file_path : external_files_paths) {
    IngestedFileInfo file_to_ingest;
    status = GetIngestedFileInfo(file_path, &file_to_ingest, sv);
    if (!status.ok()) {
      return status;
    }
    files_to_ingest_.push_back(file_to_ingest);
  }
  
  // 確定導入的多個sst檔案之間沒有重疊
  ......
  } else if (num_files > 1) {
    // Verify that passed files dont have overlapping ranges
    autovector<const IngestedFileInfo*> sorted_files;
    for (size_t i = 0; i < num_files; i++) {
      sorted_files.push_back(&files_to_ingest_[i]);
    }

    std::sort(
        sorted_files.begin(), sorted_files.end(),
        [&ucmp](const IngestedFileInfo* info1, const IngestedFileInfo* info2) {
          return sstableKeyCompare(ucmp, info1->smallest_internal_key,
                                   info2->smallest_internal_key) < 0;
        });

    // 如果有重疊的話,ingest也無法支援,因為在db中大于level0的更高層level内部的
    // sst檔案之間是不允許有重疊的,加速更高層的二分查找。
    for (size_t i = 0; i < num_files - 1; i++) {
      if (sstableKeyCompare(ucmp, sorted_files[i]->largest_internal_key,
                            sorted_files[i + 1]->smallest_internal_key) >= 0) {
        files_overlap_ = true;
        break;
      }
    }
  }
  ......
  // 根據使用者參數move檔案
  if (ingestion_options_.move_files) {
      status = env_->LinkFile(path_outside_db, path_inside_db);
      ...
  } else { // 否則就拷貝檔案
      f.copy_file = true;
   }

  if (f.copy_file) {
    TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Prepare:CopyFile",
                             nullptr);
    // CopyFile also sync the new file.
    status = CopyFile(env_, path_outside_db, path_inside_db, 0,
                      db_options_.use_fsync);
  }
  ...
}      

到此,檔案就已經進入到了rocksdb 之中,ingest_job的prepare流程就結束了。

接下來 就到了我們前面介紹總步驟的第二步,停止使用者對目前db的寫入:

DBImpl::IngestExternalFiles
  WriteThread::EnterUnbatched      

其中​

​WriteThread::EnterUnbatched​

​函數會讓目前db的寫入線程都處于wait狀态。

接下來就是檢查目前要導入的檔案是否和memtable中的key-range有重疊,函數調用如下:

DBImpl::IngestExternalFiles
  ExternalSstFileIngestionJob::NeedsFlush
    ColumnFamilyData::RangesOverlapWithMemtables      

這個函數​

​ColumnFamilyData::RangesOverlapWithMemtables​

​會拿着從ingest files中構造好的key-range和memtable中的 key-range 進行對比,如果有重疊key,則會将memtable flush置為true

Status ColumnFamilyData::RangesOverlapWithMemtables(
    const autovector<Range>& ranges, SuperVersion* super_version,
    bool* overlap) {
  ...
  Status status;
  // 拿着ingest files的range中的每一個key,看是否能夠從memtable中找到
  for (size_t i = 0; i < ranges.size() && status.ok() && !*overlap; ++i) {
    auto* vstorage = super_version->current->storage_info();
    auto* ucmp = vstorage->InternalComparator()->user_comparator();
    InternalKey range_start(ranges[i].start, kMaxSequenceNumber,
                            kValueTypeForSeek);
    // 從memtable中找
    memtable_iter->Seek(range_start.Encode());
    status = memtable_iter->status();
    ParsedInternalKey seek_result;
    if (status.ok()) {
      if (memtable_iter->Valid() &&
          !ParseInternalKey(memtable_iter->key(), &seek_result)) {
        status = Status::Corruption("DB have corrupted keys");
      }
    }
    // 找到了,則置overlap為true
    if (status.ok()) {
      if (memtable_iter->Valid() &&
          ucmp->Compare(seek_result.user_key, ranges[i].limit) <= 0) {
        *overlap = true;
      } else if (range_del_agg.IsRangeOverlapped(ranges[i].start,
                                                 ranges[i].limit)) {
        *overlap = true;
      }
    }
  }
  ...
}      

在後續的​

​DBImpl::FlushMemTable​

​函數中會flush memtable,不同的cf是分開進行的

DBImpl::IngestExternalFiles
  DBImpl::FlushMemTable      

接下來就開始了第四步和第五步的處理邏輯,需要為每一個落到db中的sst檔案挑選合适的level以及配置設定全局seqno,處理邏輯在​

​Run​

​函數中:

DBImpl::IngestExternalFiles
  ExternalSstFileIngestionJob::Run      

主要處理邏輯如下:

一個一個ingest file進行處理

  1. 選擇一個合适的level,将ingest file插入進去

    如果user配置了​

    ​allow_ingest_behind=true​

    ​,即允許導入的資料直接插入到最後一層的檔案位置,且ingest的時候配置的ingest option中​

    ​ingest_behind=true​

    ​,則會先嘗試插入到​

    ​bottomest level​

    ​,如果最後一層的檔案和待插入的檔案有重疊,則插入失敗。處理邏輯在​

    ​CheckLevelForIngestedBehindFile​

    ​函數之中。

    否則逐層周遊,找到第一個和這一些key-range有重疊的level即可。函數​

    ​AssignLevelAndSeqnoForIngestedFile​

  2. 找到了合适的level的同時會記錄一個​

    ​assigned_seqno​

    ​,是在目前​

    ​last_sequence​

    ​的基礎上+1得到的。函數​

    ​AssignLevelAndSeqnoForIngestedFile​

    ​之中。
  3. 為目前ingest_file 寫入一個global seq no, 并執行fsync/sync。函數​

    ​AssignGlobalSeqnoForIngestedFile​

    ​之中。
  4. 最後就是将當完成更新的ingest file的元資訊更新到​

    ​VersionEdit​

    ​之中。

接下來就進入尾聲了:

  1. 将更新的​

    ​VersionEdit​

    ​寫入到MANIFEST檔案之中
  2. 更新每個ingest file對應的cf資訊,并且排程compaction/flush, 因為之前ingest file時找的是有重疊key的一層。
  3. 恢複db的寫入
// 将`VersionEdit`寫入到MANIFEST檔案之中
            status =
          versions_->LogAndApply(cfds_to_commit, mutable_cf_options_list,
                                 edit_lists, &mutex_, directories_.GetDbDir());
    }

    if (status.ok()) {
      for (size_t i = 0; i != num_cfs; ++i) {
        auto* cfd =
            static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
        if (!cfd->IsDropped()) {
          //更新每個ingest file對應的cf資訊,并且排程compaction/flush, 因為之前ingest file時找的是有重疊key的一層
          InstallSuperVersionAndScheduleWork(cfd, &sv_ctxs[i],
                                             *cfd->GetLatestMutableCFOptions());
          ...
        }
      }
    }
        // 恢複db的寫入,喚醒db的其他所有的writer
        write_thread_.ExitUnbatched(&w);      

總結