天天看點

LevelDB源碼分析之二:comparator

這裡的comparator包括抽象類Comparator及其兩個實作類:一個是内置的BytewiseComparatorImpl,另一個是InternalKeyComparator。

一.Comparator

Comparator隻是導出了幾個接口。

class Comparator {
 public:
  virtual ~Comparator();


  // Three-way comparison.  Returns value:
  //   < 0 iff "a" < "b",
  //   == 0 iff "a" == "b",
  //   > 0 iff "a" > "b"
  virtual int Compare(const Slice& a, const Slice& b) const = 0;


  // The name of the comparator.  Used to check for comparator
  // mismatches (i.e., a DB created with one comparator is
  // accessed using a different comparator.
  //
  // The client of this package should switch to a new name whenever
  // the comparator implementation changes in a way that will cause
  // the relative ordering of any two keys to change.
  //
  // Names starting with "leveldb." are reserved and should not be used
  // by any clients of this package.
  //傳回comparator的名字
  virtual const char* Name() const = 0;


  // Advanced functions: these are used to reduce the space requirements
  // for internal data structures like index blocks.


  // If *start < limit, changes *start to a short string in [start,limit).
  // Simple comparator implementations may return with *start unchanged,
  // i.e., an implementation of this method that does nothing is correct.
  // 這兩個函數作用是減少像index blocks這樣的内部資料結構占用的空間。
  // 如果*start < limit,就在[start,limit)中找到一個短字元串,并賦給*start傳回。
  // 當然傳回的*start可能沒變化(*start==limit),此時這個函數相當于啥都沒幹,這也是正确的。
  virtual void FindShortestSeparator(
      std::string* start,
      const Slice& limit) const = 0;


  // Changes *key to a short string >= *key.
  // Simple comparator implementations may return with *key unchanged,
  // i.e., an implementation of this method that does nothing is correct.
  // 将*key變成一個比原*key大的短字元串,并指派給*key傳回。
  virtual void FindShortSuccessor(std::string* key) const = 0;
};
           

二.BytewiseComparatorImpl

BytewiseComparatorImpl是按字典逐位元組序進行比較的,也就是說i>helloworld,因為先比較i和h,i>h,比較結束。

class BytewiseComparatorImpl : public Comparator {
 public:
  BytewiseComparatorImpl() { }


  virtual const char* Name() const {
    return "leveldb.BytewiseComparator";
  }
  // 直接調用Slice的compare函數
  virtual int Compare(const Slice& a, const Slice& b) const {
    return a.compare(b);
  }
  // FindShortestSeparator找到start、limit之間最短的字元串,如“helloworld”和”hellozoomer”之間最短的key可以是”hellox”。
  virtual void FindShortestSeparator(
      std::string* start,
      const Slice& limit) const {
    // 找到共同字首的長度
    size_t min_length = std::min(start->size(), limit.size());
    size_t diff_index = 0;
    while ((diff_index < min_length) &&
           ((*start)[diff_index] == limit[diff_index])) {
      diff_index++;
    }
    // 如果一個字元串是另個一字元串的字首,無需做截短操作,否則進入else。
    if (diff_index >= min_length) {
      // Do not shorten if one string is a prefix of the other
    } else {
      uint8_t diff_byte = static_cast<uint8_t>((*start)[diff_index]);
      if (diff_byte < static_cast<uint8_t>(0xff) &&
          diff_byte + 1 < static_cast<uint8_t>(limit[diff_index])) {
        (*start)[diff_index]++;
        start->resize(diff_index + 1);
        assert(Compare(*start, limit) < 0);
      }
    }
  }
  // FindShortSuccessor則更極端,用于找到比key大的最短字元串,如傳入“helloworld”,傳回的key可能是“i”而已。
  virtual void FindShortSuccessor(std::string* key) const {
    // Find first character that can be incremented
    size_t n = key->size();
    for (size_t i = 0; i < n; i++) {
      const uint8_t byte = (*key)[i];
      if (byte != static_cast<uint8_t>(0xff)) {
        (*key)[i] = byte + 1;
        key->resize(i+1);
        return;
      }
    }
    // *key is a run of 0xffs.  Leave it alone.
  }
};
           

三.Internal Key

1.Internal Key的結構。

下面是一個未編碼前,或者說是一個解碼後的Internal Key結構,它由user_key、sequence和type三個字段組成。

struct ParsedInternalKey {
  Slice user_key;
  SequenceNumber sequence;
  ValueType type;


  ParsedInternalKey() { }  // Intentionally left uninitialized (for speed)
  ParsedInternalKey(const Slice& u, const SequenceNumber& seq, ValueType t)
      : user_key(u), sequence(seq), type(t) { }
  std::string DebugString() const;
};
           

2.Internal Key的編碼

源碼中通過InternalKey類封裝了Internal Key的相關操作。編碼的用到的函數如下。

void AppendInternalKey(std::string* result, const ParsedInternalKey& key) {
  result->append(key.user_key.data(), key.user_key.size());
  PutFixed64(result, PackSequenceAndType(key.sequence, key.type));
}

static uint64_t PackSequenceAndType(uint64_t seq, ValueType t) {
  assert(seq <= kMaxSequenceNumber);
  assert(t <= kValueTypeForSeek);
  return (seq << 8) | t;
}
           

AppendInternalKey函數先把user_key添加到*result中,然後用PackSequenceAndType函數将sequence和type打包,并将打包的結果添加到*result中。

PutFixed64函數隻是簡單的進行了拷貝,詳見部落格:LevelDB源碼分析之一:coding

PackSequenceAndType函數的原理是先将seq左移8位,然後和t進行或操作,相當于把t放到了seq的低8為。為什麼seq要小于等于kMaxSequenceNumber呢。

因為kMaxSequenceNumber的值如下所示。

typedef uint64_t SequenceNumber;
// We leave eight bits empty at the bottom so a type and sequence#
// can be packed together into 64-bits.
//0x1ull:u-unsigned 無符号;l-long 長整型,ll就是64位整型。整個0x1ull代表的含義是無符号64位整型常量1,用16進制表示。
static const SequenceNumber kMaxSequenceNumber = ((0x1ull << 56) - 1);
           

用二進制表示就是:0000 0000 1111 1111 1111 1111 1111 1111。如果seq大于kMaxSequenceNumber,左移8位的話會移出界。

3.Internal key的解碼

inline bool ParseInternalKey(const Slice& internal_key,
                             ParsedInternalKey* result) {
  const size_t n = internal_key.size();
  if (n < 8) return false;
  // DecodeFixed64函數是PutFixed64函數的逆過程,傳回sequence和type的打包結果,并指派給num。
  uint64_t num = DecodeFixed64(internal_key.data() + n - 8);
  // 截取低8位,指派給c
  unsigned char c = num & 0xff;
  // 右移8位,指派給sequence
  result->sequence = num >> 8;
  // 将c轉換為type
  result->type = static_cast<ValueType>(c);
  // 取出user_key
  result->user_key = Slice(internal_key.data(), n - 8);
  return (c <= static_cast<unsigned char>(kTypeValue));
}
           

通過上述解碼函數可以将編碼後的internal_key解碼為* result。為了使用友善,源碼中還專門為解碼user_key和type提供了兩個函數。

// Returns the user key portion of an internal key.
inline Slice ExtractUserKey(const Slice& internal_key) {
  assert(internal_key.size() >= 8);
  return Slice(internal_key.data(), internal_key.size() - 8);
}

inline ValueType ExtractValueType(const Slice& internal_key) {
  assert(internal_key.size() >= 8);
  const size_t n = internal_key.size();
  uint64_t num = DecodeFixed64(internal_key.data() + n - 8);
  unsigned char c = num & 0xff;
  return static_cast<ValueType>(c);
}
           

四.InternalKeyComparator

InternalKeyComparator是要重點關注的一個比較器,它用來比較内部鍵(Internal Key)。内部鍵值是為了友善處理,将原普通鍵、序列号和值類型組成的新鍵。

先看下Compare函數

int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const {
  // Order by:
  //    increasing user key (according to user-supplied comparator)
  //    decreasing sequence number
  //    decreasing type (though sequence# should be enough to disambiguate)
  int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey));
  if (r == 0) {
    const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8);
    const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8);
    if (anum > bnum) {
      r = -1;
    } else if (anum < bnum) {
      r = +1;
    }
  }
  return r;
}
           

1)首先比較user_key,如果user_key不相同,就直接傳回比較結果,否則繼續進行第二步。user_comparator_是使用者指定的比較器,在InternalKeyComparator構造時傳入。

2)在user_key相同的情況下,比較sequence_numer|value type然後傳回結果(注意每個Internal Key的sequence_number是唯一的,是以不可能出現anum==bnum的情況)

再看看FindShortestSeparator函數

void InternalKeyComparator::FindShortestSeparator(
      std::string* start,
      const Slice& limit) const {
  // Attempt to shorten the user portion of the key
  Slice user_start = ExtractUserKey(*start);
  Slice user_limit = ExtractUserKey(limit);
  std::string tmp(user_start.data(), user_start.size());
  user_comparator_->FindShortestSeparator(&tmp, user_limit);
  if (tmp.size() < user_start.size() &&
      user_comparator_->Compare(user_start, tmp) < 0) {
    // User key has become shorter physically, but larger logically.
    // Tack on the earliest possible number to the shortened user key.
    PutFixed64(&tmp, PackSequenceAndType(kMaxSequenceNumber,kValueTypeForSeek));
    assert(this->Compare(*start, tmp) < 0);
    assert(this->Compare(tmp, limit) < 0);
    start->swap(tmp);
  }
}
           

1)該函數取出Internal Key中的user_key字段,根據使用者指定的comparator找到短字元串并替換user_start。此時user_start實體上是變短了,但是邏輯上卻變大了,原理詳見第二節的論述。

2)如果user_start被替換了,就用新的user_start更新Internal Key,并使用最大的sequence number。否則start保持不變。

接下來FindShortSuccessor函數

void InternalKeyComparator::FindShortSuccessor(std::string* key) const {
  Slice user_key = ExtractUserKey(*key);
  std::string tmp(user_key.data(), user_key.size());
  user_comparator_->FindShortSuccessor(&tmp);
  if (tmp.size() < user_key.size() &&
      user_comparator_->Compare(user_key, tmp) < 0) {
    // User key has become shorter physically, but larger logically.
    // Tack on the earliest possible number to the shortened user key.
    PutFixed64(&tmp, PackSequenceAndType(kMaxSequenceNumber,kValueTypeForSeek));
    assert(this->Compare(*key, tmp) < 0);
    key->swap(tmp);
  }
}
           

原理上與FindShortestSeparator相同。

參考連結:http://www.jianshu.com/p/42c0e90d8190