這裡的comparator包括抽象類Comparator及其兩個實作類:一個是内置的BytewiseComparatorImpl,另一個是InternalKeyComparator。
一.Comparator
Comparator隻是導出了幾個接口。
class Comparator {
public:
virtual ~Comparator();
// Three-way comparison. Returns value:
// < 0 iff "a" < "b",
// == 0 iff "a" == "b",
// > 0 iff "a" > "b"
virtual int Compare(const Slice& a, const Slice& b) const = 0;
// The name of the comparator. Used to check for comparator
// mismatches (i.e., a DB created with one comparator is
// accessed using a different comparator.
//
// The client of this package should switch to a new name whenever
// the comparator implementation changes in a way that will cause
// the relative ordering of any two keys to change.
//
// Names starting with "leveldb." are reserved and should not be used
// by any clients of this package.
//傳回comparator的名字
virtual const char* Name() const = 0;
// Advanced functions: these are used to reduce the space requirements
// for internal data structures like index blocks.
// If *start < limit, changes *start to a short string in [start,limit).
// Simple comparator implementations may return with *start unchanged,
// i.e., an implementation of this method that does nothing is correct.
// 這兩個函數作用是減少像index blocks這樣的内部資料結構占用的空間。
// 如果*start < limit,就在[start,limit)中找到一個短字元串,并賦給*start傳回。
// 當然傳回的*start可能沒變化(*start==limit),此時這個函數相當于啥都沒幹,這也是正确的。
virtual void FindShortestSeparator(
std::string* start,
const Slice& limit) const = 0;
// Changes *key to a short string >= *key.
// Simple comparator implementations may return with *key unchanged,
// i.e., an implementation of this method that does nothing is correct.
// 将*key變成一個比原*key大的短字元串,并指派給*key傳回。
virtual void FindShortSuccessor(std::string* key) const = 0;
};
二.BytewiseComparatorImpl
BytewiseComparatorImpl是按字典逐位元組序進行比較的,也就是說i>helloworld,因為先比較i和h,i>h,比較結束。
class BytewiseComparatorImpl : public Comparator {
public:
BytewiseComparatorImpl() { }
virtual const char* Name() const {
return "leveldb.BytewiseComparator";
}
// 直接調用Slice的compare函數
virtual int Compare(const Slice& a, const Slice& b) const {
return a.compare(b);
}
// FindShortestSeparator找到start、limit之間最短的字元串,如“helloworld”和”hellozoomer”之間最短的key可以是”hellox”。
virtual void FindShortestSeparator(
std::string* start,
const Slice& limit) const {
// 找到共同字首的長度
size_t min_length = std::min(start->size(), limit.size());
size_t diff_index = 0;
while ((diff_index < min_length) &&
((*start)[diff_index] == limit[diff_index])) {
diff_index++;
}
// 如果一個字元串是另個一字元串的字首,無需做截短操作,否則進入else。
if (diff_index >= min_length) {
// Do not shorten if one string is a prefix of the other
} else {
uint8_t diff_byte = static_cast<uint8_t>((*start)[diff_index]);
if (diff_byte < static_cast<uint8_t>(0xff) &&
diff_byte + 1 < static_cast<uint8_t>(limit[diff_index])) {
(*start)[diff_index]++;
start->resize(diff_index + 1);
assert(Compare(*start, limit) < 0);
}
}
}
// FindShortSuccessor則更極端,用于找到比key大的最短字元串,如傳入“helloworld”,傳回的key可能是“i”而已。
virtual void FindShortSuccessor(std::string* key) const {
// Find first character that can be incremented
size_t n = key->size();
for (size_t i = 0; i < n; i++) {
const uint8_t byte = (*key)[i];
if (byte != static_cast<uint8_t>(0xff)) {
(*key)[i] = byte + 1;
key->resize(i+1);
return;
}
}
// *key is a run of 0xffs. Leave it alone.
}
};
三.Internal Key
1.Internal Key的結構。
下面是一個未編碼前,或者說是一個解碼後的Internal Key結構,它由user_key、sequence和type三個字段組成。
struct ParsedInternalKey {
Slice user_key;
SequenceNumber sequence;
ValueType type;
ParsedInternalKey() { } // Intentionally left uninitialized (for speed)
ParsedInternalKey(const Slice& u, const SequenceNumber& seq, ValueType t)
: user_key(u), sequence(seq), type(t) { }
std::string DebugString() const;
};
2.Internal Key的編碼
源碼中通過InternalKey類封裝了Internal Key的相關操作。編碼的用到的函數如下。
void AppendInternalKey(std::string* result, const ParsedInternalKey& key) {
result->append(key.user_key.data(), key.user_key.size());
PutFixed64(result, PackSequenceAndType(key.sequence, key.type));
}
static uint64_t PackSequenceAndType(uint64_t seq, ValueType t) {
assert(seq <= kMaxSequenceNumber);
assert(t <= kValueTypeForSeek);
return (seq << 8) | t;
}
AppendInternalKey函數先把user_key添加到*result中,然後用PackSequenceAndType函數将sequence和type打包,并将打包的結果添加到*result中。
PutFixed64函數隻是簡單的進行了拷貝,詳見部落格:LevelDB源碼分析之一:coding
PackSequenceAndType函數的原理是先将seq左移8位,然後和t進行或操作,相當于把t放到了seq的低8為。為什麼seq要小于等于kMaxSequenceNumber呢。
因為kMaxSequenceNumber的值如下所示。
typedef uint64_t SequenceNumber;
// We leave eight bits empty at the bottom so a type and sequence#
// can be packed together into 64-bits.
//0x1ull:u-unsigned 無符号;l-long 長整型,ll就是64位整型。整個0x1ull代表的含義是無符号64位整型常量1,用16進制表示。
static const SequenceNumber kMaxSequenceNumber = ((0x1ull << 56) - 1);
用二進制表示就是:0000 0000 1111 1111 1111 1111 1111 1111。如果seq大于kMaxSequenceNumber,左移8位的話會移出界。
3.Internal key的解碼
inline bool ParseInternalKey(const Slice& internal_key,
ParsedInternalKey* result) {
const size_t n = internal_key.size();
if (n < 8) return false;
// DecodeFixed64函數是PutFixed64函數的逆過程,傳回sequence和type的打包結果,并指派給num。
uint64_t num = DecodeFixed64(internal_key.data() + n - 8);
// 截取低8位,指派給c
unsigned char c = num & 0xff;
// 右移8位,指派給sequence
result->sequence = num >> 8;
// 将c轉換為type
result->type = static_cast<ValueType>(c);
// 取出user_key
result->user_key = Slice(internal_key.data(), n - 8);
return (c <= static_cast<unsigned char>(kTypeValue));
}
通過上述解碼函數可以将編碼後的internal_key解碼為* result。為了使用友善,源碼中還專門為解碼user_key和type提供了兩個函數。
// Returns the user key portion of an internal key.
inline Slice ExtractUserKey(const Slice& internal_key) {
assert(internal_key.size() >= 8);
return Slice(internal_key.data(), internal_key.size() - 8);
}
inline ValueType ExtractValueType(const Slice& internal_key) {
assert(internal_key.size() >= 8);
const size_t n = internal_key.size();
uint64_t num = DecodeFixed64(internal_key.data() + n - 8);
unsigned char c = num & 0xff;
return static_cast<ValueType>(c);
}
四.InternalKeyComparator
InternalKeyComparator是要重點關注的一個比較器,它用來比較内部鍵(Internal Key)。内部鍵值是為了友善處理,将原普通鍵、序列号和值類型組成的新鍵。
先看下Compare函數
int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const {
// Order by:
// increasing user key (according to user-supplied comparator)
// decreasing sequence number
// decreasing type (though sequence# should be enough to disambiguate)
int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey));
if (r == 0) {
const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8);
const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8);
if (anum > bnum) {
r = -1;
} else if (anum < bnum) {
r = +1;
}
}
return r;
}
1)首先比較user_key,如果user_key不相同,就直接傳回比較結果,否則繼續進行第二步。user_comparator_是使用者指定的比較器,在InternalKeyComparator構造時傳入。
2)在user_key相同的情況下,比較sequence_numer|value type然後傳回結果(注意每個Internal Key的sequence_number是唯一的,是以不可能出現anum==bnum的情況)
再看看FindShortestSeparator函數
void InternalKeyComparator::FindShortestSeparator(
std::string* start,
const Slice& limit) const {
// Attempt to shorten the user portion of the key
Slice user_start = ExtractUserKey(*start);
Slice user_limit = ExtractUserKey(limit);
std::string tmp(user_start.data(), user_start.size());
user_comparator_->FindShortestSeparator(&tmp, user_limit);
if (tmp.size() < user_start.size() &&
user_comparator_->Compare(user_start, tmp) < 0) {
// User key has become shorter physically, but larger logically.
// Tack on the earliest possible number to the shortened user key.
PutFixed64(&tmp, PackSequenceAndType(kMaxSequenceNumber,kValueTypeForSeek));
assert(this->Compare(*start, tmp) < 0);
assert(this->Compare(tmp, limit) < 0);
start->swap(tmp);
}
}
1)該函數取出Internal Key中的user_key字段,根據使用者指定的comparator找到短字元串并替換user_start。此時user_start實體上是變短了,但是邏輯上卻變大了,原理詳見第二節的論述。
2)如果user_start被替換了,就用新的user_start更新Internal Key,并使用最大的sequence number。否則start保持不變。
接下來FindShortSuccessor函數
void InternalKeyComparator::FindShortSuccessor(std::string* key) const {
Slice user_key = ExtractUserKey(*key);
std::string tmp(user_key.data(), user_key.size());
user_comparator_->FindShortSuccessor(&tmp);
if (tmp.size() < user_key.size() &&
user_comparator_->Compare(user_key, tmp) < 0) {
// User key has become shorter physically, but larger logically.
// Tack on the earliest possible number to the shortened user key.
PutFixed64(&tmp, PackSequenceAndType(kMaxSequenceNumber,kValueTypeForSeek));
assert(this->Compare(*key, tmp) < 0);
key->swap(tmp);
}
}
原理上與FindShortestSeparator相同。
參考連結:http://www.jianshu.com/p/42c0e90d8190