redis源码学习-t_zset篇
0 条评论redis源码学习-t_zset篇
t_zset介绍
t_zset为redis中有序集合的容器,与t_set类似,但提供了排名与排序等复杂操作。底层数据结构为在6以后为listpack,dict+skiplist,删除了性能较差的ziplist(在t_hash中也将ziplist用更高性能的listpack代替),宣告着ziplist正式退出中层容器的舞台。
底层数据结构在
中进行了详解,不熟悉的同学可以先看这三篇文章。
t_zset详解
t_zset编码方式
在6之后使用listpack与(dict+skiplist),编码方式为OBJ_ENCODING_LISTPACK与OBJ_ENCODING_SKIPLIST
# Similarly to hashes and lists, sorted sets are also specially encoded in
# order to save a lot of space. This encoding is only used when the length and
# elements of a sorted set are below the following limits:
zset-max-listpack-entries 128
zset-max-listpack-value 64
listpack编码方式:首次add时当zset-max-listpack-entries为0或者zset长度小于zset-max-listpack-value时,在之后zset数据小于zset-max-listpack-value且zset元素个数小于zset-max-listpack-entries时
skiplist编码方式:首次add时zset_max_listpack_entries不为0且zset_max_listpack_entries为0且zset长度大于zset-max-listpack-value,在之后zset数据大于zset-max-listpack-value或zset元素个数大于zset-max-listpack-entries时
t_zset中负责处理的具体命令
- zadd:向集合添加成员或更新score
- zrem:移除集合中的成员
- zcard:获取集合的成员数
- zcount:计算在集合中指定区间分数的成员数
- zincrby:对指定成员的分数加减
- zinter:计算给定的集合的交集
- zinterstore:计算给定的集合的交集并存储
- zdiff:计算给定的集合的差集
- zdiffstore:计算给定的集合的差集并存储
- zunion:计算给定的集合的并集
- zunionstore:计算给定的集合的并集并存储
- zrange:获取集合指定区间内的成员
- zrangebylex:通过字典区间返回集合的成员
- zrangebyscore:通过分数返回集合指定区间内的成员
- zrank:返回集合中指定成员的索引
- zremrangebylex:通过字典区间移除集合的成员
- zremrangebyrank:移除集合中给定的排名区间的所有成员
- zremrangebyscore:移除集合中给定的分数区间的所有成员
- zrevrange:返回集中指定区间内的成员,并且通过索引,分数从高到低排序
- zrevengebyscore:返回集中指定分数区间内的成员,分数从高到低排序
- zrevrank:返回集合中指定成员的排名,有序集成员按分数从高到低排序
- zscore:返回集合中成员的score
- zscan:迭代集合中的成员
t_zset源码分析
tzset中提供了zaddGenericCommand,zremrangeGenericCommand,zmpopGenericCommand等通用函数用来负责整体的add与remove的落库与通知,pub/sub等操作,zrangeGenericCommand用来通用遍历,同时也与t_set类似,提供了zunionInterDiffGenericCommand负责交叉并集的统一处理。
zaddGenericCommand
/* This generic command implements both ZADD and ZINCRBY. */
void zaddGenericCommand(client *c, int flags) {
static char *nanerr = "resulting score is not a number (NaN)";
robj *key = c->argv[1];
robj *zobj;
sds ele;
double score = 0, *scores = NULL;
int j, elements, ch = 0;
int scoreidx = 0;
/* The following vars are used in order to track what the command actually
* did during the execution, to reply to the client and to trigger the
* notification of keyspace change. */
//用来标记不同的操作记录
int added = 0; /* Number of new elements added. */
int updated = 0; /* Number of elements with updated score. */
int processed = 0; /* Number of elements processed, may remain zero with
options like XX. */
/* Parse options. At the end 'scoreidx' is set to the argument position
* of the score of the first score-element pair. */
//处理命令解析,zset由于有分数的概念,同时支持多个元素的操作,因此arg个数将有可能很多,需要统一进行处理
scoreidx = 2;
while(scoreidx < c->argc) {
char *opt = c->argv[scoreidx]->ptr;
if (!strcasecmp(opt,"nx")) flags |= ZADD_IN_NX;
else if (!strcasecmp(opt,"xx")) flags |= ZADD_IN_XX;
else if (!strcasecmp(opt,"ch")) ch = 1; /* Return num of elements added or updated. */
else if (!strcasecmp(opt,"incr")) flags |= ZADD_IN_INCR;
else if (!strcasecmp(opt,"gt")) flags |= ZADD_IN_GT;
else if (!strcasecmp(opt,"lt")) flags |= ZADD_IN_LT;
else break;
scoreidx++;
}
/* Turn options into simple to check vars. */
//与上边的arg解析连贯起来,用来解析后转为变量供之后的逻辑调用
int incr = (flags & ZADD_IN_INCR) != 0;
int nx = (flags & ZADD_IN_NX) != 0;
int xx = (flags & ZADD_IN_XX) != 0;
int gt = (flags & ZADD_IN_GT) != 0;
int lt = (flags & ZADD_IN_LT) != 0;
/* After the options, we expect to have an even number of args, since
* we expect any number of score-element pairs. */
//计算命令中元素与分值的总数
elements = c->argc-scoreidx;
if (elements % 2 || !elements) {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
elements /= 2; /* Now this holds the number of score-element pairs. */
/* Check for incompatible options. */
//由于nx与xx是互斥的逻辑,因此需要这里判断并且提前结束
if (nx && xx) {
addReplyError(c,
"XX and NX options at the same time are not compatible");
return;
}
//gt lt nx三者互斥,,因此需要这里判断并且提前结束
if ((gt && nx) || (lt && nx) || (gt && lt)) {
addReplyError(c,
"GT, LT, and/or NX options at the same time are not compatible");
return;
}
/* Note that XX is compatible with either GT or LT */
//incr只能操作一个元素
if (incr && elements > 1) {
addReplyError(c,
"INCR option supports a single increment-element pair");
return;
}
/* Start parsing all the scores, we need to emit any syntax error
* before executing additions to the sorted set, as the command should
* either execute fully or nothing at all. */
//申请内存空间,这里用来缓存所有的分数
scores = zmalloc(sizeof(double)*elements);
for (j = 0; j < elements; j++) {
//如果格式不正确的话,统一goto到清理行进行统一的free
if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
!= C_OK) goto cleanup;
}
/* Lookup the key and create the sorted set if does not exist. */
//开始检查zset的键是否存在,类型不符也一样统一goto到清理行进行统一的free
zobj = lookupKeyWrite(c->db,key);
if (checkType(c,zobj,OBJ_ZSET)) goto cleanup;
if (zobj == NULL) {
if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
/*判断首次add时的编码格式
if (server.zset_max_listpack_entries == 0 ||
server.zset_max_listpack_value < sdslen(c->argv[scoreidx+1]->ptr))
{
zobj = createZsetObject();
} else {
zobj = createZsetListpackObject();
}
dbAdd(c->db,key,zobj);
}
for (j = 0; j < elements; j++) {
double newscore;
score = scores[j];
int retflags = 0;
ele = c->argv[scoreidx+1+j*2]->ptr;
//zsetAdd相对复杂些,需要处理好score的结构
int retval = zsetAdd(zobj, score, ele, flags, &retflags, &newscore);
if (retval == 0) {
addReplyError(c,nanerr);
goto cleanup;
}
if (retflags & ZADD_OUT_ADDED) added++;
if (retflags & ZADD_OUT_UPDATED) updated++;
if (!(retflags & ZADD_OUT_NOP)) processed++;
score = newscore;
}
server.dirty += (added+updated);
reply_to_client:
if (incr) { /* ZINCRBY or INCR option. */
if (processed)
addReplyDouble(c,score);
else
addReplyNull(c);
} else { /* ZADD. */
addReplyLongLong(c,ch ? added+updated : added);
}
cleanup:
zfree(scores);
if (added || updated) {
//统一处理通知与pub sub模块
signalModifiedKey(c,c->db,key);
notifyKeyspaceEvent(NOTIFY_ZSET,
incr ? "zincr" : "zadd", key, c->db->id);
}
}
统一包装后,整体的if判断相对较多,需要处理nx与xx互斥,gt lt nx三者互斥的关系,整体zset的逻辑函数也很多,由于本身score的特殊性质,为预留排名导致无法直接调用底层listpack与skiplist的包装函数,也因此z_set中代码复杂度相对较高
zsetadd底层逻辑
/* Add a new element or update the score of an existing element in a sorted
* set, regardless of its encoding.
*
* The set of flags change the command behavior.
*
* The input flags are the following:
*
* ZADD_INCR: Increment the current element score by 'score' instead of updating
* the current element score. If the element does not exist, we
* assume 0 as previous score.
* ZADD_NX: Perform the operation only if the element does not exist.
* ZADD_XX: Perform the operation only if the element already exist.
* ZADD_GT: Perform the operation on existing elements only if the new score is
* greater than the current score.
* ZADD_LT: Perform the operation on existing elements only if the new score is
* less than the current score.
*
* When ZADD_INCR is used, the new score of the element is stored in
* '*newscore' if 'newscore' is not NULL.
*
* The returned flags are the following:
*
* ZADD_NAN: The resulting score is not a number.
* ZADD_ADDED: The element was added (not present before the call).
* ZADD_UPDATED: The element score was updated.
* ZADD_NOP: No operation was performed because of NX or XX.
*
* Return value:
*
* The function returns 1 on success, and sets the appropriate flags
* ADDED or UPDATED to signal what happened during the operation (note that
* none could be set if we re-added an element using the same score it used
* to have, or in the case a zero increment is used).
*
* The function returns 0 on error, currently only when the increment
* produces a NAN condition, or when the 'score' value is NAN since the
* start.
*
* The command as a side effect of adding a new element may convert the sorted
* set internal encoding from listpack to hashtable+skiplist.
*
* Memory management of 'ele':
*
* The function does not take ownership of the 'ele' SDS string, but copies
* it if needed. */
int zsetAdd(robj *zobj, double score, sds ele, int in_flags, int *out_flags, double *newscore) {
/* Turn options into simple to check vars. */
//需要注意 nx与xx的互斥,gt lt nx之间的互斥
int incr = (in_flags & ZADD_IN_INCR) != 0;
int nx = (in_flags & ZADD_IN_NX) != 0;
int xx = (in_flags & ZADD_IN_XX) != 0;
int gt = (in_flags & ZADD_IN_GT) != 0;
int lt = (in_flags & ZADD_IN_LT) != 0;
*out_flags = 0; /* We'll return our response flags. */
double curscore;
/* NaN as input is an error regardless of all the other parameters. */
if (isnan(score)) {
*out_flags = ZADD_OUT_NAN;
return 0;
}
/* Update the sorted set according to its encoding. */
//listpack的数据结构
if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
unsigned char *eptr;
if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
//假如找到了相同元素
/* NX? Return, same element already exists. */
if (nx) {
*out_flags |= ZADD_OUT_NOP;
return 1;
}
/* Prepare the score for the increment if needed. */
//incr需要增加分数
if (incr) {
score += curscore;
if (isnan(score)) {
*out_flags |= ZADD_OUT_NAN;
return 0;
}
}
/* GT/LT? Only update if score is greater/less than current. */
if ((lt && score >= curscore) || (gt && score <= curscore)) {
*out_flags |= ZADD_OUT_NOP;
return 1;
}
if (newscore) *newscore = score;
/* Remove and re-insert when score changed. */
//如果score不同的话,那么就先删除然后再添加,达到更新的操作
if (score != curscore) {
zobj->ptr = zzlDelete(zobj->ptr,eptr);
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
*out_flags |= ZADD_OUT_UPDATED;
}
return 1;
} else if (!xx) {
/* Optimize: check if the element is too large or the list
* becomes too long *before* executing zzlInsert. */
//如果为新的,就需要考虑编码转换的问题,但是需要先insert之后再判断
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
if (zzlLength(zobj->ptr) > server.zset_max_listpack_entries ||
sdslen(ele) > server.zset_max_listpack_value)
zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
if (newscore) *newscore = score;
*out_flags |= ZADD_OUT_ADDED;
return 1;
} else {
*out_flags |= ZADD_OUT_NOP;
return 1;
}
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplistNode *znode;
dictEntry *de;
//先从dict中判断是否存在,在dict中的hash查询效率比skiplist要快很多
de = dictFind(zs->dict,ele);
if (de != NULL) {
/* NX? Return, same element already exists. */
if (nx) {
*out_flags |= ZADD_OUT_NOP;
return 1;
}
curscore = *(double*)dictGetVal(de);
/* Prepare the score for the increment if needed. */
//分数增加
if (incr) {
score += curscore;
if (isnan(score)) {
*out_flags |= ZADD_OUT_NAN;
return 0;
}
}
/* GT/LT? Only update if score is greater/less than current. */
if ((lt && score >= curscore) || (gt && score <= curscore)) {
*out_flags |= ZADD_OUT_NOP;
return 1;
}
if (newscore) *newscore = score;
/* Remove and re-insert when score changes. */
if (score != curscore) {
//更新跳表中的score
znode = zslUpdateScore(zs->zsl,curscore,ele,score);
/* Note that we did not removed the original element from
* the hash table representing the sorted set, so we just
* update the score. */
//更新dict中的score
dictGetVal(de) = &znode->score; /* Update score ptr. */
*out_flags |= ZADD_OUT_UPDATED;
}
return 1;
} else if (!xx) {
ele = sdsdup(ele);
//无需编码转换,需要同时add dict与跳表
znode = zslInsert(zs->zsl,score,ele);
serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
*out_flags |= ZADD_OUT_ADDED;
if (newscore) *newscore = score;
return 1;
} else {
*out_flags |= ZADD_OUT_NOP;
return 1;
}
} else {
serverPanic("Unknown sorted set encoding");
}
return 0; /* Never reached. */
}
可以看出,listpack相对较简单,而dict与skiplist需要冗余存储,通过牺牲一定的内存空间换取极高的查询与排序效率,用空间换时间,将排序与范围查询优化到O(log(n))的级别
zdel
/* Delete the element 'ele' from the sorted set, returning 1 if the element
* existed and was deleted, 0 otherwise (the element was not there). */
int zsetDel(robj *zobj, sds ele) {
//简单的判断编码,然后调用不同的底层数据结构的del操作
if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
unsigned char *eptr;
if ((eptr = zzlFind(zobj->ptr,ele,NULL)) != NULL) {
zobj->ptr = zzlDelete(zobj->ptr,eptr);
return 1;
}
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
if (zsetRemoveFromSkiplist(zs, ele)) {
if (htNeedsResize(zs->dict)) dictResize(zs->dict);
return 1;
}
} else {
serverPanic("Unknown sorted set encoding");
}
return 0; /* No such element found. */
}
t_zset总结
t_zset中较为重要的是dict与skiplist的结合。将元素冗余存储在dict与skiplist,牺牲了部分的空间利用率,但是得到了极高的查询与排序效率。
dict负责查询元素的score,而skiplist用来根据score查询元素,排名也在skiplist中负责。add时由于dict的搜寻性能更高,会优先查询dict来判断元素是否存在。
dict+skiplist结合后,可以很快的先在dict中由数据查到score,然后根据score到skiplist中反查,查到数据后直接就可以获取排名,很巧妙的通过两者不同的底层数据结构将复杂的排名操作优化到了O(log n),也因此redis在大版本升级时只替换了ziplist的底层数据结构。也由于dict+skiplist的结合,在add的时候,对于编码为OBJ_ENCODING_SKIPLIST的逻辑操作相对较多一些,整体add函数相对复杂。需要了解dict与skiplist后才能明白两者分别在OBJ_ENCODING_SKIPLIST中的优势点,才能理解zscore,zrevrange,zrevrank的高效实现。