天天看点

leveldb第二幕 简单的Put路径1 静态库与测试程序2 调用路径

leveldb第二幕 简单的Put路径

  • 1 静态库与测试程序
  • 2 调用路径

仅做个人记录

1 静态库与测试程序

以Debug模式生成静态库

mkdir -p build_dbg && cd build_dbg
cmake -DCMAKE_BUILD_TYPE=Debug .. && cmake --build .
cp libleveldb.a  libleveldb_dbg.a
mv libleveldb_dbg.a /usr/local/lib/
ls /usr/local/lib/
           
leveldb第二幕 简单的Put路径1 静态库与测试程序2 调用路径

测试程序

#include <cassert>
#include <iostream>
#include <string>
#include <chrono>
#include <leveldb/db.h>

int main() {
  leveldb::DB* db;
  leveldb::Options options;
  options.create_if_missing = true;  // 不存在时创建数据库
  leveldb::Status status = leveldb::DB::Open(options, "./testdb", &db);
  std::string key = "aaa";
  std::string value = "bbb";
  leveldb::Status s = db->Put(leveldb::WriteOptions(), key, value);
  assert(status.ok());
  delete db;
  return 0;
}
           

编译调试

g++ test.cpp -o  test -g -lleveldb_dbg -pthread
gdb ./test
           

2 调用路径

首先在DB::Put函数

Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  batch.Put(key, value);
  return Write(opt, &batch);
}
           
Breakpoint 1, leveldb::DB::Put (this=0x4, opt=..., key=..., value=...) at /root/leveldb/leveldb/db/db_impl.cc:1483
1483    Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
(gdb) s
1484      WriteBatch batch;
(gdb) p key
$7 = (const leveldb::Slice &) @0x7fffffffde30: {data_ = 0x7fffffffdec0 "aaa", size_ = 3}
(gdb) p value
$8 = (const leveldb::Slice &) @0x7fffffffde40: {data_ = 0x7fffffffdee0 "bbb", size_ = 3}
(gdb) n
1485      batch.Put(key, value);
(gdb) n
1486      return Write(opt, &batch);
(gdb) p batch
$9 = {rep_ = "\000\000\000\000\000\000\000\000\001\000\000\000\001\003aaa\003bbb"}
(gdb) p batch.rep_.size()
$10 = 21
(gdb) p batch.rep_.data()
$11 = 0x5555555d0da0 ""
(gdb) x/21xb 0x5555555d0da0
0x5555555d0da0: 0x00    0x00    0x00    0x00    0x00    0x00    0x00    0x00
0x5555555d0da8: 0x01    0x00    0x00    0x00    0x01    0x03    0x61    0x61
0x5555555d0db0: 0x61    0x03    0x62    0x62    0x62
(gdb) 
           

rep_数据构成

// WriteBatch::rep_ :=
//    sequence: fixed64
//    count: fixed32
//    data: record[count]
// record :=
//    kTypeValue varstring varstring         |
//    kTypeDeletion varstring
// varstring :=
//    len: varint32
//    data: uint8[len]
           

填充代码

void WriteBatch::Put(const Slice& key, const Slice& value) {
  WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
  rep_.push_back(static_cast<char>(kTypeValue));
  PutLengthPrefixedSlice(&rep_, key);
  PutLengthPrefixedSlice(&rep_, value);
}
0x00    0x00    0x00    0x00    0x00    0x00    0x00    0x00  // sequence
0x01    0x00    0x00    0x00 	// count
0x01    // kTypeValue
0x03    0x61    0x61    0x61    // 3 aaa
0x03    0x62    0x62    0x62	// 3 bbb
           

然后到DBImpl::Write函数

Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
  Writer w(&mutex_);
  w.batch = updates;
  w.sync = options.sync;
  w.done = false;

  MutexLock l(&mutex_);
  writers_.push_back(&w);
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }
  if (w.done) {
    return w.status;
  }

  // May temporarily unlock and wait.
  Status status = MakeRoomForWrite(updates == nullptr);
           

在MakeRoomForWrite函数中于mem_->ApproximateMemoryUsage() <= options_.write_buffer_size分支处跳出

1348                   (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
(gdb) p mem_->ApproximateMemoryUsage()
$1 = 4104
(gdb) p options_.write_buffer_size
$2 = 4194304
           

系统只有一个写请求,BuildBatchGroup函数没有Append其他请求

uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
    WriteBatch* write_batch = BuildBatchGroup(&last_writer);
    WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
           
(gdb) p write_batch->rep_.data()
$12 = 0x5555555d0da0 "\001"
(gdb) x/21xb 0x5555555d0da0		// 仅序列号发生改变
0x5555555d0da0: 0x01    0x00    0x00    0x00    0x00    0x00    0x00    0x00
0x5555555d0da8: 0x01    0x00    0x00    0x00    0x01    0x03    0x61    0x61
0x5555555d0db0: 0x61    0x03    0x62    0x62    0x62
           

而后跳到Writer::AddRecord函数

Status Writer::AddRecord(const Slice& slice) {
  const char* ptr = slice.data();
  size_t left = slice.size();

  // Fragment the record if necessary and emit it.  Note that if slice
  // is empty, we still want to iterate once to emit a single
  // zero-length record
  Status s;
  bool begin = true;
  do {
    const int leftover = kBlockSize - block_offset_;
    assert(leftover >= 0);
    if (leftover < kHeaderSize) {
      // Switch to a new block
      if (leftover > 0) {
        // Fill the trailer (literal below relies on kHeaderSize being 7)
        static_assert(kHeaderSize == 7, "");
        dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
      }
      block_offset_ = 0;
    }

    // Invariant: we never leave < kHeaderSize bytes in a block.
    assert(kBlockSize - block_offset_ - kHeaderSize >= 0);

    const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
    const size_t fragment_length = (left < avail) ? left : avail;

    RecordType type;
    const bool end = (left == fragment_length);
    if (begin && end) {
      type = kFullType;
    } else if (begin) {
      type = kFirstType;
    } else if (end) {
      type = kLastType;
    } else {
      type = kMiddleType;
    }
    s = EmitPhysicalRecord(type, ptr, fragment_length);
           
(gdb) p *this
$13 = {dest_ = 0x5555555d3770, block_offset_ = 0, type_crc_ = {1383945041, 2685849682, 3007718310, 1093509285, 2514994254}}
(gdb) p kBlockSize
$14 = 32768
(gdb) p type
$15 = leveldb::log::kFullType
(gdb) p fragment_length
$16 = 21
           

进入Writer::EmitPhysicalRecord函数

Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr,
                                  size_t length) {
  assert(length <= 0xffff);  // Must fit in two bytes
  assert(block_offset_ + kHeaderSize + length <= kBlockSize);

  // Format the header
  char buf[kHeaderSize];
  buf[4] = static_cast<char>(length & 0xff);
  buf[5] = static_cast<char>(length >> 8);
  buf[6] = static_cast<char>(t);

  // Compute the crc of the record type and the payload.
  uint32_t crc = crc32c::Extend(type_crc_[t], ptr, length);
  crc = crc32c::Mask(crc);  // Adjust for storage
  EncodeFixed32(buf, crc);

  // Write the header and the payload
  Status s = dest_->Append(Slice(buf, kHeaderSize));
  if (s.ok()) {
    s = dest_->Append(Slice(ptr, length));
    if (s.ok()) {
      s = dest_->Flush();
    }
  }
  block_offset_ += kHeaderSize + length;
  return s;
}
           

查看头部信息

99        Status s = dest_->Append(Slice(buf, kHeaderSize));
(gdb) x/7xb buf
0x7fffffffdba1: 0x69    0x09    0xb5    0x6b    0x15    0x00    0x01
(gdb) p crc
$17 = 1807026537
(gdb) p/x crc
$18 = 0x6bb50969


0x69    0x09    0xb5    0x6b	// 校验和
0x15	0x00   	// 长度 21
0x01	// 类型 kFullType
           

日志写入完成后

106       block_offset_ += kHeaderSize + length;
(gdb) n
107       return s;
(gdb) p block_offset_
$20 = 28
           

日志内容

leveldb第二幕 简单的Put路径1 静态库与测试程序2 调用路径

而后跳到WriteBatchInternal::InsertInto函数-》WriteBatch::Iterate-》MemTableInserter::Put-》MemTable::Add函数

bool sync_error = false;
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(write_batch, mem_);
        
Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
  MemTableInserter inserter;
  inserter.sequence_ = WriteBatchInternal::Sequence(b);
  inserter.mem_ = memtable;
  return b->Iterate(&inserter);
}  

Status WriteBatch::Iterate(Handler* handler) const {
  Slice input(rep_);
  if (input.size() < kHeader) {
    return Status::Corruption("malformed WriteBatch (too small)");
  }

  input.remove_prefix(kHeader);
  Slice key, value;
  int found = 0;
  while (!input.empty()) {
    found++;
    char tag = input[0];
    input.remove_prefix(1);
    switch (tag) {
      case kTypeValue:
        if (GetLengthPrefixedSlice(&input, &key) &&
            GetLengthPrefixedSlice(&input, &value)) {
          handler->Put(key, value);     
void Put(const Slice& key, const Slice& value) override {
  mem_->Add(sequence_, kTypeValue, key, value);
  sequence_++;
}

void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
                   const Slice& value) {
  // Format of an entry is concatenation of:
  //  key_size     : varint32 of internal_key.size()
  //  key bytes    : char[internal_key.size()]
  //  tag          : uint64((sequence << 8) | type)
  //  value_size   : varint32 of value.size()
  //  value bytes  : char[value.size()]
  size_t key_size = key.size();
  size_t val_size = value.size();
  size_t internal_key_size = key_size + 8;
  const size_t encoded_len = VarintLength(internal_key_size) +
                             internal_key_size + VarintLength(val_size) +
                             val_size;
  char* buf = arena_.Allocate(encoded_len);		// 内存由arena_分配
  char* p = EncodeVarint32(buf, internal_key_size);
  std::memcpy(p, key.data(), key_size);
  p += key_size;
  EncodeFixed64(p, (s << 8) | type);
  p += 8;
  p = EncodeVarint32(p, val_size);
  std::memcpy(p, value.data(), val_size);
  assert(p + val_size == buf + encoded_len);
  table_.Insert(buf);
} 
           

此时buf的数据构成为:

(gdb) n
99        table_.Insert(buf);
(gdb) p encoded_len
$24 = 16
(gdb) x/16xb buf
0x5555555e3838: 0x0b    0x61    0x61    0x61    0x01    0x01    0x00    0x00
0x5555555e3840: 0x00    0x00    0x00    0x00    0x03    0x62    0x62    0x62
(gdb) x/16xt buf
0x5555555e3838: 00001011        01100001        01100001        01100001        00000001        00000001        00000000   00000000
0x5555555e3840: 00000000        00000000        00000000        00000000        00000011        01100010        01100010   01100010

0x0b    // internal_key_size 11
0x61    0x61    0x61	// key aaa
0x01    // type 
0x01    0x00    0x00	0x00    0x00    0x00    0x00	// sequence 
0x03	// value_size 3
0x62    0x62    0x62	// value bbb
           

跳转到SkipList<Key, Comparator>::Insert,key为const char*,跳表相关操作见1206. 设计跳表

template <typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key& key) {
  // TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
  // here since Insert() is externally synchronized.
  Node* prev[kMaxHeight];
  Node* x = FindGreaterOrEqual(key, prev);

  // Our data structure does not allow duplicate insertion
  assert(x == nullptr || !Equal(key, x->key));

  int height = RandomHeight();
  if (height > GetMaxHeight()) {
    for (int i = GetMaxHeight(); i < height; i++) {
      prev[i] = head_;
    }
    // It is ok to mutate max_height_ without any synchronization
    // with concurrent readers.  A concurrent reader that observes
    // the new value of max_height_ will see either the old value of
    // new level pointers from head_ (nullptr), or a new value set in
    // the loop below.  In the former case the reader will
    // immediately drop to the next level since nullptr sorts after all
    // keys.  In the latter case the reader will use the new node.
    max_height_.store(height, std::memory_order_relaxed);
  }

  x = NewNode(key, height);
  for (int i = 0; i < height; i++) {
    // NoBarrier_SetNext() suffices since we will add a barrier when
    // we publish a pointer to "x" in prev[i].
    x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
    prev[i]->SetNext(i, x);
  }
}
           

直接看NewNode

template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node* SkipList<Key, Comparator>::NewNode(
    const Key& key, int height) {
  char* const node_memory = arena_->AllocateAligned(
      sizeof(Node) + sizeof(std::atomic<Node*>) * (height - 1));
  return new (node_memory) Node(key);
}
           
182       char* const node_memory = arena_->AllocateAligned(
(gdb) p height
$25 = 2
(gdb) p key
$26 = (const char * const&) @0x7fffffffdaf0: 0x5555555e3838 "\vaaa\001\001"
(gdb) p sizeof(Node)
$27 = 16
leveldb::Arena::AllocateAligned (this=0x5555555e37d8, bytes=12884892048) at /root/leveldb/leveldb/util/arena.cc:38
38      char* Arena::AllocateAligned(size_t bytes) {
(gdb) s
39        const int align = (sizeof(void*) > 8) ? sizeof(void*) : 8;
(gdb) p bytes
$28 = 24

分配24字节内存,其中8字节存储char*指针,指向MemTable::Add申请的buf,其余16字节为长度为2的next数组,指向各层的下一个节点
           
for (int i = 0; i < height; i++) {
    // NoBarrier_SetNext() suffices since we will add a barrier when
    // we publish a pointer to "x" in prev[i].
    x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
    prev[i]->SetNext(i, x);
  }
           
360       for (int i = 0; i < height; i++) {
(gdb) p x
$6 = (leveldb::SkipList<char const*, leveldb::MemTable::KeyComparator>::Node *) 0x5555555e3848
(gdb) p head_
$7 = (leveldb::SkipList<char const*, leveldb::MemTable::KeyComparator>::Node * const) 0x5555555e37d0
(gdb) p prev
$8 = {0x5555555e37d0, 0x5555555e37d0, 0x7fffffffda80, 
  0x55555556b71a <std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >::_S_do_relocate(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >*, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >&, std::integral_constant<bool, true>)+52>, 0x7fffffffdc20, 0x5555555d0dc0, 0x0, 0x7fffffffdb90, 
  0x7fffffffdaa0, 0x7fffffffdb3c, 0x300000101, 0x5555555e3844}
(gdb) p height 
$9 = 2
(gdb) p head_->next_[0]
$10 = {_M_b = {_M_p = 0x0}}
(gdb) p head_->next_[1]
$11 = {_M_b = {_M_p = 0x0}}
(gdb) n
363         x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
(gdb) n
364         prev[i]->SetNext(i, x);
(gdb) n
360       for (int i = 0; i < height; i++) {
(gdb) n
363         x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
(gdb) n
364         prev[i]->SetNext(i, x);
(gdb) n
360       for (int i = 0; i < height; i++) {
(gdb) p head_->next_[0]
$12 = {_M_b = {_M_p = 0x5555555e3848}}
(gdb) p head_->next_[1]
$13 = {_M_b = {_M_p = 0x5555555e3848}}
(gdb) p x->next_[0]
$14 = {_M_b = {_M_p = 0x0}}
(gdb) p x->next_[1]
$15 = {_M_b = {_M_p = 0x0}}
           

继续阅读