天天看点

leveldb学习笔记之五——include/leveldb/write_batch.h

WriteBatch主要是用来对多个数据进行批量写入。

依赖关系

writeBatch类主要依赖于以下几个类,如图:

leveldb学习笔记之五——include/leveldb/write_batch.h

其中,Handler为抽象类,定义了put和delete接口。WriteBatchInternal类为writebatch的实现。

MemTableInserter为Handler的具体实现,实现具体的put和delete操作。此处使用了抽象工厂模式,即Handler为抽象工厂,MemTableInserter定义了具体的工厂。在MemTableInserter中定义了MemTable的成员,通过此成员实现put和delete操作。

writeBatch的数据结构

leveldb学习笔记之五——include/leveldb/write_batch.h

一个Batch结构前8个字节是序列号,接下来4个字节是写入数据的总长度,再接下来是数据。数据的格式按照 类型,key,value的方式来排列。

类型主要分两种,一种是写入数据,另一种是删除数据。删除数据可以看成是一种特殊的写入。

相关函数

  • 此函数用于迭代处理writebatch中的数据。即写入
Status WriteBatch::Iterate(Handler* handler) const {
  Slice input(rep_);
    if (input.size() < kHeader) {
      return Status::Corruption("malformed WriteBatch (too small)");
    }

    input.remove_prefix(kHeader); //删除前缀,8个字节的序列号,4个字节的长度,一共12个字节
    Slice key, value;
    int found = 0;
    while (!input.empty()) {
      found++;
      char tag = input[0];
      input.remove_prefix(1);
      switch (tag) {
          case kTypeValue:  //正常的写入数据
            if (GetLengthPrefixedSlice(&input, &key) &&
                GetLengthPrefixedSlice(&input, &value)) {
                handler->Put(key, value);
            } else {
                return Status::Corruption("bad WriteBatch Put");
            }
            break;
          case kTypeDeletion: //删除的数据
            if (GetLengthPrefixedSlice(&input, &key)) {
                handler->Delete(key);
            } else {
                return Status::Corruption("bad WriteBatch Delete");
            }
            break;
          default:
            return Status::Corruption("unknown WriteBatch tag");
      }
    }
    if (found != WriteBatchInternal::Count(this)) {
      return Status::Corruption("WriteBatch has wrong count");
    } else {
      return Status::OK();
    }
}      
  • 求长度,数据的长度加上8个字节序列号的长度即可
int WriteBatchInternal::Count(const WriteBatch* b) {
    return DecodeFixed32(b->rep_.data() + 8);
}      
  • 设置长度值
void WriteBatchInternal::SetCount(WriteBatch* b, int n) {
    EncodeFixed32(&b->rep_[8], n);
}      
  • 获取序列号
SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
    return SequenceNumber(DecodeFixed64(b->rep_.data()));
}      
  • 设置序列号
void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
    EncodeFixed64(&b->rep_[0], seq);
}      
  • 写入
//写入实际上是先写入到batch里面
void WriteBatch::Put(const Slice& key, const Slice& value) {
    WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
    rep_.push_back(static_cast<char>(kTypeValue));
    PutLengthPrefixedSlice(&rep_, key);
    PutLengthPrefixedSlice(&rep_, value);
}      
  • 删除
//删除实际上是将key写入,同时在key之前标注此key对应的数据删除
void WriteBatch::Delete(const Slice& key) {
    WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
    rep_.push_back(static_cast<char>(kTypeDeletion));
    PutLengthPrefixedSlice(&rep_, key);
}      
  • writebatch追加
//两个batch进行追加
void WriteBatch::Append(const WriteBatch &source) {
    WriteBatchInternal::Append(this, &source);
}