redis源码学习-t_zset篇

t_zset介绍

t_zset为redis中有序集合的容器,与t_set类似,但提供了排名与排序等复杂操作。底层数据结构为在6以后为listpack,dict+skiplist,删除了性能较差的ziplist(在t_hash中也将ziplist用更高性能的listpack代替),宣告着ziplist正式退出中层容器的舞台。
底层数据结构在



中进行了详解,不熟悉的同学可以先看这三篇文章。

t_zset详解

t_zset编码方式

在6之后使用listpack与(dict+skiplist),编码方式为OBJ_ENCODING_LISTPACK与OBJ_ENCODING_SKIPLIST

# Similarly to hashes and lists, sorted sets are also specially encoded in
# order to save a lot of space. This encoding is only used when the length and
# elements of a sorted set are below the following limits:
zset-max-listpack-entries 128
zset-max-listpack-value 64

listpack编码方式:首次add时当zset-max-listpack-entries为0或者zset长度小于zset-max-listpack-value时,在之后zset数据小于zset-max-listpack-value且zset元素个数小于zset-max-listpack-entries时
skiplist编码方式:首次add时zset_max_listpack_entries不为0且zset_max_listpack_entries为0且zset长度大于zset-max-listpack-value,在之后zset数据大于zset-max-listpack-value或zset元素个数大于zset-max-listpack-entries时

t_zset中负责处理的具体命令

  1. zadd:向集合添加成员或更新score
  2. zrem:移除集合中的成员
  3. zcard:获取集合的成员数
  4. zcount:计算在集合中指定区间分数的成员数
  5. zincrby:对指定成员的分数加减
  6. zinter:计算给定的集合的交集
  7. zinterstore:计算给定的集合的交集并存储
  8. zdiff:计算给定的集合的差集
  9. zdiffstore:计算给定的集合的差集并存储
  10. zunion:计算给定的集合的并集
  11. zunionstore:计算给定的集合的并集并存储
  12. zrange:获取集合指定区间内的成员
  13. zrangebylex:通过字典区间返回集合的成员
  14. zrangebyscore:通过分数返回集合指定区间内的成员
  15. zrank:返回集合中指定成员的索引
  16. zremrangebylex:通过字典区间移除集合的成员
  17. zremrangebyrank:移除集合中给定的排名区间的所有成员
  18. zremrangebyscore:移除集合中给定的分数区间的所有成员
  19. zrevrange:返回集中指定区间内的成员,并且通过索引,分数从高到低排序
  20. zrevengebyscore:返回集中指定分数区间内的成员,分数从高到低排序
  21. zrevrank:返回集合中指定成员的排名,有序集成员按分数从高到低排序
  22. zscore:返回集合中成员的score
  23. zscan:迭代集合中的成员

t_zset源码分析

tzset中提供了zaddGenericCommand,zremrangeGenericCommand,zmpopGenericCommand等通用函数用来负责整体的add与remove的落库与通知,pub/sub等操作,zrangeGenericCommand用来通用遍历,同时也与t_set类似,提供了zunionInterDiffGenericCommand负责交叉并集的统一处理。
zaddGenericCommand

/* This generic command implements both ZADD and ZINCRBY. */
void zaddGenericCommand(client *c, int flags) {
    static char *nanerr = "resulting score is not a number (NaN)";
    robj *key = c->argv[1];
    robj *zobj;
    sds ele;
    double score = 0, *scores = NULL;
    int j, elements, ch = 0;
    int scoreidx = 0;
    /* The following vars are used in order to track what the command actually
     * did during the execution, to reply to the client and to trigger the
     * notification of keyspace change. */
    //用来标记不同的操作记录
    int added = 0;      /* Number of new elements added. */
    int updated = 0;    /* Number of elements with updated score. */
    int processed = 0;  /* Number of elements processed, may remain zero with
                           options like XX. */

    /* Parse options. At the end 'scoreidx' is set to the argument position
     * of the score of the first score-element pair. */
    //处理命令解析,zset由于有分数的概念,同时支持多个元素的操作,因此arg个数将有可能很多,需要统一进行处理
    scoreidx = 2;
    while(scoreidx < c->argc) {
        char *opt = c->argv[scoreidx]->ptr;
        if (!strcasecmp(opt,"nx")) flags |= ZADD_IN_NX;
        else if (!strcasecmp(opt,"xx")) flags |= ZADD_IN_XX;
        else if (!strcasecmp(opt,"ch")) ch = 1; /* Return num of elements added or updated. */
        else if (!strcasecmp(opt,"incr")) flags |= ZADD_IN_INCR;
        else if (!strcasecmp(opt,"gt")) flags |= ZADD_IN_GT;
        else if (!strcasecmp(opt,"lt")) flags |= ZADD_IN_LT;
        else break;
        scoreidx++;
    }

    /* Turn options into simple to check vars. */
    //与上边的arg解析连贯起来,用来解析后转为变量供之后的逻辑调用
    int incr = (flags & ZADD_IN_INCR) != 0;
    int nx = (flags & ZADD_IN_NX) != 0;
    int xx = (flags & ZADD_IN_XX) != 0;
    int gt = (flags & ZADD_IN_GT) != 0;
    int lt = (flags & ZADD_IN_LT) != 0;

    /* After the options, we expect to have an even number of args, since
     * we expect any number of score-element pairs. */
    //计算命令中元素与分值的总数
    elements = c->argc-scoreidx;
    if (elements % 2 || !elements) {
        addReplyErrorObject(c,shared.syntaxerr);
        return;
    }
    elements /= 2; /* Now this holds the number of score-element pairs. */

    /* Check for incompatible options. */
    //由于nx与xx是互斥的逻辑,因此需要这里判断并且提前结束
    if (nx && xx) {
        addReplyError(c,
            "XX and NX options at the same time are not compatible");
        return;
    }
    //gt lt nx三者互斥,,因此需要这里判断并且提前结束
    if ((gt && nx) || (lt && nx) || (gt && lt)) {
        addReplyError(c,
            "GT, LT, and/or NX options at the same time are not compatible");
        return;
    }
    /* Note that XX is compatible with either GT or LT */
    //incr只能操作一个元素
    if (incr && elements > 1) {
        addReplyError(c,
            "INCR option supports a single increment-element pair");
        return;
    }

    /* Start parsing all the scores, we need to emit any syntax error
     * before executing additions to the sorted set, as the command should
     * either execute fully or nothing at all. */
    //申请内存空间,这里用来缓存所有的分数
    scores = zmalloc(sizeof(double)*elements);
    for (j = 0; j < elements; j++) {
        //如果格式不正确的话,统一goto到清理行进行统一的free
        if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
            != C_OK) goto cleanup;
    }

    /* Lookup the key and create the sorted set if does not exist. */
    //开始检查zset的键是否存在,类型不符也一样统一goto到清理行进行统一的free
    zobj = lookupKeyWrite(c->db,key);
    if (checkType(c,zobj,OBJ_ZSET)) goto cleanup;
    if (zobj == NULL) {
        if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
        /*判断首次add时的编码格式
        if (server.zset_max_listpack_entries == 0 ||
            server.zset_max_listpack_value < sdslen(c->argv[scoreidx+1]->ptr))
        {
            zobj = createZsetObject();
        } else {
            zobj = createZsetListpackObject();
        }
        dbAdd(c->db,key,zobj);
    }

    for (j = 0; j < elements; j++) {
        double newscore;
        score = scores[j];
        int retflags = 0;

        ele = c->argv[scoreidx+1+j*2]->ptr;
        //zsetAdd相对复杂些,需要处理好score的结构
        int retval = zsetAdd(zobj, score, ele, flags, &retflags, &newscore);
        if (retval == 0) {
            addReplyError(c,nanerr);
            goto cleanup;
        }
        if (retflags & ZADD_OUT_ADDED) added++;
        if (retflags & ZADD_OUT_UPDATED) updated++;
        if (!(retflags & ZADD_OUT_NOP)) processed++;
        score = newscore;
    }
    server.dirty += (added+updated);

reply_to_client:
    if (incr) { /* ZINCRBY or INCR option. */
        if (processed)
            addReplyDouble(c,score);
        else
            addReplyNull(c);
    } else { /* ZADD. */
        addReplyLongLong(c,ch ? added+updated : added);
    }

cleanup:
    zfree(scores);
    if (added || updated) {
        //统一处理通知与pub sub模块
        signalModifiedKey(c,c->db,key);
        notifyKeyspaceEvent(NOTIFY_ZSET,
            incr ? "zincr" : "zadd", key, c->db->id);
    }
}

统一包装后,整体的if判断相对较多,需要处理nx与xx互斥,gt lt nx三者互斥的关系,整体zset的逻辑函数也很多,由于本身score的特殊性质,为预留排名导致无法直接调用底层listpack与skiplist的包装函数,也因此z_set中代码复杂度相对较高
zsetadd底层逻辑

/* Add a new element or update the score of an existing element in a sorted
 * set, regardless of its encoding.
 *
 * The set of flags change the command behavior. 
 *
 * The input flags are the following:
 *
 * ZADD_INCR: Increment the current element score by 'score' instead of updating
 *            the current element score. If the element does not exist, we
 *            assume 0 as previous score.
 * ZADD_NX:   Perform the operation only if the element does not exist.
 * ZADD_XX:   Perform the operation only if the element already exist.
 * ZADD_GT:   Perform the operation on existing elements only if the new score is 
 *            greater than the current score.
 * ZADD_LT:   Perform the operation on existing elements only if the new score is 
 *            less than the current score.
 *
 * When ZADD_INCR is used, the new score of the element is stored in
 * '*newscore' if 'newscore' is not NULL.
 *
 * The returned flags are the following:
 *
 * ZADD_NAN:     The resulting score is not a number.
 * ZADD_ADDED:   The element was added (not present before the call).
 * ZADD_UPDATED: The element score was updated.
 * ZADD_NOP:     No operation was performed because of NX or XX.
 *
 * Return value:
 *
 * The function returns 1 on success, and sets the appropriate flags
 * ADDED or UPDATED to signal what happened during the operation (note that
 * none could be set if we re-added an element using the same score it used
 * to have, or in the case a zero increment is used).
 *
 * The function returns 0 on error, currently only when the increment
 * produces a NAN condition, or when the 'score' value is NAN since the
 * start.
 *
 * The command as a side effect of adding a new element may convert the sorted
 * set internal encoding from listpack to hashtable+skiplist.
 *
 * Memory management of 'ele':
 *
 * The function does not take ownership of the 'ele' SDS string, but copies
 * it if needed. */
int zsetAdd(robj *zobj, double score, sds ele, int in_flags, int *out_flags, double *newscore) {
    /* Turn options into simple to check vars. */
    //需要注意 nx与xx的互斥,gt lt nx之间的互斥
    int incr = (in_flags & ZADD_IN_INCR) != 0;
    int nx = (in_flags & ZADD_IN_NX) != 0;
    int xx = (in_flags & ZADD_IN_XX) != 0;
    int gt = (in_flags & ZADD_IN_GT) != 0;
    int lt = (in_flags & ZADD_IN_LT) != 0;
    *out_flags = 0; /* We'll return our response flags. */
    double curscore;

    /* NaN as input is an error regardless of all the other parameters. */
    if (isnan(score)) {
        *out_flags = ZADD_OUT_NAN;
        return 0;
    }

    /* Update the sorted set according to its encoding. */
    //listpack的数据结构
    if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
        unsigned char *eptr;

        if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
            //假如找到了相同元素
            /* NX? Return, same element already exists. */
            if (nx) {
                *out_flags |= ZADD_OUT_NOP;
                return 1;
            }

            /* Prepare the score for the increment if needed. */
            //incr需要增加分数
            if (incr) {
                score += curscore;
                if (isnan(score)) {
                    *out_flags |= ZADD_OUT_NAN;
                    return 0;
                }
            }

            /* GT/LT? Only update if score is greater/less than current. */
            if ((lt && score >= curscore) || (gt && score <= curscore)) {
                *out_flags |= ZADD_OUT_NOP;
                return 1;
            }

            if (newscore) *newscore = score;

            /* Remove and re-insert when score changed. */
            //如果score不同的话,那么就先删除然后再添加,达到更新的操作
            if (score != curscore) {
                zobj->ptr = zzlDelete(zobj->ptr,eptr);
                zobj->ptr = zzlInsert(zobj->ptr,ele,score);
                *out_flags |= ZADD_OUT_UPDATED;
            }
            return 1;
        } else if (!xx) {
            /* Optimize: check if the element is too large or the list
             * becomes too long *before* executing zzlInsert. */
            //如果为新的,就需要考虑编码转换的问题,但是需要先insert之后再判断
            zobj->ptr = zzlInsert(zobj->ptr,ele,score);
            if (zzlLength(zobj->ptr) > server.zset_max_listpack_entries ||
                sdslen(ele) > server.zset_max_listpack_value)
                zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
            if (newscore) *newscore = score;
            *out_flags |= ZADD_OUT_ADDED;
            return 1;
        } else {
            *out_flags |= ZADD_OUT_NOP;
            return 1;
        }
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplistNode *znode;
        dictEntry *de;
        //先从dict中判断是否存在,在dict中的hash查询效率比skiplist要快很多
        de = dictFind(zs->dict,ele);
        if (de != NULL) {
            /* NX? Return, same element already exists. */
            if (nx) {
                *out_flags |= ZADD_OUT_NOP;
                return 1;
            }

            curscore = *(double*)dictGetVal(de);

            /* Prepare the score for the increment if needed. */
            //分数增加
            if (incr) {
                score += curscore;
                if (isnan(score)) {
                    *out_flags |= ZADD_OUT_NAN;
                    return 0;
                }
            }

            /* GT/LT? Only update if score is greater/less than current. */
            if ((lt && score >= curscore) || (gt && score <= curscore)) {
                *out_flags |= ZADD_OUT_NOP;
                return 1;
            }

            if (newscore) *newscore = score;

            /* Remove and re-insert when score changes. */
            if (score != curscore) {
                //更新跳表中的score
                znode = zslUpdateScore(zs->zsl,curscore,ele,score);
                /* Note that we did not removed the original element from
                 * the hash table representing the sorted set, so we just
                 * update the score. */
                //更新dict中的score
                dictGetVal(de) = &znode->score; /* Update score ptr. */
                *out_flags |= ZADD_OUT_UPDATED;
            }
            return 1;
        } else if (!xx) {
            ele = sdsdup(ele);
            //无需编码转换,需要同时add dict与跳表
            znode = zslInsert(zs->zsl,score,ele);
            serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
            *out_flags |= ZADD_OUT_ADDED;
            if (newscore) *newscore = score;
            return 1;
        } else {
            *out_flags |= ZADD_OUT_NOP;
            return 1;
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }
    return 0; /* Never reached. */
}

可以看出,listpack相对较简单,而dict与skiplist需要冗余存储,通过牺牲一定的内存空间换取极高的查询与排序效率,用空间换时间,将排序与范围查询优化到O(log(n))的级别
zdel

/* Delete the element 'ele' from the sorted set, returning 1 if the element
 * existed and was deleted, 0 otherwise (the element was not there). */
int zsetDel(robj *zobj, sds ele) {
    //简单的判断编码,然后调用不同的底层数据结构的del操作
    if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
        unsigned char *eptr;

        if ((eptr = zzlFind(zobj->ptr,ele,NULL)) != NULL) {
            zobj->ptr = zzlDelete(zobj->ptr,eptr);
            return 1;
        }
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        if (zsetRemoveFromSkiplist(zs, ele)) {
            if (htNeedsResize(zs->dict)) dictResize(zs->dict);
            return 1;
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }
    return 0; /* No such element found. */
}

t_zset总结

t_zset中较为重要的是dict与skiplist的结合。将元素冗余存储在dict与skiplist,牺牲了部分的空间利用率,但是得到了极高的查询与排序效率。
dict负责查询元素的score,而skiplist用来根据score查询元素,排名也在skiplist中负责。add时由于dict的搜寻性能更高,会优先查询dict来判断元素是否存在。
dict+skiplist结合后,可以很快的先在dict中由数据查到score,然后根据score到skiplist中反查,查到数据后直接就可以获取排名,很巧妙的通过两者不同的底层数据结构将复杂的排名操作优化到了O(log n),也因此redis在大版本升级时只替换了ziplist的底层数据结构。也由于dict+skiplist的结合,在add的时候,对于编码为OBJ_ENCODING_SKIPLIST的逻辑操作相对较多一些,整体add函数相对复杂。需要了解dict与skiplist后才能明白两者分别在OBJ_ENCODING_SKIPLIST中的优势点,才能理解zscore,zrevrange,zrevrank的高效实现。