leveldb第二幕 简单的Put路径
- 1 静态库与测试程序
- 2 调用路径
仅做个人记录
1 静态库与测试程序
以Debug模式生成静态库
mkdir -p build_dbg && cd build_dbg
cmake -DCMAKE_BUILD_TYPE=Debug .. && cmake --build .
cp libleveldb.a libleveldb_dbg.a
mv libleveldb_dbg.a /usr/local/lib/
ls /usr/local/lib/

测试程序
#include <cassert>
#include <iostream>
#include <string>
#include <chrono>
#include <leveldb/db.h>
int main() {
leveldb::DB* db;
leveldb::Options options;
options.create_if_missing = true; // 不存在时创建数据库
leveldb::Status status = leveldb::DB::Open(options, "./testdb", &db);
std::string key = "aaa";
std::string value = "bbb";
leveldb::Status s = db->Put(leveldb::WriteOptions(), key, value);
assert(status.ok());
delete db;
return 0;
}
编译调试
g++ test.cpp -o test -g -lleveldb_dbg -pthread
gdb ./test
2 调用路径
首先在DB::Put函数
// Convenience wrapper for a single-key write: places the key/value pair in a
// one-record WriteBatch and forwards it to Write(), returning Write()'s Status.
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
WriteBatch batch;
batch.Put(key, value);
return Write(opt, &batch);
}
Breakpoint 1, leveldb::DB::Put (this=0x4, opt=..., key=..., value=...) at /root/leveldb/leveldb/db/db_impl.cc:1483
1483 Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
(gdb) s
1484 WriteBatch batch;
(gdb) p key
$7 = (const leveldb::Slice &) @0x7fffffffde30: {data_ = 0x7fffffffdec0 "aaa", size_ = 3}
(gdb) p value
$8 = (const leveldb::Slice &) @0x7fffffffde40: {data_ = 0x7fffffffdee0 "bbb", size_ = 3}
(gdb) n
1485 batch.Put(key, value);
(gdb) n
1486 return Write(opt, &batch);
(gdb) p batch
$9 = {rep_ = "\000\000\000\000\000\000\000\000\001\000\000\000\001\003aaa\003bbb"}
(gdb) p batch.rep_.size()
$10 = 21
(gdb) p batch.rep_.data()
$11 = 0x5555555d0da0 ""
(gdb) x/21xb 0x5555555d0da0
0x5555555d0da0: 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00
0x5555555d0da8: 0x01 0x00 0x00 0x00 0x01 0x03 0x61 0x61
0x5555555d0db0: 0x61 0x03 0x62 0x62 0x62
(gdb)
rep_数据构成
// WriteBatch::rep_ :=
// sequence: fixed64
// count: fixed32
// data: record[count]
// record :=
// kTypeValue varstring varstring |
// kTypeDeletion varstring
// varstring :=
// len: varint32
// data: uint8[len]
填充代码
// Appends one Put record for (key, value) to this batch's serialized buffer rep_.
void WriteBatch::Put(const Slice& key, const Slice& value) {
// Bump the record count kept in the batch (the "count" field of rep_).
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
// Record layout: one kTypeValue tag byte, then key and value, each written
// as a length-prefixed slice.
rep_.push_back(static_cast<char>(kTypeValue));
PutLengthPrefixedSlice(&rep_, key);
PutLengthPrefixedSlice(&rep_, value);
}
0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00 // sequence
0x01 0x00 0x00 0x00 // count
0x01 // kTypeValue
0x03 0x61 0x61 0x61 // 3 aaa
0x03 0x62 0x62 0x62 // 3 bbb
然后到DBImpl::Write函数
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Writer w(&mutex_);
w.batch = updates;
w.sync = options.sync;
w.done = false;
MutexLock l(&mutex_);
writers_.push_back(&w);
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
}
if (w.done) {
return w.status;
}
// May temporarily unlock and wait.
Status status = MakeRoomForWrite(updates == nullptr);
在MakeRoomForWrite函数中,由于mem_->ApproximateMemoryUsage() <= options_.write_buffer_size(当前memtable仍有剩余空间),在该分支处跳出循环
1348 (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
(gdb) p mem_->ApproximateMemoryUsage()
$1 = 4104
(gdb) p options_.write_buffer_size
$2 = 4194304
此时只有一个写请求,因此BuildBatchGroup函数没有合并(Append)其他请求,返回的仍是原来的batch(下面rep_的地址未变)
uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;
if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
WriteBatch* write_batch = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
(gdb) p write_batch->rep_.data()
$12 = 0x5555555d0da0 "\001"
(gdb) x/21xb 0x5555555d0da0 // 仅序列号发生改变
0x5555555d0da0: 0x01 0x00 0x00 0x00 0x00 0x00 0x00 0x00
0x5555555d0da8: 0x01 0x00 0x00 0x00 0x01 0x03 0x61 0x61
0x5555555d0db0: 0x61 0x03 0x62 0x62 0x62
而后进入log::Writer::AddRecord函数,将batch内容作为一条记录写入日志
Status Writer::AddRecord(const Slice& slice) {
const char* ptr = slice.data();
size_t left = slice.size();
// Fragment the record if necessary and emit it. Note that if slice
// is empty, we still want to iterate once to emit a single
// zero-length record
Status s;
bool begin = true;
do {
const int leftover = kBlockSize - block_offset_;
assert(leftover >= 0);
if (leftover < kHeaderSize) {
// Switch to a new block
if (leftover > 0) {
// Fill the trailer (literal below relies on kHeaderSize being 7)
static_assert(kHeaderSize == 7, "");
dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
}
block_offset_ = 0;
}
// Invariant: we never leave < kHeaderSize bytes in a block.
assert(kBlockSize - block_offset_ - kHeaderSize >= 0);
const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
const size_t fragment_length = (left < avail) ? left : avail;
RecordType type;
const bool end = (left == fragment_length);
if (begin && end) {
type = kFullType;
} else if (begin) {
type = kFirstType;
} else if (end) {
type = kLastType;
} else {
type = kMiddleType;
}
s = EmitPhysicalRecord(type, ptr, fragment_length);
(gdb) p *this
$13 = {dest_ = 0x5555555d3770, block_offset_ = 0, type_crc_ = {1383945041, 2685849682, 3007718310, 1093509285, 2514994254}}
(gdb) p kBlockSize
$14 = 32768
(gdb) p type
$15 = leveldb::log::kFullType
(gdb) p fragment_length
$16 = 21
进入Writer::EmitPhysicalRecord函数
// Writes one physical log record to dest_: a kHeaderSize-byte header followed
// by `length` payload bytes starting at `ptr`, then flushes dest_.
// Header layout as filled in below:
//   buf[0..3]  masked CRC32C (seeded from type_crc_[t], extended over the payload)
//   buf[4..5]  payload length, low byte first
//   buf[6]     record type
// block_offset_ is advanced by the full record size whether or not the writes
// succeeded. Returns the Status of the last Append/Flush that was attempted.
Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr,
size_t length) {
assert(length <= 0xffff); // Must fit in two bytes
assert(block_offset_ + kHeaderSize + length <= kBlockSize);
// Format the header
char buf[kHeaderSize];
buf[4] = static_cast<char>(length & 0xff);
buf[5] = static_cast<char>(length >> 8);
buf[6] = static_cast<char>(t);
// Compute the crc of the record type and the payload.
uint32_t crc = crc32c::Extend(type_crc_[t], ptr, length);
crc = crc32c::Mask(crc); // Adjust for storage
EncodeFixed32(buf, crc);
// Write the header and the payload
Status s = dest_->Append(Slice(buf, kHeaderSize));
if (s.ok()) {
s = dest_->Append(Slice(ptr, length));
if (s.ok()) {
s = dest_->Flush();
}
}
block_offset_ += kHeaderSize + length;
return s;
}
查看头部信息
99 Status s = dest_->Append(Slice(buf, kHeaderSize));
(gdb) x/7xb buf
0x7fffffffdba1: 0x69 0x09 0xb5 0x6b 0x15 0x00 0x01
(gdb) p crc
$17 = 1807026537
(gdb) p/x crc
$18 = 0x6bb50969
0x69 0x09 0xb5 0x6b // 校验和
0x15 0x00 // 长度 21
0x01 // 类型 kFullType
日志写入完成后
106 block_offset_ += kHeaderSize + length;
(gdb) n
107 return s;
(gdb) p block_offset_
$20 = 28
日志内容
而后依次进入 WriteBatchInternal::InsertInto → WriteBatch::Iterate → MemTableInserter::Put → MemTable::Add 函数
bool sync_error = false;
if (status.ok() && options.sync) {
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
if (status.ok()) {
status = WriteBatchInternal::InsertInto(write_batch, mem_);
// Applies the records of batch `b` to `memtable`: sets up a MemTableInserter
// whose starting sequence number is the one stored in the batch, then lets
// WriteBatch::Iterate drive it. Returns the Status from Iterate.
Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
MemTableInserter inserter;
inserter.sequence_ = WriteBatchInternal::Sequence(b);
inserter.mem_ = memtable;
return b->Iterate(&inserter);
}
Status WriteBatch::Iterate(Handler* handler) const {
Slice input(rep_);
if (input.size() < kHeader) {
return Status::Corruption("malformed WriteBatch (too small)");
}
input.remove_prefix(kHeader);
Slice key, value;
int found = 0;
while (!input.empty()) {
found++;
char tag = input[0];
input.remove_prefix(1);
switch (tag) {
case kTypeValue:
if (GetLengthPrefixedSlice(&input, &key) &&
GetLengthPrefixedSlice(&input, &value)) {
handler->Put(key, value);
// Handler callback for a Put record: adds the entry to mem_ as a kTypeValue
// under the current sequence number, then advances the sequence number so the
// next record in the batch gets the following one.
void Put(const Slice& key, const Slice& value) override {
mem_->Add(sequence_, kTypeValue, key, value);
sequence_++;
}
// Encodes one entry (layout described below) into a buffer obtained from
// arena_ and inserts the pointer to that buffer into table_.
//   s     - sequence number, stored in the upper bits of the tag
//   type  - value type, stored in the low 8 bits of the tag
//   key   - user key bytes
//   value - value bytes
void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
const Slice& value) {
// Format of an entry is concatenation of:
// key_size : varint32 of internal_key.size()
// key bytes : char[internal_key.size()]
// tag : uint64((sequence << 8) | type)
// value_size : varint32 of value.size()
// value bytes : char[value.size()]
size_t key_size = key.size();
size_t val_size = value.size();
// Internal key = user key followed by the 8-byte tag.
size_t internal_key_size = key_size + 8;
const size_t encoded_len = VarintLength(internal_key_size) +
internal_key_size + VarintLength(val_size) +
val_size;
char* buf = arena_.Allocate(encoded_len); // memory is allocated from arena_
char* p = EncodeVarint32(buf, internal_key_size);
std::memcpy(p, key.data(), key_size);
p += key_size;
EncodeFixed64(p, (s << 8) | type);
p += 8;
p = EncodeVarint32(p, val_size);
std::memcpy(p, value.data(), val_size);
// Sanity check: exactly encoded_len bytes were written.
assert(p + val_size == buf + encoded_len);
table_.Insert(buf);
}
此时buf的数据构成为:
(gdb) n
99 table_.Insert(buf);
(gdb) p encoded_len
$24 = 16
(gdb) x/16xb buf
0x5555555e3838: 0x0b 0x61 0x61 0x61 0x01 0x01 0x00 0x00
0x5555555e3840: 0x00 0x00 0x00 0x00 0x03 0x62 0x62 0x62
(gdb) x/16xt buf
0x5555555e3838: 00001011 01100001 01100001 01100001 00000001 00000001 00000000 00000000
0x5555555e3840: 00000000 00000000 00000000 00000000 00000011 01100010 01100010 01100010
0x0b // internal_key_size 11
0x61 0x61 0x61 // key aaa
0x01 // type
0x01 0x00 0x00 0x00 0x00 0x00 0x00 // sequence
0x03 // value_size 3
0x62 0x62 0x62 // value bbb
跳转到SkipList<Key, Comparator>::Insert,此处Key类型为const char*(指向MemTable::Add中填充的buf);跳表的基本操作可参考LeetCode 1206「设计跳表」
// Inserts `key` into the skip list. An equal key must not already be present
// (checked by the assert below). The new node gets a random height and is
// linked in after prev[i] at every level i below that height.
template <typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key& key) {
// TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
// here since Insert() is externally synchronized.
Node* prev[kMaxHeight];
Node* x = FindGreaterOrEqual(key, prev);
// Our data structure does not allow duplicate insertion
assert(x == nullptr || !Equal(key, x->key));
int height = RandomHeight();
if (height > GetMaxHeight()) {
// Levels that did not exist before have head_ as the predecessor.
for (int i = GetMaxHeight(); i < height; i++) {
prev[i] = head_;
}
// It is ok to mutate max_height_ without any synchronization
// with concurrent readers. A concurrent reader that observes
// the new value of max_height_ will see either the old value of
// new level pointers from head_ (nullptr), or a new value set in
// the loop below. In the former case the reader will
// immediately drop to the next level since nullptr sorts after all
// keys. In the latter case the reader will use the new node.
max_height_.store(height, std::memory_order_relaxed);
}
x = NewNode(key, height);
for (int i = 0; i < height; i++) {
// NoBarrier_SetNext() suffices since we will add a barrier when
// we publish a pointer to "x" in prev[i].
x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
prev[i]->SetNext(i, x);
}
}
直接看NewNode
// Allocates aligned storage from arena_ for a node with `height` next-pointer
// slots and constructs the node in place with `key`.
// sizeof(Node) already covers one next-pointer slot, so only (height - 1)
// additional std::atomic<Node*> slots are requested.
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node* SkipList<Key, Comparator>::NewNode(
const Key& key, int height) {
char* const node_memory = arena_->AllocateAligned(
sizeof(Node) + sizeof(std::atomic<Node*>) * (height - 1));
return new (node_memory) Node(key);
}
182 char* const node_memory = arena_->AllocateAligned(
(gdb) p height
$25 = 2
(gdb) p key
$26 = (const char * const&) @0x7fffffffdaf0: 0x5555555e3838 "\vaaa\001\001"
(gdb) p sizeof(Node)
$27 = 16
leveldb::Arena::AllocateAligned (this=0x5555555e37d8, bytes=12884892048) at /root/leveldb/leveldb/util/arena.cc:38
38 char* Arena::AllocateAligned(size_t bytes) {
(gdb) s
39 const int align = (sizeof(void*) > 8) ? sizeof(void*) : 8;
(gdb) p bytes
$28 = 24
分配24字节内存(sizeof(Node) + sizeof(std::atomic<Node*>) × (height - 1) = 16 + 8 × 1 = 24),其中8字节存储char*指针,指向MemTable::Add申请的buf,其余16字节为长度为2的next数组,指向各层的下一个节点
for (int i = 0; i < height; i++) {
// NoBarrier_SetNext() suffices since we will add a barrier when
// we publish a pointer to "x" in prev[i].
x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
prev[i]->SetNext(i, x);
}
360 for (int i = 0; i < height; i++) {
(gdb) p x
$6 = (leveldb::SkipList<char const*, leveldb::MemTable::KeyComparator>::Node *) 0x5555555e3848
(gdb) p head_
$7 = (leveldb::SkipList<char const*, leveldb::MemTable::KeyComparator>::Node * const) 0x5555555e37d0
(gdb) p prev
$8 = {0x5555555e37d0, 0x5555555e37d0, 0x7fffffffda80,
0x55555556b71a <std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >::_S_do_relocate(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >*, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >&, std::integral_constant<bool, true>)+52>, 0x7fffffffdc20, 0x5555555d0dc0, 0x0, 0x7fffffffdb90,
0x7fffffffdaa0, 0x7fffffffdb3c, 0x300000101, 0x5555555e3844}
(gdb) p height
$9 = 2
(gdb) p head_->next_[0]
$10 = {_M_b = {_M_p = 0x0}}
(gdb) p head_->next_[1]
$11 = {_M_b = {_M_p = 0x0}}
(gdb) n
363 x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
(gdb) n
364 prev[i]->SetNext(i, x);
(gdb) n
360 for (int i = 0; i < height; i++) {
(gdb) n
363 x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
(gdb) n
364 prev[i]->SetNext(i, x);
(gdb) n
360 for (int i = 0; i < height; i++) {
(gdb) p head_->next_[0]
$12 = {_M_b = {_M_p = 0x5555555e3848}}
(gdb) p head_->next_[1]
$13 = {_M_b = {_M_p = 0x5555555e3848}}
(gdb) p x->next_[0]
$14 = {_M_b = {_M_p = 0x0}}
(gdb) p x->next_[1]
$15 = {_M_b = {_M_p = 0x0}}