天天看点

Rocksdb 的compaction_filter和table_properties_collector 用法 及 其底层实现

文章目录

  • ​​前言​​
  • ​​Compaction Filter​​
  • ​​compaction_filter 使用​​
  • ​​compaction_filter_factory 使用​​
  • ​​compaction filter实现​​
  • ​​Table Collector​​
  • ​​Table Collector 使用​​
  • ​​Table Collector 实现​​
  • ​​总结​​

前言

Rocksdb 因为out of-place update 的方式,导致很多后台运行的合并效率较低,尤其是用户在delete-aware场景想要快速回收key版本,但因为后台操作,不是很方便。同时用户想要删除/保留一批自定义方式的key,这个时候需要由用户来控制compaction过程的一些行为。

所以Rocksdb针对这样的场景对用户暴露了一些接口,支持用户自定义过滤指定类型的key,保留/删除(当然主要是删除),这也是compaciton_filter的接口产生源头。

Table_collector 也是一种类似的接口,Rocksdb为用户暴露了能够访问到底层sst的key-value接口,从而支持一些针对相关key的一些定制化行为(比如统计某种用户定义的key的数量,根据某种key数量的比重加速compaction等等)。

本文将简单介绍一下这两种用户接口的用法以及其如何在rocksdb 底层源代码中的实现原理。

相关的rocksdb代码版本是6.19

Compaction Filter

compaction_filter 使用

  1. 自定义compaction filter,声明如下:
class RemoveEmptyValueCompactionFilter : public CompactionFilter {
 public:
    const char* Name() const override;
    bool Filter(int level,
        const Slice& key,
        const Slice& existing_value,
        std::string* new_value,
        bool* value_changed) const override;
};      
  1. 定义如下:
const char* RemoveEmptyValueCompactionFilter::Name() const {
  return "RemoveEmptyValueCompactionFilter";
}

bool RemoveEmptyValueCompactionFilter::Filter(int /*level*/,
                                              const Slice& /*key*/,
                                              const Slice& existing_value,
                                              std::string* /*new_value*/,
                                              bool* /*value_changed*/) const {
  // remove kv pairs that have empty values
  return existing_value.empty();
}      

主体是Filter函数,这里底层传入的参数都能够被用户看到包括:

  • Level 当前key的输入level
  • key user_key内容
  • existing_value user_key的value
  • new_value 如果要修改这个existing_value,则这个new_value将被传出
  • value_changed 确定要修改existing_value,需要将这个配置置为true,也会被传出
  1. 增加compaction_filter的配置项
options.compaction_filter = new rocksdb::RemoveEmptyValueCompactionFilter();      
  1. User Filter 监控

    通过增加​

    ​rocksdb.compaction.key.drop.user​

    ​ 来查看被compaction_filter过滤掉的key的数目

compaction_filter_factory 使用

上一节介绍的是compaction_filter的基本使用,用户主体只需要直接实现一个Filter函数。而通过compaction_filter_factory 实现自己的filter工厂,能够用户自定义何时创建自己的filter;有多个Filter是能够通过compaction_filter_factory 来叠加使用。一般​

​compaction_filter​

​​和​

​compaction_filter_factory​

​只需要使用一个就可以了。

如下filter_factory的用例是rocksdb自己的ttl compaction功能,目的是创建一个filter,用户只需要传入一个过期时间,由filter自行在compaction过程中针对过期的key进行清理(当然,前提是key的写入也需要按照ttldb的方式写入),同时factory也定义了保留其他的用户filter。

  1. 定义compaction_filter_factory
class TtlCompactionFilterFactory : public CompactionFilterFactory {
 public:
  TtlCompactionFilterFactory(
      int32_t ttl, SystemClock* clock,
      std::shared_ptr<CompactionFilterFactory> comp_filter_factory)
      : ttl_(ttl),
        clock_(clock),
        user_comp_filter_factory_(comp_filter_factory) {}

  virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter( // 必须要实现
      const CompactionFilter::Context& context) override {
    std::unique_ptr<const CompactionFilter> user_comp_filter_from_factory =
        nullptr;
    // 保留其他的用户filter
    if (user_comp_filter_factory_) {
      user_comp_filter_from_factory =
          user_comp_filter_factory_->CreateCompactionFilter(context);
    }

    // 创建一个ttl filter,并将其他已经配置的filter传入
    return std::unique_ptr<TtlCompactionFilter>(new TtlCompactionFilter(
        ttl_, clock_, nullptr, std::move(user_comp_filter_from_factory)));
  }

  void SetTtl(int32_t ttl) {
    ttl_ = ttl;
  }

  virtual const char* Name() const override { // 必须要实现的
    return "TtlCompactionFilterFactory";
  }

 private:
  int32_t ttl_;
  SystemClock* clock_;
  std::shared_ptr<CompactionFilterFactory> user_comp_filter_factory_;
};      
ps:为什么Name()和CreateCompactionFilter必须要实现,可以看看​

​CompactionFilterFactory​

​,这两个函数其实是纯虚函数,必须要基类实现。

class CompactionFilterFactory {

public:

virtual ~CompactionFilterFactory() {}

virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(

const CompactionFilter::Context& context) = 0;

// Returns a name that identifies this compaction filter factory.

virtual const char* Name() const = 0;

};

  1. 基于factory 定义自己的filter
// LayeredCompactionFilterBase 是用来实现filter叠加使用的
class TtlCompactionFilter : public LayeredCompactionFilterBase {
 public:
  TtlCompactionFilter(int32_t ttl, SystemClock* clock,
                      const CompactionFilter* _user_comp_filter,
                      std::unique_ptr<const CompactionFilter>
                          _user_comp_filter_from_factory = nullptr)
      : LayeredCompactionFilterBase(_user_comp_filter,
                                    std::move(_user_comp_filter_from_factory)),
        ttl_(ttl),
        clock_(clock) {}

  // ttl filter,用来判断key是否过期,从而决定是否清理
  virtual bool Filter(int level, const Slice& key, const Slice& old_val,
                      std::string* new_val, bool* value_changed) const
      override {
    if (DBWithTTLImpl::IsStale(old_val, ttl_, clock_)) {
      return true;
    }
    if (user_comp_filter() == nullptr) {
      return false;
    }
    assert(old_val.size() >= DBWithTTLImpl::kTSLength);
    Slice old_val_without_ts(old_val.data(),
                             old_val.size() - DBWithTTLImpl::kTSLength);
    if (user_comp_filter()->Filter(level, key, old_val_without_ts, new_val,
                                   value_changed)) {
      return true;
    }
    if (*value_changed) {
      new_val->append(
          old_val.data() + old_val.size() - DBWithTTLImpl::kTSLength,
          DBWithTTLImpl::kTSLength);
    }
    return false;
  }

  virtual const char* Name() const override { return "Delete By TTL"; }

 private:
  int32_t ttl_;
  SystemClock* clock_;
};      
  1. 增加compaction_filter_factory的配置项
options->compaction_filter_factory =
        std::shared_ptr<CompactionFilterFactory>(new TtlCompactionFilterFactory(
            ttl, clock, options->compaction_filter_factory));      

compaction filter实现

关于compaction 处理逻辑的源代码分析可以参考​​rocksdb compaction实现原理​​。

Compaction filter以及 factory生效的逻辑都在 最终compaction处理实际的key-value数据的函数中​

​ProcessKeyValueCompaction​

函数刚开始,会获取用户传入的compaction_filter指针, 如果compaction_filter指针为空,用户会尝试构造compaction_filter_factory,这也就像是上面说的两者之间按需选择一个实现即可。

void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
  .....

  // Create compaction filter and fail the compaction if
  // IgnoreSnapshots() = false because it is not supported anymore
  const CompactionFilter* compaction_filter =
      cfd->ioptions()->compaction_filter;
  std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
  if (compaction_filter == nullptr) {
    // 先获取用户传入的compaction_filter_factory
    compaction_filter_from_factory =
        sub_compact->compaction->CreateCompactionFilter();
    // 再从factory获取compaction_filter
    compaction_filter = compaction_filter_from_factory.get();
  }
  if (compaction_filter != nullptr && !compaction_filter->IgnoreSnapshots()) {
    sub_compact->status = Status::NotSupported(
        "CompactionFilter::IgnoreSnapshots() = false is not supported "
        "anymore.");
    return;
  }
  ......  
}      

关于这里的​

​CreateCompactionFilter​

​的实现如下,其中有两个配置非常有用:

std::unique_ptr<CompactionFilter> Compaction::CreateCompactionFilter() const {
  if (!cfd_->ioptions()->compaction_filter_factory) {
    return nullptr;
  }

  CompactionFilter::Context context;
  context.is_full_compaction = is_full_compaction_;
  context.is_manual_compaction = is_manual_compaction_;
  context.column_family_id = cfd_->GetID();
  return cfd_->ioptions()->compaction_filter_factory->CreateCompactionFilter(
      context);
}      

也就是Rocksdb会将​

​is_full_compaction​

​​和​

​is_manual_compaction​

​ 透传回给用户,用户可以根据是否是fullcompaction和manual compaction来指定自己的filter/factory的行为。

回到​

​ProcessKeyValueCompaction​

​​之中,后续会通过​

​iter->Next()​

​​ --> ​

​NextFromInput()​

​​逐个处理compaction获取到sst中的数据,其中关于compaction_filter的处理会进入到​

​CompactionIterator::InvokeFilterIfNeeded​

​函数

bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
                                              Slice* skip_until) {
  ...
  if (CompactionFilter::Decision::kUndetermined == filter) {
      // 取到filter的行为,决定是remove,还是keep,开始其他相关操作。
      filter = compaction_filter_->FilterV2(
          compaction_->level(), filter_key, value_type,
          blob_value_.empty() ? value_ : blob_value_, &compaction_filter_value_,
          compaction_filter_skip_until_.rep());
    }
    ......
}      

其中​

​FilterV2​

​​的行为如下,因为这个函数也会被用作​

​MergeOperator​

​​中,所以会有​

​kMergeOperand​

​类似的判断;针对正常的Put这种方式写入的key都会调用Filter函数来处理,如果Filter返回为true,则表示需要被remove的。

virtual Decision FilterV2(int level, const Slice& key, ValueType value_type,
                          const Slice& existing_value, std::string* new_value,
                          std::string* /*skip_until*/) const {
  switch (value_type) {
    case ValueType::kValue: {
      bool value_changed = false;
      // 用户自定义的Filter函数
      bool rv = Filter(level, key, existing_value, new_value, &value_changed);
      if (rv) {
        return Decision::kRemove;
      }
      return value_changed ? Decision::kChangeValue : Decision::kKeep;
    }
    case ValueType::kMergeOperand: {
      bool rv = FilterMergeOperand(level, key, existing_value);
      return rv ? Decision::kRemove : Decision::kKeep;
    }
    case ValueType::kBlobIndex:
      return Decision::kKeep;
  }
  assert(false);
  return Decision::kKeep;
}      

再次回到​

​InvokeFilterIfNeeded​

​函数中:

针对返回值为​

​Decision::kRemove​

​​的类型,会将当前的key标记为​

​kTypeDeletion​

​​,由​

​NextFromInput​

​后续进行清理

if (filter == CompactionFilter::Decision::kRemove) {
  // convert the current key to a delete; key_ is pointing into
  // current_key_ at this point, so updating current_key_ updates key()
  ikey_.type = kTypeDeletion;
  current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion);
  // no value associated with delete
  value_.clear();
  iter_stats_.num_record_drop_user++;
}      

到此,基本就清楚了compaction _filter如何在rocksdb内部进行初始化 以及完成指定类型key的清理,仍然是以打一个​

​kTypeDelete​

​的tombstone。

Table Collector

TablePropertiesCollector 的主要作用是在构建SST过程中由用户来指定收集什么样的properties信息。

也能够针对当前sst文件触发一些行为,比如加速当前sst的compaction

Table Collector 使用

  1. Options 中的用法
opts.table_properties_collector_factories.emplace_back(
  NewCompactOnDeletionCollectorFactory(kWindowSize, kNumDelsTrigger));      

用户可以自定义多个collectors,只需要将自己定义的collectors工厂添加到table_properties_collector_factories数组中即可。

  1. 继承 table_properties_collector_factories 之后的自定义

    如下CountingUserTblPropCollector 实现了一个统计key个数的功能,并编码到了SSTables的properties属性中。

class CountingUserTblPropCollector : public TablePropertiesCollector {
 public:
  const char* Name() const override { return "CountingUserTblPropCollector"; }

  // 完成一个sstable的构建之后会调用 Finish函数,主要指定用户在构建完成之后的一些自定义行为。
  Status Finish(UserCollectedProperties* properties) override {
    std::string encoded;
    PutVarint32(&encoded, count_);
    *properties = UserCollectedProperties{
        {"CountingUserTblPropCollector", message_}, {"Count", encoded},
    };
    return Status::OK();
  }

  // 用户根据传入的key指定自己的行为,这里是自增计数;这个函数也是一个主要函数,这里能够看到userkey。
  Status AddUserKey(const Slice& /*user_key*/, const Slice& /*value*/,
                    EntryType /*type*/, SequenceNumber /*seq*/,
                    uint64_t /*file_size*/) override {
    ++count_;
    return Status::OK();
  }

  UserCollectedProperties GetReadableProperties() const override {
    return UserCollectedProperties{};
  }

 private:
  std::string message_ = "Rocksdb";
  uint32_t count_ = 0;
};      

再构建一个Properties工厂,用来支持传入到option之中

class CountingUserTblPropCollectorFactory
    : public TablePropertiesCollectorFactory {
 public:
  explicit CountingUserTblPropCollectorFactory(
      uint32_t expected_column_family_id)
      : expected_column_family_id_(expected_column_family_id),
        num_created_(0) {}
  // 必须实现
  TablePropertiesCollector* CreateTablePropertiesCollector(
      TablePropertiesCollectorFactory::Context context) override {
    EXPECT_EQ(expected_column_family_id_, context.column_family_id);
    num_created_++;
    return new CountingUserTblPropCollector();
  }
  // 必须实现
  const char* Name() const override {
    return "CountingUserTblPropCollectorFactory";
  }
  void set_expected_column_family_id(uint32_t v) {
    expected_column_family_id_ = v;
  }
  uint32_t expected_column_family_id_;
  uint32_t num_created_;
};      

更多的tablePropertisCollector的定义可以参考​

​table_properties.h​

​​,更多的​

​properties_collector​

​​相关的案例可以整个rocksdb项目 中搜索​

​public TablePropertiesCollector​

​​,其中Rocksdb自己实现了针对delete 比例比较重的场景 加速这种sst 的comapction功能,可以参考​​Rocksdb 对Delete问题的优化​​

Table Collector 实现

引擎内部接受用户传入的TablePropertiesCollector 的过程如下调用栈:

CompactionJob::ProcessKeyValueCompaction // compaction的实际执行函数 -- 处理key-value
  CompactionJob::OpenCompactionOutputFile // 创建compaction过程中的一些文件/文件buffer
    TableBuilder* NewTableBuilder // 创建一种存储key-value数据类型的table
      BlockBasedTableFactory::NewTableBuilder // 默认创建blockbasedtable
        BlockBasedTableBuilder::BlockBasedTableBuilder // 构造函数
          Rep // 构造rep,代表blockbasedtable 来添加数据      

在构造​

​Rep​

​的过程中,会将用户传入的collector_factories 添加到rep维护的properties_colectors成员之中

for (auto& collector_factories : *int_tbl_prop_collector_factories) {
  table_properties_collectors.emplace_back(
    collector_factories->CreateIntTblPropCollector(column_family_id));
}      

现在引擎的blockbasedtable的持有者已经拿到了用户自定义的collectors,之后要构造sstable的过程来执行​

​collectors​

​的一些行为,则会在如下调用栈中进行:

CompactionJob::ProcessKeyValueCompaction
  CompactionJob::FinishCompactionOutputFile
    BlockBasedTableBuilder::Add
      NotifyCollectTableCollectorsOnAdd      

​NotifyCollectTableCollectorsOnAdd​

​​ 这个函数中会拿着前面构造好的rep的​

​table_properties_collectors​

​成员

bool NotifyCollectTableCollectorsOnAdd(
    const Slice& key, const Slice& value, uint64_t file_size,
    const std::vector<std::unique_ptr<IntTblPropCollector>>& collectors,
    Logger* info_log) {
  bool all_succeeded = true;
  // 遍历所有的collectors,每个都执行一次InternalAdd
  for (auto& collector : collectors) {
    Status s = collector->InternalAdd(key, value, file_size);
    all_succeeded = all_succeeded && s.ok();
    if (!s.ok()) {
      LogPropertiesCollectionError(info_log, "Add" /* method */,
                                   collector->Name());
    }
  }
  return all_succeeded;
}      

如果collector是用户创造的类型,则会统一进入​

​UserKeyTablePropertiesCollector​

​​逻辑中,从而调用我们自己实现的​

​AddUserKey​

​函数。

Status UserKeyTablePropertiesCollector::InternalAdd(const Slice& key,
                                                    const Slice& value,
                                                    uint64_t file_size) {
  ParsedInternalKey ikey;
  Status s = ParseInternalKey(key, &ikey, false /* log_err_key */);  // TODO
  if (!s.ok()) {
    return s;
  }

  return collector_->AddUserKey(ikey.user_key, value, GetEntryType(ikey.type),
                                ikey.sequence, file_size);
}      

这样就能够在创建sstable的过程中完成我们自定义属性的更新。

如果在​

​CompactionJob::FinishCompactionOutputFile​

​​ 中完成了key的添加,并构造完成了sstable,则会尝试获取​

​NeedCompact​

​标记,这个标记可以在我们自定义的collector中完成设置。

if (s.ok()) {
  meta->fd.file_size = current_bytes;
  meta->marked_for_compaction = sub_compact->builder->NeedCompact();
}

 |
 | 调用BlockBasedTableBuilder的NeedCompact
\ /
   
bool BlockBasedTableBuilder::NeedCompact() const {
  // 从用户自定义行为中查看NeedCompact标记
  for (const auto& collector : rep_->table_properties_collectors) {
    if (collector->NeedCompact()) {
      return true;
    }
  }
  return false;
}      

到此,我们就知道了TablePropertiesCollector的行为在引擎中的实现。

总结

1. Rocksdb ThreadLocal 机制 原理?为什么要有这个机制?其在读场景下有什么优势?
2. Rocksdb syncpoint 实现,基本用法
3. Rocksdb 事务琐实现:锁管理/加锁流程/死锁检测/空间消耗。
4. Rocksdb 乐观事务/悲观事务实现,基本用法,适用场景。
5. Rocksdb 压缩实现。
6. Rocksdb Pinnable 结合Cleanable 在iter中的作用 及 实现。