July 26, 2017

跳跃链表 Skip List

最近用到了redis的SortedSet数据结构来实现一个排行榜,比较好奇里面的数据结构是怎么样的,在网上查了一下得知是跳跃链表,于是乎自己实现了一番,然后简单分析了一下。

先祭出代码地址:https://github.com/ZhihaoJun/IndexableSkipList

朴素的跳跃链表能够非常高效地进行查找,但是不支持范围查询的需求,此时就需要进行一些小的改进,实现所谓的Indexable Skip List来支持范围查询。

结构

一句话描述就是在链表的基础之上加入了快速通道。如图所示,朴素跳跃链表在链表之上添加了多层的快速通道,在元素之间跳跃,位于上面层中的节点都保有一个指向正对底下节点的引用,而且跳跃链表中的元素排列都是有序的。

跳跃链表基本结构图

S表示哨兵节点,圆圈中的数字表示节点的键值。

Indexable Skip List则在朴素跳跃链表的节点之间加上了距离信息,如下图所示,用来表示中间跳跃了几步。

索引跳跃链表基本结构图

基本操作

在支持的操作上与集合数据结构保持一致:

  • set(key, value) 设置值,不存在就插入,存在就修改
  • remove(key) value 删除值,返回删除的值
  • find(key) value 根据key查询值
  • at(index) value 通过排序位置查找值
  • range(start, stop) [value] 通过排序位置范围查找一系列值

插入

插入就是要在某个高度下的所有层的合适位置都创建一个节点,并且维护节点距离值。

跳跃链表是一个随机数据结构的例子,其随机性就体现在插入操作上。SkipList是分为多层的,在每一次插入的时候,要先随机地决定插入高度,然后在不大于这个高度每层的合适位置插入一个节点。

插入演示

如图演示将要插入的节点18,插入高度是1,注意最底层高度是0

不同插入高度的随机方式可以生成很多不同结构和性质的跳跃链表,这里采用的是丢硬币的方式,硬币朝上的次数决定了插入高度,丢硬币的次数是当前跳跃链表的层数减一。默认的插入高度为0,这样保证了最底层一定会有新节点。当插入高度会覆盖所有的层的时候,就在当前的根节点上面插入一个新的哨兵节点,作为一层。

用python的实现进行描述如下:

def _flip_coin(self):
    return random.randint(0, 1)

def _insert_height(self):
    height = 0
    for i in xrange(self._layers-1):
        if self._flip_coin() == 1:
            height += 1
    return height

整个插入的过程,就是寻找该元素每层中合适位置的过程,这个过程有点像二叉搜索树的寻找过程:比当前节点大就往右,比当前节点小就往左。在跳跃链表中稍有不同,描述出来就是,右边节点键值比插入的键值小,就往右走,不能往右走了就是合适的位置,此时再到下一层去继续插入。

插入演示

用上面的图中蓝色路径就是搜寻路径。

  • 标记a处的下一个节点不是None,而且键值小于18向右走
  • 标记b处下一个节点的键值42大于18,此处是合适的插入位置,但是由于这一层高度为2,不进行插入,走到下一层
  • 标记c处下一个节点是23大于18,是合适的插入位置,此层高度为1,进行插入,并且走到下一层
  • 标记d处下一个节点是23大于18,是合适的插入位置,此层高度为0,进行插入,把上一层插入的节点18链接到新插入的节点18,并且走到下一层,此时cur已经为None,终止搜寻。

在这里既可以一直进行插入操作,实现能够插入多个相同键值的需求;也可以在右边键值与插入键值相等时候,进行修改操作,这样键值就唯一了。

后面的工作就是需要维护节点的距离值。当进行插入之后,可以发现距离值需要更新的节点都是向下拐的节点,如图中的标记b、c和d。

节点的距离值分为两种情况。

  • 如果这一层有插入新节点,则距离值=InsertedIdx-NodeIdx
  • 如果这一层没有插入新节点,则距离值=InsertedIdx-NodeIdx+1

在搜寻的过程中保存一个变量dis累加向右跨跃的距离,就很容易可以得出

  • 结束搜寻后InsertedIdx = dis + 1
  • 搜寻到某个节点之后NodeIdx = dis

在这里,哨兵节点的idx为0,因为在结束之后才能得出InsetedIdx,所以需要在搜寻过程中先保存需要维护的节点,搜寻结束之后再进行维护。

完整代码如下

def set(self, key, value):
  insert_height = self._insert_height()
  cur = self._head
  dis = 0
  if insert_height >= self._layers-1:
    # insert a sentinel head above
    new_head = SkipListNode(0, 0, self._layers, True)
    new_head.down = self._head
    self._layers += 1
    self._head = new_head

  last_inserted = None
  to_update = []
  to_update_idx = []
  to_update_inserted = []

  while 1:
    if cur.next is None or cur.next.key >= key:
      to_update.append(cur)
      to_update_idx.append(dis)
      to_update_inserted.append(False)

      if cur.next is not None and cur.next.key == key:
        # modify
        cur.next.value = value
      elif cur.height <= insert_height:
        # insert
        new_node = self._insert_here(cur, key, value)
        if last_inserted is not None:
          last_inserted.down = new_node
        last_inserted = new_node
        to_update_inserted[-1] = True

      cur = cur.down
    else: # cur.next.key < key
      dis += cur.dis()
      cur = cur.next

    if cur is None:
      break

  new_node_idx = dis + 1

  # update dis
  for i, n in enumerate(to_update):
    if to_update_inserted[i]:
      n.set_dis(new_node_idx - to_update_idx[i])
    else:
      n.set_dis(new_node_idx - to_update_idx[i] + 1)
    
  self._size += 1

删除

删除和插入操作是相反对立的操作,就是把跳跃链表中所有和键值相等的节点都移除掉,并且维护好距离值,它们在代码结构上非常的相似,也是通过搜寻来查找相应的节点并移除,这里需要注意总是拿当前节点(cur)的下一个节点(cur.next)键值和要删除的键值进行比较。

维护节点的距离值,也跟插入有相似的方法,如下图中同样所有需要维护的节点是向下走的节点,同时这些节点被分为两类:

  • 这一层中有节点被移除了,那么这个节点的距离值就应该是cur_dis + removed_dis-1,removed_dis表示被移除节点的距离值
  • 这一层中没有节点被移除,那么这个节点的距离值是cur_dis-1
def remove(self, key):
  cur = self._head
  to_update = []
  dis = 0

  while 1:
    if cur.next is None or cur.next.key >= key:
      to_update.append(cur)

      if cur.next is not None and cur.next.key == key:
        # remove
        removed = cur.next
        cur.next = removed.next
        removed.down = None
        cur.set_dis(cur.dis() + removed.dis() - 1)
      elif cur.next is not None:
        cur.set_dis(cur.dis() - 1)
        
      cur = cur.down
    else: # cur.next.key < key
      dis += cur.dis()
      cur = cur.next

    if cur is None:
      break

  self._size -= 1

查找

普通的查找就非常的简单,同样也是搜寻流程。

def find(self, key):
  cur = self._head
  while 1:
    if cur.is_sentinel():
      if cur.next is None or cur.next.key > key:
        cur = cur.down
      else:
        # ccondition is cur.next.key <= key
        cur = cur.next
    elif cur.key == key:
      return cur.value
    else:
      # there is no cur.key > key condition
      if cur.next is None or cur.next.key > key:
        cur = cur.down
      else:
        # ccondition is cur.next.key <= key
        cur = cur.next

    if cur is None:
      raise KeyError('key {} not found'.format(key))

查找还有一种需求,就是通过下标来进行查找。因为跳跃链表维护的节点都是有序的,所以也可以说是查找排在n位置的元素。

可以通过修改搜寻过程的条件来完成,一开始维护一个cur_idx表示当前节点的位置,如果右边节点不存在或者向右迈一步超过了位置n,就往下一层走,否则就向右并且增加cur_idx的值。

def _at(self, idx):
  if idx >= self._size:
    raise IndexError('index out of range')

  cur = self._head
  cur_idx = -1
  while 1:
    if cur.next is None or cur_idx + cur.dis() > idx:
      cur = cur.down
    else:
      cur_idx += cur.dis()
      cur = cur.next
      if cur_idx == idx:
        return cur
      
    if cur is None:
      break
  raise IndexError('index out of range')

def at(self, idx):
  return self._at(idx).value

范围查找

当实现了按照位置查找节点了之后,范围查找就非常的方便,首先通过已经实现好的_at函数找到范围起始的节点,然后“下沉”到链表的最低层,逐步返回右边的节点元素直到到了范围结尾。

def range(self, start_idx, end_idx):
  start_node = self._at(start_idx)
  dis = 0
  # sink down
  while start_node.down is not None:
    start_node = start_node.down

  cur = start_node
  result = []
  while cur is not None and dis <= end_idx - start_idx:
    result.append(cur.value)
    cur = cur.next
    dis += 1
  return result

后记

确实发现,无论是自己在实现还是把这个数据结构仔细的描述出来都是一件不容易的事情,想简洁又想通俗易懂真的有点难,希望上面的章节描述清楚了跳跃链表(捂脸。