redis源码学习-跳表篇

跳表介绍

跳表全称跳跃列表,是一个经过优化过的有序链表。通过将一条链表构造成多层次结构,将原先O(n)的查找效率提升到O(log(n))。而跳跃一词体现在,在每一层经过尽量少的查找,跳跃到相对应的下一层,实现O(n)->O(log(n))的效率提升。
跳表图示如下:

跳表中有两个重要概念,
步长:
比如一个简单的有序数组1,2,3,4,5,6 。从1到2的步长为1,从1到3的步长为2
层:
单纯的有序链表为一条线,而为了用一定的空间提前获取较粗粒度的索引,根据粒度的粗细新建一层层的顺序关系。
这两个概念结合如上图所示,最下的原型代表着从1,2,3,4,…,n的有序数组。假设这个数组为第0层,那么按照步长为2的逻辑新建一层,在1节点多维护一个next指针,指向下下个节点,那么这一层称作L1。那么在L1层,可以看作是1,3,5,7,9,…,n这样的有序数组。在L1层的基础上,再按照第0层中步长为4的逻辑新建L2层,那么L2层中可以看作是1,5,9,…,n的有序数组,以此类推,建很多层。
如图的结构,只有L3层。那么查找时,假如想知道6在数组哪个位置,只需判断L3层,断定6在1-9中,判断了一次。再看L2层,判断两次发现6在5-9中。在L1层判断了一次,发现6在5-7中。最终回到原有序链表,从5开始遍历,发现6在5之后。从L3-到原链表,判断了5次。
对于数据量越大的链表,跳表的效率就越高。
跳表本身讲清楚后,回到redis,从数据结构上看redis中的跳表到底是什么样的。
redis中跳表分析:

/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
    sds ele;
    double score;
    struct zskiplistNode *backward;
    struct zskiplistLevel {
        struct zskiplistNode *forward;
        unsigned long span;
    } level[];
} zskiplistNode;

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;

zskiplist为跳表。zskiplistNode为跳表中的节点。
先看zskiplistNode。
ele为跳表中每个节点存储的string值,sds功能与string类似,具体差别请看

score为分值。跳表的节点排序是按照分值来的,相同分值的情况按照ele的字典序排列。
backward为这个节点的上一个节点,与跳表无关,用来反向遍历原链表。
level[]就为跳表的属性了。level就是最开始讲跳表结构的L简称,level[0],level[1],level[2]就分别对应L1,L2,L3了。每个level有两个字段,forward为下一个节点地址,span为步长。对照着最开始的图示与文字解释,那么假如再看

这个图的话,假设最开始的这个1,是一个具体的zskiplistNode实例,起名叫做zskiplistNode1,那么zskiplistNode1中的ele就是1,score也看做1,backward就是1的上一个节点,为最后的n。
level[]共有三层,记作level[0],level[1],level[2]。
level[0]中的forward是zskiplistNode3的地址,level[0]中的span就是2了。
level[1]中的
forward是zskiplistNode5的地址,level[0]中的span就是4了。
level[2]中的*forward是zskiplistNode9的地址,level[0]中的span就是8了。
再看zskiplist。
理解了zskiplistNode,再看zskiplist就很简单了。zskiplist只保留两个指针,分别为跳表的头和尾。length为记录整个跳表的节点数量,level代表着整个跳表建完之后的最大L层。
整个跳表的数据结构看完,再看看跳表的增删改查代码实现
跳表的生成依赖随机算法,即每个跳表有多少层,再初始化时根据随机算法随机给个最大层数,按照当前的节点数量开始计算每层步长。
随机算法如下:

/* Returns a random level for the new skiplist node we are going to create.
 * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
 * (both inclusive), with a powerlaw-alike distribution where higher
 * levels are less likely to be returned. */
int zslRandomLevel(void) {
    int level = 1;
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

其中,ZSKIPLIST_MAXLEVEL定义在了宏中,#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^64 elements */,代表着redis主动限制一个跳表的最大层数上限为32。层数越高,跳表越复杂。在源码中,while((random()&0xFFFF)<(ZSKIPLIST_P *0xFFFF))确定了最终的概率。ZSKIPLIST_P 定义再宏#define ZSKIPLIST_P 0.25中,由此可见,zslRandomLevel得到1的概率为1-0.25,得到2的概率为(1-0.25)0.75,得到3的概率为(1-0.25)0.750.75,以此类推发现,越大的数出现的概率越小,这也叫做幂次定律(powerlaw)
跳表初始化:

/* Create a skiplist node with the specified number of levels.
 * The SDS string 'ele' is referenced by the node after the call. */
zskiplistNode *zslCreateNode(int level, double score, sds ele) {
    zskiplistNode *zn =
        zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
    zn->score = score;
    zn->ele = ele;
    return zn;
}
/* Create a new skiplist. */
zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;
    //分配内存
    zsl = zmalloc(sizeof(*zsl));
    zsl->level = 1;
    zsl->length = 0;
    //创建一个null的zslCreateNode,作为header,后续的节点加入都在null之后
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
        zsl->header->level[j].forward = NULL;
        zsl->header->level[j].span = 0;
    }
    zsl->header->backward = NULL;
    zsl->tail = NULL;
    return zsl;
}

插入节点:

/* Insert a new node in the skiplist. Assumes the element does not already
 * exist (up to the caller to enforce that). The skiplist takes ownership
 * of the passed SDS string 'ele'. */
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    //update与rank都为辅助数组,负责处理跳表的结构处理。
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned long rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    serverAssert(!isnan(score));
    x = zsl->header;
    //从高层到低层循环计算节点跨度,保存在update辅助数组里
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                    sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            //累加span真正的操作
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        将每层计算好的保存在辅助数组中
        update[i] = x;
    }
    /* we assume the element is not already inside, since we allow duplicated
     * scores, reinserting the same element should never happen since the
     * caller of zslInsert() should test in the hash table if the element is
     * already inside or not. */
    //根据幂次定律计算新的level层
    level = zslRandomLevel();
    //如果得到的随机level比现在的level最大层还要大的话,那么需要设置最大层为最新的level了
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;//这里是将原有level层以上的rank都初始化为0
            update[i] = zsl->header;//将原有level层以上的update[i]先指向头
            update[i]->level[i].span = zsl->length;//也是初始化原有level层以上的操作,步长直接置为length,就是从头直接到尾,具体新的步长将在下边计算
        }
        zsl->level = level;//更新最大的level层数
    }
    //初始化新的节点,开始做插入操作
    x = zslCreateNode(level,score,ele);
    for (i = 0; i < level; i++) {
        //设置新节点的下一个节点为每一层最后一个节点的的下个节点
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;
   
        /* update span covered by update[i] as x is inserted here */
        //根据update辅助数组更新span跨度
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    /* increment span for untouched levels */
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}

可以看出,新增节点时需要变动跳表的结构,小概率会新增层级,由于全部都是修改每个节点的level[]数组,所以就算for和while循环,性能也是很好的。
删除节点:

int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
    //辅助数组
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;

    x = zsl->header;
    //和查找类似,从level最高层开始遍历,得到最终位置
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                     sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            //通关将原先指向它的节点指向它的下一个,将他从链表中剔除
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* We may have multiple elements with the same score, what we need
     * is to find the element with both the right score and object. */
    //设置后退指针
    x = x->level[0].forward;
    //在这里再验证是否要删除时当前节点
    if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
        zslDeleteNode(zsl, x, update);
        if (!node)
            zslFreeNode(x);
        else
            *node = x;
        return 1;
    }
    return 0; /* not found */
}
/* Internal function used by zslDelete, zslDeleteRangeByScore and
 * zslDeleteRangeByRank. */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
    int i;
    for (i = 0; i < zsl->level; i++) {
        //如果这一层涉及到删除的节点为跳表的索引的话,将原先指向需要被删除的节点变成它的下一个节点
        if (update[i]->level[i].forward == x) {
            update[i]->level[i].span += x->level[i].span - 1;
            update[i]->level[i].forward = x->level[i].forward;
        } else {
            update[i]->level[i].span -= 1;
        }
    }
    if (x->level[0].forward) {
        x->level[0].forward->backward = x->backward;
    } else {
        zsl->tail = x->backward;
    }
    while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
        zsl->level--;
    zsl->length--;
}
//这个调用sds的方式,释放内存
void zslFreeNode(zskiplistNode *node) {
    sdsfree(node->ele);
    zfree(node);
}

查找:

unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *x;
    unsigned long rank = 0;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        ////只要分值小于score或者分值相等,对象字典序小于给定对象o
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                sdscmp(x->level[i].forward->ele,ele) <= 0))) {
            //更新rank,并且指向下个节点
            rank += x->level[i].span;
            x = x->level[i].forward;
        }

        /* x might be equal to zsl->header, so test if obj is non-NULL */
        if (x->ele && x->score == score && sdscmp(x->ele,ele) == 0) {
            //最终得到的rank值是第i层找到分值相同,且对象相同时的rank值,否则返回0
            return rank;
        }
    }
    return 0;
}

这里的查找其实是根据score获取排名,具体逻辑看注释。
redis中的跳表相对于其他数据结构而言,逻辑性更复杂,会设定更多的辅助数组来保证span与level和next指针的正确和合理性,同时用到了幂次定律等辅助level的合理值,也因此会存在概率性问题,但也因此,涉及到redis的无论面试或分享中,跳表是重要一环,但也因为复杂性,跳表再redis中的使用场景较少,出现再zset中,涉及到的命令有ZADD ,ZRANGE ,ZSCORE等等。