文章目录
- 前言
- Compaction Filter
- compaction_filter 使用
- compaction_filter_factory 使用
- compaction filter实现
- Table Collector
- Table Collector 使用
- Table Collector 实现
- 总结
前言
Rocksdb 因为out of-place update 的方式,导致很多后台运行的合并效率较低,尤其是用户在delete-aware场景想要快速回收key版本,但因为后台操作,不是很方便。同时用户想要删除/保留一批自定义方式的key,这个时候需要由用户来控制compaction过程的一些行为。
所以Rocksdb针对这样的场景对用户暴露了一些接口,支持用户自定义过滤指定类型的key,保留/删除(当然主要是删除),这也是compaciton_filter的接口产生源头。
Table_collector 也是一种类似的接口,Rocksdb为用户暴露了能够访问到底层sst的key-value接口,从而支持一些针对相关key的一些定制化行为(比如统计某种用户定义的key的数量,根据某种key数量的比重加速compaction等等)。
本文将简单介绍一下这两种用户接口的用法以及其如何在rocksdb 底层源代码中的实现原理。
相关的rocksdb代码版本是6.19
Compaction Filter
compaction_filter 使用
- 自定义compaction filter,声明如下:
class RemoveEmptyValueCompactionFilter : public CompactionFilter {
public:
const char* Name() const override;
bool Filter(int level,
const Slice& key,
const Slice& existing_value,
std::string* new_value,
bool* value_changed) const override;
};
- 定义如下:
const char* RemoveEmptyValueCompactionFilter::Name() const {
return "RemoveEmptyValueCompactionFilter";
}
bool RemoveEmptyValueCompactionFilter::Filter(int /*level*/,
const Slice& /*key*/,
const Slice& existing_value,
std::string* /*new_value*/,
bool* /*value_changed*/) const {
// remove kv pairs that have empty values
return existing_value.empty();
}
主体是Filter函数,这里底层传入的参数都能够被用户看到包括:
- Level 当前key的输入level
- key user_key内容
- existing_value user_key的value
- new_value 如果要修改这个existing_value,则这个new_value将被传出
- value_changed 确定要修改existing_value,需要将这个配置置为true,也会被传出
- 增加compaction_filter的配置项
options.compaction_filter = new rocksdb::RemoveEmptyValueCompactionFilter();
-
User Filter 监控
通过增加
来查看被compaction_filter过滤掉的key的数目rocksdb.compaction.key.drop.user
compaction_filter_factory 使用
上一节介绍的是compaction_filter的基本使用,用户主体只需要直接实现一个Filter函数。而通过compaction_filter_factory 实现自己的filter工厂,能够用户自定义何时创建自己的filter;有多个Filter是能够通过compaction_filter_factory 来叠加使用。一般
compaction_filter
和
compaction_filter_factory
只需要使用一个就可以了。
如下filter_factory的用例是rocksdb自己的ttl compaction功能,目的是创建一个filter,用户只需要传入一个过期时间,由filter自行在compaction过程中针对过期的key进行清理(当然,前提是key的写入也需要按照ttldb的方式写入),同时factory也定义了保留其他的用户filter。
- 定义compaction_filter_factory
class TtlCompactionFilterFactory : public CompactionFilterFactory {
public:
TtlCompactionFilterFactory(
int32_t ttl, SystemClock* clock,
std::shared_ptr<CompactionFilterFactory> comp_filter_factory)
: ttl_(ttl),
clock_(clock),
user_comp_filter_factory_(comp_filter_factory) {}
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter( // 必须要实现
const CompactionFilter::Context& context) override {
std::unique_ptr<const CompactionFilter> user_comp_filter_from_factory =
nullptr;
// 保留其他的用户filter
if (user_comp_filter_factory_) {
user_comp_filter_from_factory =
user_comp_filter_factory_->CreateCompactionFilter(context);
}
// 创建一个ttl filter,并将其他已经配置的filter传入
return std::unique_ptr<TtlCompactionFilter>(new TtlCompactionFilter(
ttl_, clock_, nullptr, std::move(user_comp_filter_from_factory)));
}
void SetTtl(int32_t ttl) {
ttl_ = ttl;
}
virtual const char* Name() const override { // 必须要实现的
return "TtlCompactionFilterFactory";
}
private:
int32_t ttl_;
SystemClock* clock_;
std::shared_ptr<CompactionFilterFactory> user_comp_filter_factory_;
};
ps:为什么Name()和CreateCompactionFilter必须要实现,可以看看
CompactionFilterFactory
,这两个函数其实是纯虚函数,必须要基类实现。
class CompactionFilterFactory {
public:
virtual ~CompactionFilterFactory() {}
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) = 0;
// Returns a name that identifies this compaction filter factory.
virtual const char* Name() const = 0;
};
- 基于factory 定义自己的filter
// LayeredCompactionFilterBase 是用来实现filter叠加使用的
class TtlCompactionFilter : public LayeredCompactionFilterBase {
public:
TtlCompactionFilter(int32_t ttl, SystemClock* clock,
const CompactionFilter* _user_comp_filter,
std::unique_ptr<const CompactionFilter>
_user_comp_filter_from_factory = nullptr)
: LayeredCompactionFilterBase(_user_comp_filter,
std::move(_user_comp_filter_from_factory)),
ttl_(ttl),
clock_(clock) {}
// ttl filter,用来判断key是否过期,从而决定是否清理
virtual bool Filter(int level, const Slice& key, const Slice& old_val,
std::string* new_val, bool* value_changed) const
override {
if (DBWithTTLImpl::IsStale(old_val, ttl_, clock_)) {
return true;
}
if (user_comp_filter() == nullptr) {
return false;
}
assert(old_val.size() >= DBWithTTLImpl::kTSLength);
Slice old_val_without_ts(old_val.data(),
old_val.size() - DBWithTTLImpl::kTSLength);
if (user_comp_filter()->Filter(level, key, old_val_without_ts, new_val,
value_changed)) {
return true;
}
if (*value_changed) {
new_val->append(
old_val.data() + old_val.size() - DBWithTTLImpl::kTSLength,
DBWithTTLImpl::kTSLength);
}
return false;
}
virtual const char* Name() const override { return "Delete By TTL"; }
private:
int32_t ttl_;
SystemClock* clock_;
};
- 增加compaction_filter_factory的配置项
options->compaction_filter_factory =
std::shared_ptr<CompactionFilterFactory>(new TtlCompactionFilterFactory(
ttl, clock, options->compaction_filter_factory));
compaction filter实现
关于compaction 处理逻辑的源代码分析可以参考rocksdb compaction实现原理。
Compaction filter以及 factory生效的逻辑都在 最终compaction处理实际的key-value数据的函数中
ProcessKeyValueCompaction
函数刚开始,会获取用户传入的compaction_filter指针, 如果compaction_filter指针为空,用户会尝试构造compaction_filter_factory,这也就像是上面说的两者之间按需选择一个实现即可。
void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
.....
// Create compaction filter and fail the compaction if
// IgnoreSnapshots() = false because it is not supported anymore
const CompactionFilter* compaction_filter =
cfd->ioptions()->compaction_filter;
std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
if (compaction_filter == nullptr) {
// 先获取用户传入的compaction_filter_factory
compaction_filter_from_factory =
sub_compact->compaction->CreateCompactionFilter();
// 再从factory获取compaction_filter
compaction_filter = compaction_filter_from_factory.get();
}
if (compaction_filter != nullptr && !compaction_filter->IgnoreSnapshots()) {
sub_compact->status = Status::NotSupported(
"CompactionFilter::IgnoreSnapshots() = false is not supported "
"anymore.");
return;
}
......
}
关于这里的
CreateCompactionFilter
的实现如下,其中有两个配置非常有用:
std::unique_ptr<CompactionFilter> Compaction::CreateCompactionFilter() const {
if (!cfd_->ioptions()->compaction_filter_factory) {
return nullptr;
}
CompactionFilter::Context context;
context.is_full_compaction = is_full_compaction_;
context.is_manual_compaction = is_manual_compaction_;
context.column_family_id = cfd_->GetID();
return cfd_->ioptions()->compaction_filter_factory->CreateCompactionFilter(
context);
}
也就是Rocksdb会将
is_full_compaction
和
is_manual_compaction
透传回给用户,用户可以根据是否是fullcompaction和manual compaction来指定自己的filter/factory的行为。
回到
ProcessKeyValueCompaction
之中,后续会通过
iter->Next()
-->
NextFromInput()
逐个处理compaction获取到sst中的数据,其中关于compaction_filter的处理会进入到
CompactionIterator::InvokeFilterIfNeeded
函数
bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
Slice* skip_until) {
...
if (CompactionFilter::Decision::kUndetermined == filter) {
// 取到filter的行为,决定是remove,还是keep,开始其他相关操作。
filter = compaction_filter_->FilterV2(
compaction_->level(), filter_key, value_type,
blob_value_.empty() ? value_ : blob_value_, &compaction_filter_value_,
compaction_filter_skip_until_.rep());
}
......
}
其中
FilterV2
的行为如下,因为这个函数也会被用作
MergeOperator
中,所以会有
kMergeOperand
类似的判断;针对正常的Put这种方式写入的key都会调用Filter函数来处理,如果Filter返回为true,则表示需要被remove的。
virtual Decision FilterV2(int level, const Slice& key, ValueType value_type,
const Slice& existing_value, std::string* new_value,
std::string* /*skip_until*/) const {
switch (value_type) {
case ValueType::kValue: {
bool value_changed = false;
// 用户自定义的Filter函数
bool rv = Filter(level, key, existing_value, new_value, &value_changed);
if (rv) {
return Decision::kRemove;
}
return value_changed ? Decision::kChangeValue : Decision::kKeep;
}
case ValueType::kMergeOperand: {
bool rv = FilterMergeOperand(level, key, existing_value);
return rv ? Decision::kRemove : Decision::kKeep;
}
case ValueType::kBlobIndex:
return Decision::kKeep;
}
assert(false);
return Decision::kKeep;
}
再次回到
InvokeFilterIfNeeded
函数中:
针对返回值为
Decision::kRemove
的类型,会将当前的key标记为
kTypeDeletion
,由
NextFromInput
后续进行清理
if (filter == CompactionFilter::Decision::kRemove) {
// convert the current key to a delete; key_ is pointing into
// current_key_ at this point, so updating current_key_ updates key()
ikey_.type = kTypeDeletion;
current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion);
// no value associated with delete
value_.clear();
iter_stats_.num_record_drop_user++;
}
到此,基本就清楚了compaction _filter如何在rocksdb内部进行初始化 以及完成指定类型key的清理,仍然是以打一个
kTypeDelete
的tombstone。
Table Collector
TablePropertiesCollector 的主要作用是在构建SST过程中由用户来指定收集什么样的properties信息。
也能够针对当前sst文件触发一些行为,比如加速当前sst的compaction
Table Collector 使用
- Options 中的用法
opts.table_properties_collector_factories.emplace_back(
NewCompactOnDeletionCollectorFactory(kWindowSize, kNumDelsTrigger));
用户可以自定义多个collectors,只需要将自己定义的collectors工厂添加到table_properties_collector_factories数组中即可。
-
继承 table_properties_collector_factories 之后的自定义
如下CountingUserTblPropCollector 实现了一个统计key个数的功能,并编码到了SSTables的properties属性中。
class CountingUserTblPropCollector : public TablePropertiesCollector {
public:
const char* Name() const override { return "CountingUserTblPropCollector"; }
// 完成一个sstable的构建之后会调用 Finish函数,主要指定用户在构建完成之后的一些自定义行为。
Status Finish(UserCollectedProperties* properties) override {
std::string encoded;
PutVarint32(&encoded, count_);
*properties = UserCollectedProperties{
{"CountingUserTblPropCollector", message_}, {"Count", encoded},
};
return Status::OK();
}
// 用户根据传入的key指定自己的行为,这里是自增计数;这个函数也是一个主要函数,这里能够看到userkey。
Status AddUserKey(const Slice& /*user_key*/, const Slice& /*value*/,
EntryType /*type*/, SequenceNumber /*seq*/,
uint64_t /*file_size*/) override {
++count_;
return Status::OK();
}
UserCollectedProperties GetReadableProperties() const override {
return UserCollectedProperties{};
}
private:
std::string message_ = "Rocksdb";
uint32_t count_ = 0;
};
再构建一个Properties工厂,用来支持传入到option之中
class CountingUserTblPropCollectorFactory
: public TablePropertiesCollectorFactory {
public:
explicit CountingUserTblPropCollectorFactory(
uint32_t expected_column_family_id)
: expected_column_family_id_(expected_column_family_id),
num_created_(0) {}
// 必须实现
TablePropertiesCollector* CreateTablePropertiesCollector(
TablePropertiesCollectorFactory::Context context) override {
EXPECT_EQ(expected_column_family_id_, context.column_family_id);
num_created_++;
return new CountingUserTblPropCollector();
}
// 必须实现
const char* Name() const override {
return "CountingUserTblPropCollectorFactory";
}
void set_expected_column_family_id(uint32_t v) {
expected_column_family_id_ = v;
}
uint32_t expected_column_family_id_;
uint32_t num_created_;
};
更多的tablePropertisCollector的定义可以参考
table_properties.h
,更多的
properties_collector
相关的案例可以整个rocksdb项目 中搜索
public TablePropertiesCollector
,其中Rocksdb自己实现了针对delete 比例比较重的场景 加速这种sst 的comapction功能,可以参考Rocksdb 对Delete问题的优化
Table Collector 实现
引擎内部接受用户传入的TablePropertiesCollector 的过程如下调用栈:
CompactionJob::ProcessKeyValueCompaction // compaction的实际执行函数 -- 处理key-value
CompactionJob::OpenCompactionOutputFile // 创建compaction过程中的一些文件/文件buffer
TableBuilder* NewTableBuilder // 创建一种存储key-value数据类型的table
BlockBasedTableFactory::NewTableBuilder // 默认创建blockbasedtable
BlockBasedTableBuilder::BlockBasedTableBuilder // 构造函数
Rep // 构造rep,代表blockbasedtable 来添加数据
在构造
Rep
的过程中,会将用户传入的collector_factories 添加到rep维护的properties_colectors成员之中
for (auto& collector_factories : *int_tbl_prop_collector_factories) {
table_properties_collectors.emplace_back(
collector_factories->CreateIntTblPropCollector(column_family_id));
}
现在引擎的blockbasedtable的持有者已经拿到了用户自定义的collectors,之后要构造sstable的过程来执行
collectors
的一些行为,则会在如下调用栈中进行:
CompactionJob::ProcessKeyValueCompaction
CompactionJob::FinishCompactionOutputFile
BlockBasedTableBuilder::Add
NotifyCollectTableCollectorsOnAdd
NotifyCollectTableCollectorsOnAdd
这个函数中会拿着前面构造好的rep的
table_properties_collectors
成员
bool NotifyCollectTableCollectorsOnAdd(
const Slice& key, const Slice& value, uint64_t file_size,
const std::vector<std::unique_ptr<IntTblPropCollector>>& collectors,
Logger* info_log) {
bool all_succeeded = true;
// 遍历所有的collectors,每个都执行一次InternalAdd
for (auto& collector : collectors) {
Status s = collector->InternalAdd(key, value, file_size);
all_succeeded = all_succeeded && s.ok();
if (!s.ok()) {
LogPropertiesCollectionError(info_log, "Add" /* method */,
collector->Name());
}
}
return all_succeeded;
}
如果collector是用户创造的类型,则会统一进入
UserKeyTablePropertiesCollector
逻辑中,从而调用我们自己实现的
AddUserKey
函数。
Status UserKeyTablePropertiesCollector::InternalAdd(const Slice& key,
const Slice& value,
uint64_t file_size) {
ParsedInternalKey ikey;
Status s = ParseInternalKey(key, &ikey, false /* log_err_key */); // TODO
if (!s.ok()) {
return s;
}
return collector_->AddUserKey(ikey.user_key, value, GetEntryType(ikey.type),
ikey.sequence, file_size);
}
这样就能够在创建sstable的过程中完成我们自定义属性的更新。
如果在
CompactionJob::FinishCompactionOutputFile
中完成了key的添加,并构造完成了sstable,则会尝试获取
NeedCompact
标记,这个标记可以在我们自定义的collector中完成设置。
if (s.ok()) {
meta->fd.file_size = current_bytes;
meta->marked_for_compaction = sub_compact->builder->NeedCompact();
}
|
| 调用BlockBasedTableBuilder的NeedCompact
\ /
bool BlockBasedTableBuilder::NeedCompact() const {
// 从用户自定义行为中查看NeedCompact标记
for (const auto& collector : rep_->table_properties_collectors) {
if (collector->NeedCompact()) {
return true;
}
}
return false;
}
到此,我们就知道了TablePropertiesCollector的行为在引擎中的实现。
总结
1. Rocksdb ThreadLocal 机制 原理?为什么要有这个机制?其在读场景下有什么优势?
2. Rocksdb syncpoint 实现,基本用法
3. Rocksdb 事务琐实现:锁管理/加锁流程/死锁检测/空间消耗。
4. Rocksdb 乐观事务/悲观事务实现,基本用法,适用场景。
5. Rocksdb 压缩实现。
6. Rocksdb Pinnable 结合Cleanable 在iter中的作用 及 实现。