Leveldb之LRUCache实现

leveldb代码读完也有好一阵子了,其实早想好好的写博客从头到尾系统总结一遍,不过它真的是写的太好了,任何一个地方单拿出来都可以写好多,想向TCMalloc那样自下而上写一个系列的话我估计写30篇都写不完…所以就挑着一些主题写写吧,有些是和leveldb强相关的比如迭代器,compact的实现,有些是相对独立、可以用来学习编程的比如LRUCache、SkipList的实现,今天就先说说LRUCache的实现吧

介绍

leveldb在Table和VersionSet的实现中用到了LRUCache,分别给Block和Table加了缓存,加快访问速度,并且当缓存满了淘汰最近最少使用的那个。这篇先不管什么是Block什么是Table,就单纯的来看看levedb的LRUCache是怎么实现的

原理

LRUCache想必都不陌生,就是一个缓存,不过既然是缓存肯定容量有限,当缓存满了后,需要淘汰一个最近最少使用的item。怎么实现呢?也不难,无非是一个HashTable配合一个List就可以了,HashTable用来存item,List按每个item最近使用的先后顺序将所有item排序,tail的item是最近使用的,head的item是最久没使用的,要淘汰就从head开始,当用户访问完某个item,就将该item从List中拿出来再插到最尾,这样就是一个LRUCache的实现啦

实现

那么leveldb是怎么实现的呢?其实和上面说的差不多,不过具体实现中还是可以看出大牛的功底,一点一点来看,先说说item的实现,在leveldb中,HashTable中保存的item叫做LRUHandle,定义如下:

struct LRUHandle {
  void* value;
  void (*deleter)(const Slice&, void* value); // 清理回调函数,用来在释放时做资源回收
  LRUHandle* next_hash; // hash冲突时线性探测
  LRUHandle* next; // List的next
  LRUHandle* prev; // List的prev
  size_t charge;      // TODO(opt): Only allow uint32_t?
  size_t key_length; //key的长度
  uint32_t refs; // 引用计数
  uint32_t hash;      // Hash of key(); used for fast sharding and comparisons
  char key_data[1];   // Beginning of key

  Slice key() const {
    // For cheaper lookups, we allow a temporary Handle object
    // to store a pointer to a key in "value".
    if (next == this) {
      return *(reinterpret_cast<Slice*>(value));
    } else {
      return Slice(key_data, key_length);
    }
  }
};

next_hash是在hash冲突时线性探测后续LRUHandle用到的,next和prev是记录List的节点信息,我们发现leveldb在具体实现中,将上一节说到的HashTable中的item和List中的节点揉在了一起,这样HashTable中的每个元素不仅记录着冲突hash值的下一个元素,还记录着LRU顺序

LRUHandle定义完了,接下来就是HashTable的实现了,在levledb叫HandleTable,代码如下:

class HandleTable {
 public:
  HandleTable() : length_(0), elems_(0), list_(NULL) { Resize(); }
  ~HandleTable() { delete[] list_; }

  LRUHandle* Lookup(const Slice& key, uint32_t hash) {
    return *FindPointer(key, hash);
  }

  LRUHandle* Insert(LRUHandle* h) {
    LRUHandle** ptr = FindPointer(h->key(), h->hash); // 先在当前HandleTable中
    //查找有没有包含h->key的LRUHandle,如果有的话则返回保存该LRUHandle地址的内
    //存地址,也就是该LRUHandle的前一个LRUHandle的next字段地址
    LRUHandle* old = *ptr; //对上一步返回的ptr解引用得到对应LRUHandle地址,也
    //就是之前存在的LRUHandle地址,赋值给old
    h->next_hash = (old == NULL ? NULL : old->next_hash); // 将h的next_hash赋
    //值为old的next_hash
    *ptr = h; //将ptr的值也就是对应LRUHandle前一个LRUHandle的next字段赋值为h,
    //这样就将h插入到了桶中,old移出桶
    if (old == NULL) {
      ++elems_;
      if (elems_ > length_) {
        //当前存的元素个数大于桶的个数,扩容
        // Since each cache entry is fairly large, we aim for a small
        // average linked list length (<= 1).
        Resize();
      }
    }
    return old;
  }

  LRUHandle* Remove(const Slice& key, uint32_t hash) {
    LRUHandle** ptr = FindPointer(key, hash);
    LRUHandle* result = *ptr;
    if (result != NULL) {
      *ptr = result->next_hash;
      --elems_;
    }
    return result;
  }
  
  private:
  // The table consists of an array of buckets where each bucket is
  // a linked list of cache entries that hash into the bucket.
  uint32_t length_; // hash桶数
  uint32_t elems_; // 当前有多少个元素
  LRUHandle** list_; // 桶数组

  // Return a pointer to slot that points to a cache entry that
  // matches key/hash.  If there is no such cache entry, return a
  // pointer to the trailing slot in the corresponding linked list.
  // 通过hash值在对应桶中查找key,注意:如果存在则返回保存改LRUHandle地址的
  //内存地址,不存在则返回空
  LRUHandle** FindPointer(const Slice& key, uint32_t hash) {
    LRUHandle** ptr = &list_[hash & (length_ - 1)];
    while (*ptr != NULL &&
           ((*ptr)->hash != hash || key != (*ptr)->key())) {
      ptr = &(*ptr)->next_hash;
    }
    return ptr;
  }
  // 扩容
  void Resize() {
    uint32_t new_length = 4;
    while (new_length < elems_) {
      new_length *= 2;
    }
    LRUHandle** new_list = new LRUHandle*[new_length];
    memset(new_list, 0, sizeof(new_list[0]) * new_length);
    uint32_t count = 0;
    for (uint32_t i = 0; i < length_; i++) {
      LRUHandle* h = list_[i];
      while (h != NULL) {
        LRUHandle* next = h->next_hash;
        uint32_t hash = h->hash;
        LRUHandle** ptr = &new_list[hash & (new_length - 1)];
        h->next_hash = *ptr;
        *ptr = h;
        h = next;
        count++;
      }
    }
    assert(elems_ == count);
    delete[] list_;
    list_ = new_list;
    length_ = new_length;
  }
};

实现不难,不过有一点值得特殊说一下,自己理解的单向链表的Insert操作一般可以分成两步,第一步是FindPoint,来找到要插入的位置cur,第二步是在FindPoint返回的位置cur之前插入一个新的元素,这里似乎有点问题,因为一般链表的插入需要拿到cur的前一个元素并修改其next指针,但FindPoint只返回的cur,并没有返回cur的前一个元素地址,难道要返回两个指针吗?其实不用,leveldb是这么做的,FindPoint返回的并不是要插入位置元素的地址,而是保存改地址内存的地址,也就是cur前一个元素next字段的地址ptr,这样就一举两得了,通过*ptr可以找到cur,通过修改ptr->next可以直接修改前一个元素的next字段,很巧妙

好了,HandleTable实现完了,现在就可以实现LRUCache了,代码如下:

// A single shard of sharded cache.
class LRUCache {
 public:
  LRUCache();
  ~LRUCache();

  // Separate from constructor so caller can easily make an array of LRUCache
  void SetCapacity(size_t capacity) { capacity_ = capacity; }

  // Like Cache methods, but with an extra "hash" parameter.
  Cache::Handle* Insert(const Slice& key, uint32_t hash,
                        void* value, size_t charge,
                        void (*deleter)(const Slice& key, void* value));
  Cache::Handle* Lookup(const Slice& key, uint32_t hash);
  void Release(Cache::Handle* handle);
  void Erase(const Slice& key, uint32_t hash);

 private:
  void LRU_Remove(LRUHandle* e); //修改e的next和prev指针,从lru_中移除
  void LRU_Append(LRUHandle* e); //修改e的next和prev指针,插入lru_中
  void Unref(LRUHandle* e); //减少e的引用计数,如果为0就释放e

  // Initialized before use.
  size_t capacity_;

  // mutex_ protects the following state.
  port::Mutex mutex_;
  size_t usage_;

  // Dummy head of LRU list.
  // lru.prev is newest entry, lru.next is oldest entry.
  LRUHandle lru_; //lru list的head节点,今后prev指向最新节点,next指向
  //最老节点,每次淘汰next,初始都指向自己

  HandleTable table_; //包含一个HandleTable
};

void LRUCache::LRU_Remove(LRUHandle* e) {
  e->next->prev = e->prev;
  e->prev->next = e->next;
}

void LRUCache::LRU_Append(LRUHandle* e) {
  // Make "e" newest entry by inserting just before lru_
  e->next = &lru_;
  e->prev = lru_.prev;
  e->prev->next = e;
  e->next->prev = e;
}

Cache::Handle* LRUCache::Lookup(const Slice& key, uint32_t hash) {
  MutexLock l(&mutex_);
  //从HandleTable中查找是否存在key的LRUHandler
  LRUHandle* e = table_.Lookup(key, hash);
  if (e != NULL) {
    //因为要返回给用户,所以引用数+1
    e->refs++;
    //从lru_中移除
    LRU_Remove(e);
    //插入到lru_的prev位置,这样e的淘汰优先级就最低
    LRU_Append(e);
  }
  return reinterpret_cast<Cache::Handle*>(e);
}

void LRUCache::Release(Cache::Handle* handle) {
  MutexLock l(&mutex_);
  Unref(reinterpret_cast<LRUHandle*>(handle));
}

实现的很清晰,重点再看一下Insert:

Cache::Handle* LRUCache::Insert(
    const Slice& key, uint32_t hash, void* value, size_t charge,
    void (*deleter)(const Slice& key, void* value)) {
  MutexLock l(&mutex_);
  
  //LRUHandle中key_data是最后一个元素,定义是char[1],这里有一个技巧,就是
  //定义中虽然key_data长度为1个字节,但在实际生成时,malloc的空间大小是
  //sizeof(LRUHandle)-1 + key.size(),刚好key_data是最后一个元素,这样从它
  //开始以后的空间是实际存放key的位置,对于结构体中变长字符数组的定义可以
  //学习这样的方式,很不错
  LRUHandle* e = reinterpret_cast<LRUHandle*>(
      malloc(sizeof(LRUHandle)-1 + key.size()));
  e->value = value;
  e->deleter = deleter;
  e->charge = charge;
  e->key_length = key.size();
  e->hash = hash;
  e->refs = 2;  // One from LRUCache, one for the returned handle,引用计数
  //初始为2,一个是LRUCache在用,一个是返回给用户
  memcpy(e->key_data, key.data(), key.size());
  //插入到lru_的prev位置,最低淘汰优先级
  LRU_Append(e);
  usage_ += charge;

  LRUHandle* old = table_.Insert(e); //将e插入到HandleTable中,返回的是存放
  //该key老值的LRUHandle指针
  if (old != NULL) {
    //老的LRUHandle已经没用,从lru_中移除
    LRU_Remove(old);
    //减少引用,为0时释放
    Unref(old);
  }
  //如果已经存满,则淘汰最近最少使用的LRUHandle,也就是lur_的next所指
  while (usage_ > capacity_ && lru_.next != &lru_) {
    LRUHandle* old = lru_.next;
    LRU_Remove(old); //从lru_中移除
    table_.Remove(old->key(), old->hash); //从HandleTable中移除
    Unref(old); //减少引用
  }

  return reinterpret_cast<Cache::Handle*>(e);
}

关键位置已经注释了,不难,至此,LRUCache已经实现差不多了,这时候已经可以结束了,不过leveldb在上面又封了一层ShardedLRUCache,其实就是包了多了LRUCache,这样每个key会定位两次,第一次是用hash值的高4位做shard,算出具体该访问哪个LRUCache,第二次同样的hash值访问LRUCache的某个桶,就是一层封装,不难,直接看实现:

static const int kNumShardBits = 4;
static const int kNumShards = 1 << kNumShardBits;

class ShardedLRUCache : public Cache {
 private:
  LRUCache shard_[kNumShards]; // 16个LRUCache
  port::Mutex id_mutex_;
  uint64_t last_id_;

  static inline uint32_t HashSlice(const Slice& s) {
    return Hash(s.data(), s.size(), 0);
  }

  static uint32_t Shard(uint32_t hash) {
    //hash值的高4位做shard
    return hash >> (32 - kNumShardBits);
  }

 public:
  explicit ShardedLRUCache(size_t capacity)
      : last_id_(0) {
    const size_t per_shard = (capacity + (kNumShards - 1)) / kNumShards;
    for (int s = 0; s < kNumShards; s++) {
      shard_[s].SetCapacity(per_shard);
    }
  }
  virtual ~ShardedLRUCache() { }
  virtual Handle* Insert(const Slice& key, void* value, size_t charge,
                         void (*deleter)(const Slice& key, void* value)) {
    const uint32_t hash = HashSlice(key); //求hash
    return shard_[Shard(hash)].Insert(key, hash, value, charge, deleter); //插入
    //到对应的LURCache中
  }
  virtual Handle* Lookup(const Slice& key) {
    const uint32_t hash = HashSlice(key); //求hash
    return shard_[Shard(hash)].Lookup(key, hash); //在对应LRUCache中查找
  }
  virtual void Release(Handle* handle) {
    LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
    shard_[Shard(h->hash)].Release(handle);
  }
  virtual void Erase(const Slice& key) {
    const uint32_t hash = HashSlice(key);
    shard_[Shard(hash)].Erase(key, hash);
  }
  virtual void* Value(Handle* handle) {
    return reinterpret_cast<LRUHandle*>(handle)->value;
  }
  virtual uint64_t NewId() { //这个是leveldb给block_cache用的,这里可以不用关心
    MutexLock l(&id_mutex_);
    return ++(last_id_);
  }

总结

LRUCache看似简单,实现起来的方式确很多,本篇介绍了leveldb关于LRUCache的实现,还是非常经典的,值得学习,另外levedb主题的博客我会持续写下去,不过内容真的太多了,下篇先写写迭代器相关的东西吧^^