redis源码学习-t_set篇

t_set介绍

t_set为redis中无序集合的容器,用来保存不同成员,底层数据结构为dict(hash table)和intset实现。
dict与intset分别在


这两篇文章中进行了详细分析,不熟悉的同学可以先看这两篇文章。

t_set详解

t_set编码方式

#define OBJ_ENCODING_HT 2      /* Encoded as hash table */
#define OBJ_ENCODING_INTSET 6  /* Encoded as intset */
  1. intset编码条件:首次添加时为longlong类型能够保存的整数,且之后添加的全都为longlong类型能够保存的整数
  2. dict编码条件:当整个set中有一个longlong不能保存的值,就全部换为dict

t_set中负责处理的具体命令

  1. sadd:向集合添加一个或多个成员
  2. scard:获取集合的成员数
  3. sdiff:返回多个集合的差集
  4. sdiffstore:返回给定所有集合的差集并存储
  5. sinter:返回多个集合的交集
  6. sinterstore:返回给定所有集合的交集并存储
  7. sunion:返回多个集合的并集
  8. sunionstore:返回给定所有集合的并集并存储
  9. sismember:判断元素是否是为该集合的成员
  10. smembers:返回集合中的所有成员
  11. smove:将元素从一个集合移动到另一个集合
  12. spop:移除并返回集合中的一个元素
  13. srandmember:返回集合中一个或多个随机成员
  14. srem:移除集合中一个或多个成员
  15. sscan:迭代集合中的元素

t_set源码分析

t_set中特殊点在于提供了集合间的交集并集差集操作,t_set中的GenericCommand也全部是为了交叉并集包装的通用函数,一些相对独立的命令不使用Generic公共函数,由自己负责pub/sub等操作。
sadd:

void saddCommand(client *c) {
    robj *set;
    int j, added = 0;
    //判断命令是否合规
    set = lookupKeyWrite(c->db,c->argv[1]);
    if (checkType(c,set,OBJ_SET)) return;
    //如果set为空,那么会优先判断类型是否能是intset,如果不行的话就使用dict
    if (set == NULL) {
        set = setTypeCreate(c->argv[2]->ptr);
        //在这里直接调用db模块的add命令,将集合的key写入到db中
        dbAdd(c->db,c->argv[1],set);
    }

    for (j = 2; j < c->argc; j++) {
           //这里的setTypeAdd函数根据编码类型调用底层数据结构的dictAdd或intsetAdd
        if (setTypeAdd(set,c->argv[j]->ptr)) added++;
    }
    if (added) {
        // 发送键修改通知
        signalModifiedKey(c,c->db,c->argv[1]);
         //自己做pub/sub的通知,没有托管给通用函数
        notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id);
    }
    server.dirty += added;
    //与t_hash类似,由于可能此命令操作的数据数量太大,所以需要longlong的addreply才能安全的放下
    addReplyLongLong(c,added);
}
/* Factory method to return a set that *can* hold "value". When the object has
 * an integer-encodable value, an intset will be returned. Otherwise a regular
 * hash table. */
robj *setTypeCreate(sds value) {
    if (isSdsRepresentableAsLongLong(value,NULL) == C_OK)
        return createIntsetObject();
    return createSetObject();
}

整个sadd看下来会发现,tset中的编码也与t_hash类似,当编码"升级"之后就再也不会“降级了”。只要有longlong满足不了的value插入,整个t_set将全部变为dict进行存储
scard:

void scardCommand(client *c) {
    robj *o;

    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
        checkType(c,o,OBJ_SET)) return;

    addReplyLongLong(c,setTypeSize(o));
}

相对来讲scard是整个tset最简单的函数了,由于dict与intset本身就会记录存储的entry个数,因此t_set只需要验证编码格式,然后直接调用即可
t_set中的交叉并集
t_set中交叉并集全部都有保存计算后的集合的命令,因此相对应的GenericCommand函数就根据传参条件判断需不需要保存计算后的集合即各个stroe命令。
交集通用函数:

void sinterGenericCommand(client *c, robj **setkeys,
                          unsigned long setnum, robj *dstkey,
                          int cardinality_only, unsigned long limit) {
    //申请内存,用来临时保存结果,整个GenericCommandreturn之前都会执行zfree释放此缓存空间
    robj **sets = zmalloc(sizeof(robj*)*setnum);
    setTypeIterator *si;
    robj *dstset = NULL;
    sds elesds;
    int64_t intobj;
    void *replylen = NULL;
    unsigned long j, cardinality = 0;
    int encoding, empty = 0;

    for (j = 0; j < setnum; j++) {
        //当需要保存计算结果时,将会传入dstkey
        robj *setobj = dstkey ?
             //用来取出对象
            lookupKeyWrite(c->db,setkeys[j]) :
            lookupKeyRead(c->db,setkeys[j]);
        //假如说对象不存在的话,那么无需做check操作了,将预设的empty自增,进入for循环外的分支,走完整的free操作
        if (!setobj) {
            /* A NULL is considered an empty set */
            empty += 1;
            sets[j] = NULL;
            continue;
        }
        if (checkType(c,setobj,OBJ_SET)) {
            zfree(sets);
            return;
        }
        sets[j] = setobj;
    }

    /* Set intersection with an empty set always results in an empty set.
     * Return ASAP if there is an empty set. */
    //empty>0会出现在上边的for循环中,没有找到对应的对象,那么释放缓存set的内存,发送通知,返回结果统一处理
    if (empty > 0) {
        zfree(sets);
        if (dstkey) {
            if (dbDelete(c->db,dstkey)) {
                signalModifiedKey(c,c->db,dstkey);
                notifyKeyspaceEvent(NOTIFY_GENERIC,"del",dstkey,c->db->id);
                server.dirty++;
            }
            addReply(c,shared.czero);
        } else if (cardinality_only) {
            addReplyLongLong(c,cardinality);
        } else {
            addReply(c,shared.emptyset[c->resp]);
        }
        return;
    }

    /* Sort sets from the smallest to largest, this will improve our
     * algorithm's performance */
    //这里用的是快排,由于之后的交集为不停的for循环,那么将无序预先变为有序,方便for循环从中间直接break,增加效率
    qsort(sets,setnum,sizeof(robj*),qsortCompareSetsByCardinality);

    /* The first thing we should output is the total number of elements...
     * since this is a multi-bulk write, but at this stage we don't know
     * the intersection set size, so we use a trick, append an empty object
     * to the output list and save the pointer to later modify it with the
     * right length */
    //在输出列表中附加一个空的对象replylen(adlist),保存它的指针,将之后的长度更新在replylen中
    if (dstkey) {
        /* If we have a target key where to store the resulting set
         * create this key with an empty set inside */
        dstset = createIntsetObject();
    } else if (!cardinality_only) {
        replylen = addReplyDeferredLen(c);
    }

    /* Iterate all the elements of the first (smallest) set, and test
     * the element against all the other sets, if at least one set does
     * not include the element it is discarded */
    //为了减小for循环次数,需要先遍历entry数最少的集合,用它的元素与其余的集合进行对比,如果不是所有的元素都含有此集合中的元素,那么此元素就不为交集
    si = setTypeInitIterator(sets[0]);
    while((encoding = setTypeNext(si,&elesds,&intobj)) != -1) {
        for (j = 1; j < setnum; j++) {
            //如果为当前集合则跳过
            if (sets[j] == sets[0]) continue;
            //intset的话
            if (encoding == OBJ_ENCODING_INTSET) {
                /* intset with intset is simple... and fast */
                //由于都是int,遍历的效率相对dict会高很多
                if (sets[j]->encoding == OBJ_ENCODING_INTSET &&
                    !intsetFind((intset*)sets[j]->ptr,intobj))
                {
                    break;
                /* in order to compare an integer with an object we
                 * have to use the generic function, creating an object
                 * for this */
                } else if (sets[j]->encoding == OBJ_ENCODING_HT) {
                   //其他集合为非intset类型会稍微麻烦些,这里构造里一个范型对象用来对比整数与对象
                    elesds = sdsfromlonglong(intobj);
                    if (!setTypeIsMember(sets[j],elesds)) {
                        sdsfree(elesds);
                        break;
                    }
                    sdsfree(elesds);
                }
            } else if (encoding == OBJ_ENCODING_HT) {
                //纯dict的话尝试set,能够set进去那么就说明不为交集
                if (!setTypeIsMember(sets[j],elesds)) {
                    break;
                }
            }
        }
        //当发现此元素在所有集合中,即发现了新的交集元素时
        /* Only take action when all sets contain the member */
        if (j == setnum) {
            if (cardinality_only) {
                cardinality++;

                /* We stop the searching after reaching the limit. */
                //判断是否到了limit的限制,到的话不做其余操作了
                if (limit && cardinality >= limit)
                    break;
            } else if (!dstkey) {
                //dstkey如果为空,那么说明仅仅是找交集,不用写入
                if (encoding == OBJ_ENCODING_HT)
                    addReplyBulkCBuffer(c,elesds,sdslen(elesds));
                else
                    addReplyBulkLongLong(c,intobj);
                cardinality++;
            } else {
                if (encoding == OBJ_ENCODING_INTSET) {
                    elesds = sdsfromlonglong(intobj);
                    setTypeAdd(dstset,elesds);
                    sdsfree(elesds);
                } else {
                    setTypeAdd(dstset,elesds);
                }
            }
        }
    }
    setTypeReleaseIterator(si);

    if (cardinality_only) {
        addReplyLongLong(c,cardinality);
    } else if (dstkey) {
        //需要将结果写入库中
        /* Store the resulting set into the target, if the intersection
         * is not an empty set. */
        if (setTypeSize(dstset) > 0) {
            setKey(c,c->db,dstkey,dstset);
            addReplyLongLong(c,setTypeSize(dstset));
            notifyKeyspaceEvent(NOTIFY_SET,"sinterstore",
                dstkey,c->db->id);
            server.dirty++;
        } else {
            addReply(c,shared.czero);
            if (dbDelete(c->db,dstkey)) {
                //脏键自增
                server.dirty++;
                //通知变化
                signalModifiedKey(c,c->db,dstkey);
                //通知订阅
                notifyKeyspaceEvent(NOTIFY_GENERIC,"del",dstkey,c->db->id);
            }
        }
        decrRefCount(dstset);
    } else {
        setDeferredSetLen(c,replylen,cardinality);
    }
    zfree(sets);
}

交集相对来说比较简单,得益于dict的尝试插入与intset的寻找优化,整个时间复杂度在n^2以内
并集差集:

void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum,
                              robj *dstkey, int op) {
    robj **sets = zmalloc(sizeof(robj*)*setnum);
    setTypeIterator *si;
    robj *dstset = NULL;
    sds ele;
    int j, cardinality = 0;
    int diff_algo = 1;

    for (j = 0; j < setnum; j++) {
        robj *setobj = dstkey ?
            lookupKeyWrite(c->db,setkeys[j]) :
            lookupKeyRead(c->db,setkeys[j]);
        if (!setobj) {
            sets[j] = NULL;
            continue;
        }
        if (checkType(c,setobj,OBJ_SET)) {
            zfree(sets);
            return;
        }
        sets[j] = setobj;
    }

    /* Select what DIFF algorithm to use.
     *
     * Algorithm 1 is O(N*M) where N is the size of the element first set
     * and M the total number of sets.
     *
     * Algorithm 2 is O(N) where N is the total number of elements in all
     * the sets.
     *
     * We compute what is the best bet with the current input here. */
     //根据输入的集合决定用哪种算法,根据对比min元素数量*集合个数与所有集合中元素个数相比较,决定用最小时间复杂度的算法
    if (op == SET_OP_DIFF && sets[0]) {
        long long algo_one_work = 0, algo_two_work = 0;
      
        for (j = 0; j < setnum; j++) {
            if (sets[j] == NULL) continue;
            //algo_one_work为计算最小基数*集合个数的时间复杂度
            algo_one_work += setTypeSize(sets[0]);
            //algo_two_work为计算所有集合全部个数的时间复杂度
            algo_two_work += setTypeSize(sets[j]);
        }

        /* Algorithm 1 has better constant times and performs less operations
         * if there are elements in common. Give it some advantage. */
        //算法1的常数一般较为低,所以一般有限考虑算法1
        algo_one_work /= 2;
        diff_algo = (algo_one_work <= algo_two_work) ? 1 : 2;

        if (diff_algo == 1 && setnum > 1) {
            /* With algorithm 1 it is better to order the sets to subtract
             * by decreasing size, so that we are more likely to find
             * duplicated elements ASAP. */
            //算法1的时间复杂度依赖提前对其余的集合进行排序
            qsort(sets+1,setnum-1,sizeof(robj*),
                qsortCompareSetsByRevCardinality);
        }
    }

    /* We need a temp set object to store our union. If the dstkey
     * is not NULL (that is, we are inside an SUNIONSTORE operation) then
     * this set object will be the resulting object to set into the target key*/
    //临时集合用来保存结果,如果为并集的操作的话,那么这个集合就是最终的结果
    dstset = createIntsetObject();
    //并集计算
    if (op == SET_OP_UNION) {
        /* Union is trivial, just add every element of every set to the
         * temporary set. */
        //暴力遍历即可
        for (j = 0; j < setnum; j++) {
            if (!sets[j]) continue; /* non existing keys are like empty sets */

            si = setTypeInitIterator(sets[j]);
            while((ele = setTypeNextObject(si)) != NULL) {
                if (setTypeAdd(dstset,ele)) cardinality++;
                sdsfree(ele);
            }
            setTypeReleaseIterator(si);
        }
     //差集相对比较麻烦,需要考虑不同的算法情况
     //算法1的差集
    } else if (op == SET_OP_DIFF && sets[0] && diff_algo == 1) {
        /* DIFF Algorithm 1:
         *
         * We perform the diff by iterating all the elements of the first set,
         * and only adding it to the target set if the element does not exist
         * into all the other sets.
         *
         * This way we perform at max N*M operations, where N is the size of
         * the first set, and M the number of sets. */
        //在算法1中,需要将最小基数的集合与其他所有集合元素进行对比,所以这里算法一的复杂度为N*M
        si = setTypeInitIterator(sets[0]);
        while((ele = setTypeNextObject(si)) != NULL) {
            for (j = 1; j < setnum; j++) {
                if (!sets[j]) continue; /* no key is an empty set. */
                if (sets[j] == sets[0]) break; /* same set! */
                if (setTypeIsMember(sets[j],ele)) break;
            }
            if (j == setnum) {
                /* There is no other set with this element. Add it. */
                setTypeAdd(dstset,ele);
                cardinality++;
            }
            sdsfree(ele);
        }
        setTypeReleaseIterator(si);
     //算法2的差集
    } else if (op == SET_OP_DIFF && sets[0] && diff_algo == 2) {
        /* DIFF Algorithm 2:
         *
         * Add all the elements of the first set to the auxiliary set.
         * Then remove all the elements of all the next sets from it.
         *
         * This is O(N) where N is the sum of all the elements in every
         * set. */
        //与算法1不同的是,需要将最小基数的集合也添加到备用结果集中,然后再遍历所有集合,将相同元素删除
        for (j = 0; j < setnum; j++) {
            if (!sets[j]) continue; /* non existing keys are like empty sets */

            si = setTypeInitIterator(sets[j]);
            while((ele = setTypeNextObject(si)) != NULL) {
                if (j == 0) {
                    if (setTypeAdd(dstset,ele)) cardinality++;
                } else {
                    if (setTypeRemove(dstset,ele)) cardinality--;
                }
                sdsfree(ele);
            }
            setTypeReleaseIterator(si);

            /* Exit if result set is empty as any additional removal
             * of elements will have no effect. */
            if (cardinality == 0) break;
        }
    }

    /* Output the content of the resulting set, if not in STORE mode */
    //与交集基本上一致,唯一不通的是根据并集和差集的区别,看dstkey是否需要被删除
    if (!dstkey) {
        addReplySetLen(c,cardinality);
        si = setTypeInitIterator(dstset);
        while((ele = setTypeNextObject(si)) != NULL) {
            addReplyBulkCBuffer(c,ele,sdslen(ele));
            sdsfree(ele);
        }
        setTypeReleaseIterator(si);
        server.lazyfree_lazy_server_del ? freeObjAsync(NULL, dstset, -1) :
                                          decrRefCount(dstset);
    } else {
        /* If we have a target key where to store the resulting set
         * create this key with the result set inside */
        if (setTypeSize(dstset) > 0) {
            //说明结果集不为空,那么需要添加到数据库中
            setKey(c,c->db,dstkey,dstset);
            addReplyLongLong(c,setTypeSize(dstset));
            //通知
            notifyKeyspaceEvent(NOTIFY_SET,
                op == SET_OP_UNION ? "sunionstore" : "sdiffstore",
                dstkey,c->db->id);
            server.dirty++;
        } else {
            addReply(c,shared.czero);
            if (dbDelete(c->db,dstkey)) {
                server.dirty++;
                signalModifiedKey(c,c->db,dstkey);
                notifyKeyspaceEvent(NOTIFY_GENERIC,"del",dstkey,c->db->id);
            }
        }
        decrRefCount(dstset);
    }
    zfree(sets);
}

并集差集中特殊的是,会预先计算时间复杂度,然后根据计算结果采用不同的算法来做,两个算法不同的就在于,对于最小基数的集合,是直接与其他元素相比较,还是添加到其他元素中再遍历相同元素

t_set总结

t_set中对于简单的command命令,实现基本全依赖底层数据结构包装好的命令,唯一特殊的就在于对于多个集合的交叉并集的统一支持。其中并差集采用相同的函数,根据具体的集合情况判断相对应的具体算法。对于交集,时间复杂度最坏在O(n^2),而对于并差集,最坏的时间复杂度则在min(O(n*M),O(N))(n为最小基数的集合基数,M为集合个数,N为集合中元素个数之和)