文章目錄
- 前言
- 使用方式
- 實作原理
- 總結
前言
很多時候,我們使用資料庫時會有離線向資料庫導入資料的需求。比如大量使用者在本地的一些離線資料,想要将這一些資料導入到已有的資料庫中;或者說NewSQL場景中部分機器離線,重新上線之後的資料增量/全量同步 等場景。這個時候 我們并不想要讓這一些資料占用過多的系統資源,更不希望他們對正常的線上業務有影響,是以盡可能高效得完成這一些資料的同步就需要深入設計一番。
而如果底層引擎使用的是rocksdb,那就非常省事了,隻需要組織好你們的資料調用接口就完事了,剩下的導入過程由引擎完成。 tikv便是通過 rocksdb的這個功能完成叢集異常恢複之後 region之間的全量增量同步的。回到今天我們要讨論的主題,便是rocksdb的這個資料導入過程是如何盡可能快、盡可能高效得完成的。
使用方式
講解實作原理之前我們先看看如何使用這個功能,功能的易用性也很重要,使用者還是希望盡可能得少寫代碼來完成這個工作。使用上主要是兩部分:建立SST檔案 和 導入SST檔案。
- 建立sst檔案:這一步主要是通過一個sst_filter_writer,将需要導入的 k/v 資料轉換成sst檔案
需要注意的是:
- 使用者k/v 資料需要按照options.comparator 嚴格有序,預設是按照key的字典序
- 這裡的options 建議和db寫入的options用一套(壓縮選項,sst檔案相關選項等)
Options options;
SstFileWriter sst_file_writer(EnvOptions(), options);
// 指定形成的sst檔案的路徑
std::string file_path = "/home/usr/file1.sst";
// open file_path
Status s = sst_file_writer.Open(file_path);
for (...) {
// 寫入sst,使用者保證k/v 的順序
s = sst_file_writer.Put(key, value);
if (!s.ok()) {
printf("Error while adding Key: %s, Error: %s\n", key.c_str(),
s.ToString().c_str());
return 1;
}
}
// 完成寫入
s = sst_file_writer.Finish();
- 導入sst檔案:這個步驟就是将建立好的一個或者多個sst檔案導入到db中,也允許向多個cf中導入
IngestExternalFileOptions ifo;
// Ingest the 2 passed SST files into the DB
// 導入資料
Status s = db_->IngestExternalFile({"/home/usr/file1.sst", "/home/usr/file2.sst"}, ifo);
使用還是比較簡單的,整體的使用過程如下:
#include <iostream>
#include <vector>
#include <gflags/gflags.h>
#include <rocksdb/db.h>
#include <rocksdb/env.h>
#include <rocksdb/sst_file_writer.h>
#define DATA_SIZE 10
#define VALUE_SIZE 1024
using namespace std;
// 比較函數
bool cmp(pair<string, string> str1,
pair<string, string> str2) {
if(str1.first < str2.first) {
return true;
} else if (str1.first == str2.first
&& str1.second < str2.second) {
return true;
} else {
return false;
}
}
// 随機字元串
static string rand_data(long data_range) {
char buff[30];
unsigned long long num = 1;
for (int i = 0;i < 4; ++i) {
num *= (unsigned long long )rand();
}
sprintf(buff, "%llu", num % (unsigned long long)data_range );
string data(buff);
return data;
}
// 構造有序資料
void construct_data(vector<pair<string,
string>> &input) {
int i;
string key;
string value;
for (i = 0;i < DATA_SIZE; i++) {
if(key == "0") {
continue;
}
key = rand_data(VALUE_SIZE);
value = rand_data(VALUE_SIZE);
input.push_back(make_pair(key, value));
}
}
void traverse_data(vector<pair<string,string>> input) {
int i;
for(auto data : input) {
cout << data.first << " " << data.second << endl;
}
}
// 建立sst檔案
int create_sst(string file_path) {
vector<pair<string,string>> input;
vector<pair<string,string>>::iterator input_itr;
rocksdb::Options option;
/* open statistics and disable compression */
option.create_if_missing = true;
option.compression = rocksdb::CompressionType::kNoCompression;
rocksdb::SstFileWriter sst_file_writer(rocksdb::EnvOptions(), option);
rocksdb::Status s = sst_file_writer.Open(file_path);
if (!s.ok()) {
printf("Error while opening file %s, Error: %s\n",
file_path.c_str(),s.ToString().c_str());
return 1;
}
// 需要保證資料有序後再寫入
construct_data(input);
sort(input.begin(), input.end(), cmp);
traverse_data(input);
// Insert rows into the SST file, note that inserted keys must be
// strictly increasing (based on options.comparator)
for (input_itr = input.begin(); input_itr != input.end();
input_itr ++) {
rocksdb::Slice key(input_itr->first);
rocksdb::Slice value(input_itr->second);
s = sst_file_writer.Put(key, value);
if (!s.ok()) {
printf("Error while adding Key: %s, Error: %s\n",
key.ToString().c_str(),s.ToString().c_str());
return 1;
}
}
// Close the file
s = sst_file_writer.Finish();
if (!s.ok()) {
printf("Error while finishing file %s, Error: %s\n",
file_path.c_str(),s.ToString().c_str());
return 1;
}
return 0;
}
static rocksdb::DB *db;
void create_db() {
rocksdb::Options option;
/* open statistics and disable compression */
option.create_if_missing = true;
option.compression = rocksdb::CompressionType::kNoCompression;
rocksdb::Status s = rocksdb::DB::Open( option,"./db", &db);
if (!s.ok()) {
printf("Open db failed : %s\n", s.ToString().c_str());
return;
}
}
void db_write(int num_keys) {
rocksdb::WriteOptions write_option;
write_option.sync = true;
rocksdb::Slice key;
rocksdb::Slice value;
rocksdb::Status s;
int i;
printf("begin write \n");
for (i = 0;i < num_keys; i++) {
key = rand_data(VALUE_SIZE);
value = rand_data(VALUE_SIZE);
s = db->Put(write_option, key, value);
if (!s.ok()) {
printf("Put db failed : %s\n", s.ToString().c_str());
return;
}
}
db->Flush(rocksdb::FlushOptions());
printf("finish write \n");
}
int main() {
// 先寫入一批資料
create_db();
db_write(100000);
// 建立sst檔案
if (create_sst("./test.sst") == 0) {
printf("creates sst success !\n");
} else {
printf("creates sst failed !\n");
}
// 導入資料
rocksdb::IngestExternalFileOptions ifo;
// Ingest the 2 passed SST files into the DB
printf("Ingest sst !\n");
rocksdb::Status s = db->IngestExternalFile({"test.sst"}, ifo);
if (!s.ok()) {
printf("Error while adding file test.sst , Error %s\n",
s.ToString().c_str());
return 1;
}
return 0;
}
運作輸出如下:
begin write
finish write
# consturct data,需按照字典序,如果沒有按照字典序構造的話會報錯
1008 232
240 880
288 63
410 768
506 56
534 256
640 180
72 248
800 672
944 217
creates sst success !
通過db日志可以看到我們建立的sst檔案
test.sst
被成功導入到db,形成了
./db/000020.sst
,且在db目錄中。
╰─$ cat db/LOG |grep ingested
[AddFile] External SST file test.sst was ingested in L0 with path ./db/000020.sst (global_seqno=200012)
╰─$ ls db
000017.log 000020.sst IDENTITY LOG LOG.old.1618643738564935 OPTIONS-000008
000019.sst CURRENT LOCK LOG.old.1618123487361092 MANIFEST-000013 OPTIONS-000016
實作原理
從如何使用這個功能上我們能夠感覺到這一些資料并不是通過rocksdb正常的I/O流程寫入的。如果使用正常的接口,那我們使用者不需要排序,而是直接通過
db->Put
接口将k/v寫入,凡事都有但是,但是這樣來導入離線資料在rocksdb内部後續的flush/compaction 都會消耗大量的系統資源,而這并不是我們想要的高效。是以,rocksdb提供的
ingest
接口肯定不會讓這一些要導入的資料消耗過多的資源,接下來我們一起看看底層的詳細實作。
為了更形象得告訴大家在rocksdb作為存儲引擎的場景,如果通過傳統的put接口導入資料會多出哪一些I/O,如下圖
其中紅色的尖頭 是ingest file 相比于傳統的put接口 少的I/O部分,可以說ingest方式導入資料極大得節約了整個系統資源的開銷(包括但不限于I/O , CPU 資源的開銷)。
下面主要介紹的是有了sst檔案,接下來如何導入到db中的過程。關于通過
sst_file_writer
建立具體的sst檔案的過程就不多說了,也就是按照sst檔案的格式(datablock,index block…footer)等将有序的資料一個個添加進去而已。
主要有如下幾步:
- 為待插入的sst檔案建立file link到db目錄,或者直接拷貝進去
- 停止寫入,需要保證即将導入的sst檔案在db中擁有一個安全合理的seqno,如果持續寫入,那這個seqno可能不會全局遞增了。
- 檢查導入的sst檔案是否和memtable中的key-range有重疊,有的話需要flush memtable
- 為這個sst檔案 按照其key-range挑選一個合适的level放進去
- 為這個問天添加一個全局的seqno
- 恢複db的寫入
其中停止寫入到恢複寫入這段時間對于使用者來說越小越好,是以ingest的性能很重要。
接下來看看詳細的源代碼實作:
導入資料的函數入口是
DBImpl::IngestExternalFiles
導入的sst檔案最後都需要形成一個db内部的sst檔案,因為這個時候已經停止寫入了,是以會從最新的sst檔案編号之後取一個檔案編号,後續的其他要導入的sst檔案會不斷追加。
Status DBImpl::IngestExternalFiles(
const std::vector<IngestExternalFileArg>& args) {
...
// 構造檔案編号到next_file_number中
Status status = ReserveFileNumbersBeforeIngestion(
static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd(), total,
pending_output_elem, &next_file_number);
if (!status.ok()) {
InstrumentedMutexLock l(&mutex_);
ReleaseFileNumberFromPendingOutputs(pending_output_elem);
return status;
}
...
}
有了在db内部的合法檔案編号,我們就可以進行檔案遷移了,将待導入的sst檔案遷移到db内部已經構造好的sst檔案編号之中。
會為每一個cf構造一個ingest_job, 将待導入檔案拷貝/移動到 db内部的sst檔案中,這個過程是在接下來的
Prepare
函數中。
uint64_t start_file_number = next_file_number;
for (size_t i = 1; i != num_cfs; ++i) {
start_file_number += args[i - 1].external_files.size();
auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
// prepare 函數
exec_results[i].second = ingestion_jobs[i].Prepare(
args[i].external_files, start_file_number, super_version);
exec_results[i].first = true;
CleanupSuperVersion(super_version);
}
看看
Prepare
的函數實作:
- 拿着輸入的多個sst檔案,如果有多個,則需要檢查這一些檔案之間是否有重疊key,有的話就不支援了(rocksdb除了l0,其他層不允許有重疊key)。
- 根據使用者指定的ingest option: move_files 是否為true,來将待導入檔案move到db中, 如果move失敗了就拷貝檔案。
Status ExternalSstFileIngestionJob::Prepare(
const std::vector<std::string>& external_files_paths,
uint64_t next_file_number, SuperVersion* sv) {
// 解析檔案資訊
for (const std::string& file_path : external_files_paths) {
IngestedFileInfo file_to_ingest;
status = GetIngestedFileInfo(file_path, &file_to_ingest, sv);
if (!status.ok()) {
return status;
}
files_to_ingest_.push_back(file_to_ingest);
}
// 確定導入的多個sst檔案之間沒有重疊
......
} else if (num_files > 1) {
// Verify that passed files dont have overlapping ranges
autovector<const IngestedFileInfo*> sorted_files;
for (size_t i = 0; i < num_files; i++) {
sorted_files.push_back(&files_to_ingest_[i]);
}
std::sort(
sorted_files.begin(), sorted_files.end(),
[&ucmp](const IngestedFileInfo* info1, const IngestedFileInfo* info2) {
return sstableKeyCompare(ucmp, info1->smallest_internal_key,
info2->smallest_internal_key) < 0;
});
// 如果有重疊的話,ingest也無法支援,因為在db中大于level0的更高層level内部的
// sst檔案之間是不允許有重疊的,加速更高層的二分查找。
for (size_t i = 0; i < num_files - 1; i++) {
if (sstableKeyCompare(ucmp, sorted_files[i]->largest_internal_key,
sorted_files[i + 1]->smallest_internal_key) >= 0) {
files_overlap_ = true;
break;
}
}
}
......
// 根據使用者參數move檔案
if (ingestion_options_.move_files) {
status = env_->LinkFile(path_outside_db, path_inside_db);
...
} else { // 否則就拷貝檔案
f.copy_file = true;
}
if (f.copy_file) {
TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Prepare:CopyFile",
nullptr);
// CopyFile also sync the new file.
status = CopyFile(env_, path_outside_db, path_inside_db, 0,
db_options_.use_fsync);
}
...
}
到此,檔案就已經進入到了rocksdb 之中,ingest_job的prepare流程就結束了。
接下來 就到了我們前面介紹總步驟的第二步,停止使用者對目前db的寫入:
DBImpl::IngestExternalFiles
WriteThread::EnterUnbatched
其中
WriteThread::EnterUnbatched
函數會讓目前db的寫入線程都處于wait狀态。
接下來就是檢查目前要導入的檔案是否和memtable中的key-range有重疊,函數調用如下:
DBImpl::IngestExternalFiles
ExternalSstFileIngestionJob::NeedsFlush
ColumnFamilyData::RangesOverlapWithMemtables
這個函數
ColumnFamilyData::RangesOverlapWithMemtables
會拿着從ingest files中構造好的key-range和memtable中的 key-range 進行對比,如果有重疊key,則會将memtable flush置為true
Status ColumnFamilyData::RangesOverlapWithMemtables(
const autovector<Range>& ranges, SuperVersion* super_version,
bool* overlap) {
...
Status status;
// 拿着ingest files的range中的每一個key,看是否能夠從memtable中找到
for (size_t i = 0; i < ranges.size() && status.ok() && !*overlap; ++i) {
auto* vstorage = super_version->current->storage_info();
auto* ucmp = vstorage->InternalComparator()->user_comparator();
InternalKey range_start(ranges[i].start, kMaxSequenceNumber,
kValueTypeForSeek);
// 從memtable中找
memtable_iter->Seek(range_start.Encode());
status = memtable_iter->status();
ParsedInternalKey seek_result;
if (status.ok()) {
if (memtable_iter->Valid() &&
!ParseInternalKey(memtable_iter->key(), &seek_result)) {
status = Status::Corruption("DB have corrupted keys");
}
}
// 找到了,則置overlap為true
if (status.ok()) {
if (memtable_iter->Valid() &&
ucmp->Compare(seek_result.user_key, ranges[i].limit) <= 0) {
*overlap = true;
} else if (range_del_agg.IsRangeOverlapped(ranges[i].start,
ranges[i].limit)) {
*overlap = true;
}
}
}
...
}
在後續的
DBImpl::FlushMemTable
函數中會flush memtable,不同的cf是分開進行的
DBImpl::IngestExternalFiles
DBImpl::FlushMemTable
接下來就開始了第四步和第五步的處理邏輯,需要為每一個落到db中的sst檔案挑選合适的level以及配置設定全局seqno,處理邏輯在
Run
函數中:
DBImpl::IngestExternalFiles
ExternalSstFileIngestionJob::Run
主要處理邏輯如下:
一個一個ingest file進行處理
-
選擇一個合适的level,将ingest file插入進去
如果user配置了
,即允許導入的資料直接插入到最後一層的檔案位置,且ingest的時候配置的ingest option中allow_ingest_behind=true
,則會先嘗試插入到ingest_behind=true
,如果最後一層的檔案和待插入的檔案有重疊,則插入失敗。處理邏輯在bottomest level
CheckLevelForIngestedBehindFile
函數之中。
否則逐層周遊,找到第一個和這一些key-range有重疊的level即可。函數
AssignLevelAndSeqnoForIngestedFile
- 找到了合适的level的同時會記錄一個
,是在目前assigned_seqno
的基礎上+1得到的。函數last_sequence
之中。AssignLevelAndSeqnoForIngestedFile
- 為目前ingest_file 寫入一個global seq no, 并執行fsync/sync。函數
之中。AssignGlobalSeqnoForIngestedFile
- 最後就是将當完成更新的ingest file的元資訊更新到
之中。VersionEdit
接下來就進入尾聲了:
- 将更新的
寫入到MANIFEST檔案之中VersionEdit
- 更新每個ingest file對應的cf資訊,并且排程compaction/flush, 因為之前ingest file時找的是有重疊key的一層。
- 恢複db的寫入
// 将`VersionEdit`寫入到MANIFEST檔案之中
status =
versions_->LogAndApply(cfds_to_commit, mutable_cf_options_list,
edit_lists, &mutex_, directories_.GetDbDir());
}
if (status.ok()) {
for (size_t i = 0; i != num_cfs; ++i) {
auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
if (!cfd->IsDropped()) {
//更新每個ingest file對應的cf資訊,并且排程compaction/flush, 因為之前ingest file時找的是有重疊key的一層
InstallSuperVersionAndScheduleWork(cfd, &sv_ctxs[i],
*cfd->GetLatestMutableCFOptions());
...
}
}
}
// 恢複db的寫入,喚醒db的其他所有的writer
write_thread_.ExitUnbatched(&w);