redis源码学习-t_set篇
0 条评论redis源码学习-t_set篇
t_set介绍
t_set为redis中无序集合的容器,用来保存不同成员,底层数据结构为dict(hash table)和intset实现。
dict与intset分别在
这两篇文章中进行了详细分析,不熟悉的同学可以先看这两篇文章。
t_set详解
t_set编码方式
#define OBJ_ENCODING_HT 2 /* Encoded as hash table */
#define OBJ_ENCODING_INTSET 6 /* Encoded as intset */
- intset编码条件:首次添加时为longlong类型能够保存的整数,且之后添加的全都为longlong类型能够保存的整数
- dict编码条件:当整个set中有一个longlong不能保存的值,就全部换为dict
t_set中负责处理的具体命令
- sadd:向集合添加一个或多个成员
- scard:获取集合的成员数
- sdiff:返回多个集合的差集
- sdiffstore:返回给定所有集合的差集并存储
- sinter:返回多个集合的交集
- sinterstore:返回给定所有集合的交集并存储
- sunion:返回多个集合的并集
- sunionstore:返回给定所有集合的并集并存储
- sismember:判断元素是否是为该集合的成员
- smembers:返回集合中的所有成员
- smove:将元素从一个集合移动到另一个集合
- spop:移除并返回集合中的一个元素
- srandmember:返回集合中一个或多个随机成员
- srem:移除集合中一个或多个成员
- sscan:迭代集合中的元素
t_set源码分析
t_set中特殊点在于提供了集合间的交集并集差集操作,t_set中的GenericCommand也全部是为了交叉并集包装的通用函数,一些相对独立的命令不使用Generic公共函数,由自己负责pub/sub等操作。
sadd:
void saddCommand(client *c) {
robj *set;
int j, added = 0;
//判断命令是否合规
set = lookupKeyWrite(c->db,c->argv[1]);
if (checkType(c,set,OBJ_SET)) return;
//如果set为空,那么会优先判断类型是否能是intset,如果不行的话就使用dict
if (set == NULL) {
set = setTypeCreate(c->argv[2]->ptr);
//在这里直接调用db模块的add命令,将集合的key写入到db中
dbAdd(c->db,c->argv[1],set);
}
for (j = 2; j < c->argc; j++) {
//这里的setTypeAdd函数根据编码类型调用底层数据结构的dictAdd或intsetAdd
if (setTypeAdd(set,c->argv[j]->ptr)) added++;
}
if (added) {
// 发送键修改通知
signalModifiedKey(c,c->db,c->argv[1]);
//自己做pub/sub的通知,没有托管给通用函数
notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id);
}
server.dirty += added;
//与t_hash类似,由于可能此命令操作的数据数量太大,所以需要longlong的addreply才能安全的放下
addReplyLongLong(c,added);
}
/* Factory method to return a set that *can* hold "value". When the object has
* an integer-encodable value, an intset will be returned. Otherwise a regular
* hash table. */
robj *setTypeCreate(sds value) {
if (isSdsRepresentableAsLongLong(value,NULL) == C_OK)
return createIntsetObject();
return createSetObject();
}
整个sadd看下来会发现,tset中的编码也与t_hash类似,当编码"升级"之后就再也不会“降级了”。只要有longlong满足不了的value插入,整个t_set将全部变为dict进行存储
scard:
void scardCommand(client *c) {
robj *o;
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
checkType(c,o,OBJ_SET)) return;
addReplyLongLong(c,setTypeSize(o));
}
相对来讲scard是整个tset最简单的函数了,由于dict与intset本身就会记录存储的entry个数,因此t_set只需要验证编码格式,然后直接调用即可
t_set中的交叉并集
t_set中交叉并集全部都有保存计算后的集合的命令,因此相对应的GenericCommand函数就根据传参条件判断需不需要保存计算后的集合即各个stroe命令。
交集通用函数:
void sinterGenericCommand(client *c, robj **setkeys,
unsigned long setnum, robj *dstkey,
int cardinality_only, unsigned long limit) {
//申请内存,用来临时保存结果,整个GenericCommandreturn之前都会执行zfree释放此缓存空间
robj **sets = zmalloc(sizeof(robj*)*setnum);
setTypeIterator *si;
robj *dstset = NULL;
sds elesds;
int64_t intobj;
void *replylen = NULL;
unsigned long j, cardinality = 0;
int encoding, empty = 0;
for (j = 0; j < setnum; j++) {
//当需要保存计算结果时,将会传入dstkey
robj *setobj = dstkey ?
//用来取出对象
lookupKeyWrite(c->db,setkeys[j]) :
lookupKeyRead(c->db,setkeys[j]);
//假如说对象不存在的话,那么无需做check操作了,将预设的empty自增,进入for循环外的分支,走完整的free操作
if (!setobj) {
/* A NULL is considered an empty set */
empty += 1;
sets[j] = NULL;
continue;
}
if (checkType(c,setobj,OBJ_SET)) {
zfree(sets);
return;
}
sets[j] = setobj;
}
/* Set intersection with an empty set always results in an empty set.
* Return ASAP if there is an empty set. */
//empty>0会出现在上边的for循环中,没有找到对应的对象,那么释放缓存set的内存,发送通知,返回结果统一处理
if (empty > 0) {
zfree(sets);
if (dstkey) {
if (dbDelete(c->db,dstkey)) {
signalModifiedKey(c,c->db,dstkey);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",dstkey,c->db->id);
server.dirty++;
}
addReply(c,shared.czero);
} else if (cardinality_only) {
addReplyLongLong(c,cardinality);
} else {
addReply(c,shared.emptyset[c->resp]);
}
return;
}
/* Sort sets from the smallest to largest, this will improve our
* algorithm's performance */
//这里用的是快排,由于之后的交集为不停的for循环,那么将无序预先变为有序,方便for循环从中间直接break,增加效率
qsort(sets,setnum,sizeof(robj*),qsortCompareSetsByCardinality);
/* The first thing we should output is the total number of elements...
* since this is a multi-bulk write, but at this stage we don't know
* the intersection set size, so we use a trick, append an empty object
* to the output list and save the pointer to later modify it with the
* right length */
//在输出列表中附加一个空的对象replylen(adlist),保存它的指针,将之后的长度更新在replylen中
if (dstkey) {
/* If we have a target key where to store the resulting set
* create this key with an empty set inside */
dstset = createIntsetObject();
} else if (!cardinality_only) {
replylen = addReplyDeferredLen(c);
}
/* Iterate all the elements of the first (smallest) set, and test
* the element against all the other sets, if at least one set does
* not include the element it is discarded */
//为了减小for循环次数,需要先遍历entry数最少的集合,用它的元素与其余的集合进行对比,如果不是所有的元素都含有此集合中的元素,那么此元素就不为交集
si = setTypeInitIterator(sets[0]);
while((encoding = setTypeNext(si,&elesds,&intobj)) != -1) {
for (j = 1; j < setnum; j++) {
//如果为当前集合则跳过
if (sets[j] == sets[0]) continue;
//intset的话
if (encoding == OBJ_ENCODING_INTSET) {
/* intset with intset is simple... and fast */
//由于都是int,遍历的效率相对dict会高很多
if (sets[j]->encoding == OBJ_ENCODING_INTSET &&
!intsetFind((intset*)sets[j]->ptr,intobj))
{
break;
/* in order to compare an integer with an object we
* have to use the generic function, creating an object
* for this */
} else if (sets[j]->encoding == OBJ_ENCODING_HT) {
//其他集合为非intset类型会稍微麻烦些,这里构造里一个范型对象用来对比整数与对象
elesds = sdsfromlonglong(intobj);
if (!setTypeIsMember(sets[j],elesds)) {
sdsfree(elesds);
break;
}
sdsfree(elesds);
}
} else if (encoding == OBJ_ENCODING_HT) {
//纯dict的话尝试set,能够set进去那么就说明不为交集
if (!setTypeIsMember(sets[j],elesds)) {
break;
}
}
}
//当发现此元素在所有集合中,即发现了新的交集元素时
/* Only take action when all sets contain the member */
if (j == setnum) {
if (cardinality_only) {
cardinality++;
/* We stop the searching after reaching the limit. */
//判断是否到了limit的限制,到的话不做其余操作了
if (limit && cardinality >= limit)
break;
} else if (!dstkey) {
//dstkey如果为空,那么说明仅仅是找交集,不用写入
if (encoding == OBJ_ENCODING_HT)
addReplyBulkCBuffer(c,elesds,sdslen(elesds));
else
addReplyBulkLongLong(c,intobj);
cardinality++;
} else {
if (encoding == OBJ_ENCODING_INTSET) {
elesds = sdsfromlonglong(intobj);
setTypeAdd(dstset,elesds);
sdsfree(elesds);
} else {
setTypeAdd(dstset,elesds);
}
}
}
}
setTypeReleaseIterator(si);
if (cardinality_only) {
addReplyLongLong(c,cardinality);
} else if (dstkey) {
//需要将结果写入库中
/* Store the resulting set into the target, if the intersection
* is not an empty set. */
if (setTypeSize(dstset) > 0) {
setKey(c,c->db,dstkey,dstset);
addReplyLongLong(c,setTypeSize(dstset));
notifyKeyspaceEvent(NOTIFY_SET,"sinterstore",
dstkey,c->db->id);
server.dirty++;
} else {
addReply(c,shared.czero);
if (dbDelete(c->db,dstkey)) {
//脏键自增
server.dirty++;
//通知变化
signalModifiedKey(c,c->db,dstkey);
//通知订阅
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",dstkey,c->db->id);
}
}
decrRefCount(dstset);
} else {
setDeferredSetLen(c,replylen,cardinality);
}
zfree(sets);
}
交集相对来说比较简单,得益于dict的尝试插入与intset的寻找优化,整个时间复杂度在n^2以内
并集差集:
void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum,
robj *dstkey, int op) {
robj **sets = zmalloc(sizeof(robj*)*setnum);
setTypeIterator *si;
robj *dstset = NULL;
sds ele;
int j, cardinality = 0;
int diff_algo = 1;
for (j = 0; j < setnum; j++) {
robj *setobj = dstkey ?
lookupKeyWrite(c->db,setkeys[j]) :
lookupKeyRead(c->db,setkeys[j]);
if (!setobj) {
sets[j] = NULL;
continue;
}
if (checkType(c,setobj,OBJ_SET)) {
zfree(sets);
return;
}
sets[j] = setobj;
}
/* Select what DIFF algorithm to use.
*
* Algorithm 1 is O(N*M) where N is the size of the element first set
* and M the total number of sets.
*
* Algorithm 2 is O(N) where N is the total number of elements in all
* the sets.
*
* We compute what is the best bet with the current input here. */
//根据输入的集合决定用哪种算法,根据对比min元素数量*集合个数与所有集合中元素个数相比较,决定用最小时间复杂度的算法
if (op == SET_OP_DIFF && sets[0]) {
long long algo_one_work = 0, algo_two_work = 0;
for (j = 0; j < setnum; j++) {
if (sets[j] == NULL) continue;
//algo_one_work为计算最小基数*集合个数的时间复杂度
algo_one_work += setTypeSize(sets[0]);
//algo_two_work为计算所有集合全部个数的时间复杂度
algo_two_work += setTypeSize(sets[j]);
}
/* Algorithm 1 has better constant times and performs less operations
* if there are elements in common. Give it some advantage. */
//算法1的常数一般较为低,所以一般有限考虑算法1
algo_one_work /= 2;
diff_algo = (algo_one_work <= algo_two_work) ? 1 : 2;
if (diff_algo == 1 && setnum > 1) {
/* With algorithm 1 it is better to order the sets to subtract
* by decreasing size, so that we are more likely to find
* duplicated elements ASAP. */
//算法1的时间复杂度依赖提前对其余的集合进行排序
qsort(sets+1,setnum-1,sizeof(robj*),
qsortCompareSetsByRevCardinality);
}
}
/* We need a temp set object to store our union. If the dstkey
* is not NULL (that is, we are inside an SUNIONSTORE operation) then
* this set object will be the resulting object to set into the target key*/
//临时集合用来保存结果,如果为并集的操作的话,那么这个集合就是最终的结果
dstset = createIntsetObject();
//并集计算
if (op == SET_OP_UNION) {
/* Union is trivial, just add every element of every set to the
* temporary set. */
//暴力遍历即可
for (j = 0; j < setnum; j++) {
if (!sets[j]) continue; /* non existing keys are like empty sets */
si = setTypeInitIterator(sets[j]);
while((ele = setTypeNextObject(si)) != NULL) {
if (setTypeAdd(dstset,ele)) cardinality++;
sdsfree(ele);
}
setTypeReleaseIterator(si);
}
//差集相对比较麻烦,需要考虑不同的算法情况
//算法1的差集
} else if (op == SET_OP_DIFF && sets[0] && diff_algo == 1) {
/* DIFF Algorithm 1:
*
* We perform the diff by iterating all the elements of the first set,
* and only adding it to the target set if the element does not exist
* into all the other sets.
*
* This way we perform at max N*M operations, where N is the size of
* the first set, and M the number of sets. */
//在算法1中,需要将最小基数的集合与其他所有集合元素进行对比,所以这里算法一的复杂度为N*M
si = setTypeInitIterator(sets[0]);
while((ele = setTypeNextObject(si)) != NULL) {
for (j = 1; j < setnum; j++) {
if (!sets[j]) continue; /* no key is an empty set. */
if (sets[j] == sets[0]) break; /* same set! */
if (setTypeIsMember(sets[j],ele)) break;
}
if (j == setnum) {
/* There is no other set with this element. Add it. */
setTypeAdd(dstset,ele);
cardinality++;
}
sdsfree(ele);
}
setTypeReleaseIterator(si);
//算法2的差集
} else if (op == SET_OP_DIFF && sets[0] && diff_algo == 2) {
/* DIFF Algorithm 2:
*
* Add all the elements of the first set to the auxiliary set.
* Then remove all the elements of all the next sets from it.
*
* This is O(N) where N is the sum of all the elements in every
* set. */
//与算法1不同的是,需要将最小基数的集合也添加到备用结果集中,然后再遍历所有集合,将相同元素删除
for (j = 0; j < setnum; j++) {
if (!sets[j]) continue; /* non existing keys are like empty sets */
si = setTypeInitIterator(sets[j]);
while((ele = setTypeNextObject(si)) != NULL) {
if (j == 0) {
if (setTypeAdd(dstset,ele)) cardinality++;
} else {
if (setTypeRemove(dstset,ele)) cardinality--;
}
sdsfree(ele);
}
setTypeReleaseIterator(si);
/* Exit if result set is empty as any additional removal
* of elements will have no effect. */
if (cardinality == 0) break;
}
}
/* Output the content of the resulting set, if not in STORE mode */
//与交集基本上一致,唯一不通的是根据并集和差集的区别,看dstkey是否需要被删除
if (!dstkey) {
addReplySetLen(c,cardinality);
si = setTypeInitIterator(dstset);
while((ele = setTypeNextObject(si)) != NULL) {
addReplyBulkCBuffer(c,ele,sdslen(ele));
sdsfree(ele);
}
setTypeReleaseIterator(si);
server.lazyfree_lazy_server_del ? freeObjAsync(NULL, dstset, -1) :
decrRefCount(dstset);
} else {
/* If we have a target key where to store the resulting set
* create this key with the result set inside */
if (setTypeSize(dstset) > 0) {
//说明结果集不为空,那么需要添加到数据库中
setKey(c,c->db,dstkey,dstset);
addReplyLongLong(c,setTypeSize(dstset));
//通知
notifyKeyspaceEvent(NOTIFY_SET,
op == SET_OP_UNION ? "sunionstore" : "sdiffstore",
dstkey,c->db->id);
server.dirty++;
} else {
addReply(c,shared.czero);
if (dbDelete(c->db,dstkey)) {
server.dirty++;
signalModifiedKey(c,c->db,dstkey);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",dstkey,c->db->id);
}
}
decrRefCount(dstset);
}
zfree(sets);
}
并集差集中特殊的是,会预先计算时间复杂度,然后根据计算结果采用不同的算法来做,两个算法不同的就在于,对于最小基数的集合,是直接与其他元素相比较,还是添加到其他元素中再遍历相同元素
t_set总结
t_set中对于简单的command命令,实现基本全依赖底层数据结构包装好的命令,唯一特殊的就在于对于多个集合的交叉并集的统一支持。其中并差集采用相同的函数,根据具体的集合情况判断相对应的具体算法。对于交集,时间复杂度最坏在O(n^2),而对于并差集,最坏的时间复杂度则在min(O(n*M),O(N))(n为最小基数的集合基数,M为集合个数,N为集合中元素个数之和)