# /

# 跳表的结构

image.png
跳表B+树都是存放索引的数据结构,两者有相似之处,可以把跳表也视为分块树:

  • B+树的块是固定的,跳表的块是随机的。
  • B+树只有3层,跳表有32层。
  • B+树是从上往下形成,跳表是从下往上形成。

# Redis跳表

typedef struct zskiplistNode { //表示一个节点
    sds ele;       //value
    double score;  //分数
    struct zskiplistNode *backward;
    struct zskiplistLevel {           //层链表
        struct zskiplistNode *forward;//当前层的下一个节点
        unsigned long span;           //与下一个节点的跨度
    } level[];  //层数组
} zskiplistNode;

typedef struct zskiplist {  //表示一个跳表
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;

typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/* Struct to hold an inclusive/exclusive range spec by score comparison. */
typedef struct {
    double min, max;
    int minex, maxex; /* are min or max exclusive? */
} zrangespec;
1
2
3
4
5
zrange key min max [BYSCORE|BYLEX] [REV] [LIMIT offset count] [WITHSCORES]
1
# 根据rank获取node

每个node都有score、span。

注意:score并不能表示node的真实位置rank,但是span可以。这也是redis跳表的特殊之处。

zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
    zskiplistNode *x;           //当前节点
    unsigned long traversed = 0;//当前节点rank
    int i;

    x = zsl->header; //从头节点开始遍历
    for (i = zsl->level-1; i >= 0; i--) { //从最上层开始遍历
        while (x->level[i].forward && (traversed + x->level[i].span) <= rank) //下一节点非空 && 下一节点的rank<rank
        {
            traversed += x->level[i].span;//累计当前节点的rank
            x = x->level[i].forward;
        }
        if (traversed == rank) {//当前节点rank=rank,返回当前节点
            return x;
        }
    }
    return NULL;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 根据节点(score与ele)获取rank
unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *x;       //当前节点
    unsigned long rank = 0; //当前节点rank
    int i;

    x = zsl->header; //从头节点开始遍历
    for (i = zsl->level-1; i >= 0; i--) { //从最上层开始遍历
        while (x->level[i].forward &&
            (x->level[i].forward->score < score || //下一节点的score<score
                (x->level[i].forward->score == score && 
                sdscmp(x->level[i].forward->ele,ele) <= 0))) { //下一节点的score=score && 下一节点的ele不=ele
            rank += x->level[i].span; //累计当前节点rank
            x = x->level[i].forward;
        }

        /* x might be equal to zsl->header, so test if obj is non-NULL */
        if (x->ele && sdscmp(x->ele,ele) == 0) { //当前节点的ele=ele,返回当前rank
            return rank;
        }
    }
    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 插入节点(score与ele)

跳表的理想结构是:每隔一个节点生成一个层级节点;模拟二又树结构,以此达到搜索时间复杂度为O(log2n)。但是如果删除节点,可能会破坏理想结构,需要重构理想结构,需要大量运算。于是用概率的方式构造理想结构:每增加一个节点有1/2的概率增加一个层级、1/4的概率增加两个层级、1/8的概率增加三个层级。。当节点数量大于256时,通过概率构造的跳表接近理想结构。这样删除节点,只需从链表中去除即可,无需重构跳表。
Reids的跳表也是采用概率的方式构造,产生层级概率是1/4。当有序集合zset的节点数大于128(或有一个字符串长度大于64)时,使用跳表结构。

注意:插入一个节点时,如果随机层级很大,则每一层都要插入。

zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    serverAssert(!isnan(score));
    x = zsl->header; //从头节点开始遍历
    for (i = zsl->level-1; i >= 0; i--) { //从最上层开始遍历
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1]; //i层的rank = i+1层的rank
        while (x->level[i].forward &&
                (x->level[i].forward->score < score || //i层的下一节点score<插入节点score
                    (x->level[i].forward->score == score &&
                    sdscmp(x->level[i].forward->ele,ele) < 0))) //i层的下一节点score=插入节点score && i层的ele不=ele
        {
            rank[i] += x->level[i].span; //累计i层的rank
            x = x->level[i].forward;
        }
        update[i] = x; //说明i层不包含'插入节点',x的下一节点是'插入节点'的存放位置,注意:用update指向这里的x
    }
    /* we assume the element is not already inside, since we allow duplicated
     * scores, reinserting the same element should never happen since the
     * caller of zslInsert() should test in the hash table if the element is
     * already inside or not. */
    level = zslRandomLevel();
    if (level > zsl->level) { //如果随机层数>跳表层数,则添加n层
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    x = zslCreateNode(level,score,ele);              //把'插入节点'构造为节点x(注意:这里的x不是上面的x)
    for (i = 0; i < level; i++) {                    //从最下层开始遍历直到最上层,逐层插入'插入节点'
        x->level[i].forward = update[i]->level[i].forward; //在update[i]后面插入x(即:在上面的x后面插入'插入节点')
        update[i]->level[i].forward = x;                   //

        /* update span covered by update[i] as x is inserted here */
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]); //span
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;                //span
    }

    /* increment span for untouched levels */
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;              //span
    }

    x->backward = (update[0] == zsl->header) ? NULL : update[0]; //backward
    if (x->level[0].forward)
        x->level[0].forward->backward = x;                       //backward
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# 删除节点(score与ele)
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                     sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* We may have multiple elements with the same score, what we need
     * is to find the element with both the right score and object. */
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
        zslDeleteNode(zsl, x, update);
        if (!node)
            zslFreeNode(x);
        else
            *node = x;
        return 1;
    }
    return 0; /* not found */
}

void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
    int i;
    for (i = 0; i < zsl->level; i++) {
        if (update[i]->level[i].forward == x) {
            update[i]->level[i].span += x->level[i].span - 1;
            update[i]->level[i].forward = x->level[i].forward;
        } else {
            update[i]->level[i].span -= 1;
        }
    }
    if (x->level[0].forward) {
        x->level[0].forward->backward = x->backward;
    } else {
        zsl->tail = x->backward;
    }
    while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
        zsl->level--;
    zsl->length--;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48