redis源码学习-t_list篇

t_list介绍

t_list,负责处理与list相关的命令容器,比如rpush,lpush等命令。底层使用quicklist,不熟悉的同学可以看

t_list中负责处理的具体命令

  1. lpush:将一个或多个值插入到列表头部
  2. blpop:移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
  3. brpop:移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
  4. lindex:通过索引获取列表中的元素
  5. linsert:在列表的元素前或者后插入元素
  6. llen:获取列表长度
  7. lpop:移出并获取列表的第一个元素
  8. lpush:将一个或多个值插入到列表头部
  9. lpushx:将一个值插入到已存在的列表头部
  10. lrange:获取列表指定范围内的元素
  11. lrem:移除列表元素
  12. lset:通过索引设置列表元素的值
  13. ltrim:对一个列表进行修剪(trim),就是说,让列表只保留指定区间内的元素,不在指定区间之内的元素都将被删除。
  14. rpop:移除列表的最后一个元素,返回值为移除的元素。
  15. rpoplpush:移除列表的最后一个元素,并将该元素添加到另一个列表并返回
  16. rpush:在列表中添加一个或多个值
  17. rpushx:为已存在的列表添加值
    其中,每个具体命令都对应着一个具体的函数,全部定义在/src/server.h下,如上所属的具体命令的具体实现全部在t_list.c文件中,函数名称为命令名称+Command,采用驼峰命名法,比如lpush对应的就是lpushCommand函数。

t_list所采用的底层数据结构

数据结构编码方式适用类型quicklistQUICKLISTall

t_list分析

与tstring不同,t_list定义了两个结构体 listTypeIterator 与listTypeEntry

/* Structure to hold list iteration abstraction. */
typedef struct {
    robj *subject;
    unsigned char encoding;
    unsigned char direction; /* Iteration direction */
    quicklistIter *iter;
} listTypeIterator;

/* Structure for an entry while iterating over a list. */
typedef struct {
    listTypeIterator *li;
    quicklistEntry entry; /* Entry in quicklist */
} listTypeEntry;

其中subject为redisObject,后续文章将会分析到。结合listTypeIterator和listTypeEntry会发现,listTypeEntry定义了list的头与一个listTypeIterator,listTypeIterator中负责保存迭代的方向以及list的编码方式。
t_list中也定义了通用的底层函数
pushGenericCommand:

/* Implements LPUSH/RPUSH/LPUSHX/RPUSHX. 
 * 'xx': push if key exists. */
void pushGenericCommand(client *c, int where, int xx) {
    int j;
    //查看redis中是否存在这个key,如果存在key但类型不符将直接返回
    robj *lobj = lookupKeyWrite(c->db, c->argv[1]);
    if (checkType(c,lobj,OBJ_LIST)) return;
    if (!lobj) {
        //xx默认为0,仅lpush与rpush使用到了
        if (xx) {
            //将值添加到输出缓存区中
            addReply(c, shared.czero);
            return;
        }
        //如果对象不存在,那么使用quicklist创建对象,并且初始化
        lobj = createQuicklistObject();
        quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,
                            server.list_compress_depth);
        //将键添加到相对应的db中(常用的都是db0)
        dbAdd(c->db,c->argv[1],lobj);
    }

    for (j = 2; j < c->argc; j++) {
        // 真正的push操作
        listTypePush(lobj,c->argv[j],where);
        //设置当前为脏(dirty),每次修改一个key后,都会对脏键(dirty)增1
        server.dirty++;
    }
    // 返回添加的节点数量
    addReplyLongLong(c, listTypeLength(lobj));

    char *event = (where == LIST_HEAD) ? "lpush" : "rpush";
    // 发送键修改通知
    signalModifiedKey(c,c->db,c->argv[1]);
    //供订阅/消费模块使用
    notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
}

从这里可以看到,t_list中全部为quicklist
popGenericCommand:

/* Implements the generic list pop operation for LPOP/RPOP.
 * The where argument specifies which end of the list is operated on. An
 * optional count may be provided as the third argument of the client's
 * command. */
void popGenericCommand(client *c, int where) {
    long count = 0;
    robj *value;
    //格式校验
    if (c->argc > 3) {
        addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
                            c->cmd->name);
        return;
    } else if (c->argc == 3) {
        /* Parse the optional count argument. */
        if (getPositiveLongFromObjectOrReply(c,c->argv[2],&count,NULL) != C_OK) 
            return;
        if (count == 0) {
            /* Fast exit path. */
            addReplyNullArray(c);
            return;
        }
    }
    //查看key是否存在或类型是否正确
    robj *o = lookupKeyWriteOrReply(c, c->argv[1], shared.null[c->resp]);
    if (o == NULL || checkType(c, o, OBJ_LIST))
        return;

    if (!count) {
        /* Pop a single element. This is POP's original behavior that replies
         * with a bulk string. */
        value = listTypePop(o,where);
        serverAssert(value != NULL);
        addReplyBulk(c,value);
        decrRefCount(value);
        //其中供订阅模块通知的函数写在了这里,作为remove后的收尾统一函数
        listElementsRemoved(c,c->argv[1],where,o,1,NULL);
    } else {
        /* Pop a range of elements. An addition to the original POP command,
         *  which replies with a multi-bulk. */
        long llen = listTypeLength(o);
        long rangelen = (count > llen) ? llen : count;
        long rangestart = (where == LIST_HEAD) ? 0 : -rangelen;
        long rangeend = (where == LIST_HEAD) ? rangelen - 1 : -1;
        int reverse = (where == LIST_HEAD) ? 0 : 1;

        addListRangeReply(c,o,rangestart,rangeend,reverse);
        listTypeDelRange(o,rangestart,rangelen);
        //其中供订阅模块通知的函数写在了这里,作为remove后的收尾统一函数
        listElementsRemoved(c,c->argv[1],where,o,rangelen,NULL);
    }
}

listElementsRemoved(remove的收尾函数):

/* A housekeeping helper for list elements popping tasks.
 *
 * 'deleted' is an optional output argument to get an indication
 * if the key got deleted by this function. */
void listElementsRemoved(client *c, robj *key, int where, robj *o, long count, int *deleted) {
    char *event = (where == LIST_HEAD) ? "lpop" : "rpop";

    notifyKeyspaceEvent(NOTIFY_LIST, event, key, c->db->id);
    if (listTypeLength(o) == 0) {
        if (deleted) *deleted = 1;

        dbDelete(c->db, key);
        notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id);
    } else {
        if (deleted) *deleted = 0;
    }
    // 发送键修改通知
    signalModifiedKey(c, c->db, key);
   //设置当前为脏(dirty),remove可能操作多个key,需对脏键(dirty)+count
    server.dirty += count;
}

list中另外特殊的地方在于,blpop或brpop会阻塞, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。这个机制的具体实现在blockingPopGenericCommand中
blockingPopGenericCommand:

/* Blocking RPOP/LPOP/LMPOP
 *
 * 'numkeys' is the number of keys.
 * 'timeout_idx' parameter position of block timeout.
 * 'where' LIST_HEAD for LEFT, LIST_TAIL for RIGHT.
 * 'count' is the number of elements requested to pop, or 0 for plain single pop.
 *
 * When count is 0, a reply of a single bulk-string will be used.
 * When count > 0, an array reply will be used. */
void blockingPopGenericCommand(client *c, robj **keys, int numkeys, int where, int timeout_idx, long count) {
    robj *o;
    robj *key;
    mstime_t timeout;
    int j;
    //获取timeout
    if (getTimeoutFromObjectOrReply(c,c->argv[timeout_idx],&timeout,UNIT_SECONDS)
        != C_OK) return;

    /* Traverse all input keys, we take action only based on one key. */
    for (j = 0; j < numkeys; j++) {
        key = keys[j];
        //从这到紧接着的两个if都是校验,校验不通过则直接报错
        o = lookupKeyWrite(c->db, key);

        /* Non-existing key, move to next key. */
        if (o == NULL) continue;

        if (checkType(c, o, OBJ_LIST)) return;
        // 当前列表为空,则跳过当前开始对下个key操作
        long llen = listTypeLength(o);
        /* Empty list, move to next key. */
        if (llen == 0) continue;
        //非空的话需要pop
        if (count != 0) {
            /* BLMPOP, non empty list, like a normal [LR]POP with count option.
             * The difference here we pop a range of elements in a nested arrays way. */
            listPopRangeAndReplyWithKey(c, o, key, where, count, NULL);

            /* Replicate it as [LR]POP COUNT. */
            robj *count_obj = createStringObjectFromLongLong((count > llen) ? llen : count);
            rewriteClientCommandVector(c, 3,
                                       (where == LIST_HEAD) ? shared.lpop : shared.rpop,
                                       key, count_obj);
            decrRefCount(count_obj);
            return;
        }

        /* Non empty list, this is like a normal [LR]POP. */
        robj *value = listTypePop(o,where);
        serverAssert(value != NULL);

        addReplyArrayLen(c,2);
        addReplyBulk(c,key);
        addReplyBulk(c,value);
        decrRefCount(value);
        listElementsRemoved(c,key,where,o,1,NULL);

        /* Replicate it as an [LR]POP instead of B[LR]POP. */
        rewriteClientCommandVector(c,2,
            (where == LIST_HEAD) ? shared.lpop : shared.rpop,
            key);
        return;
    }

    /* If we are not allowed to block the client, the only thing
     * we can do is treating it as a timeout (even with timeout 0). */
    if (c->flags & CLIENT_DENY_BLOCKING) {
        addReplyNullArray(c);
        return;
    }

    /* If the keys do not exist we must block */
    struct blockPos pos = {where};
    //如果说key都不存在的话,那么开始阻塞
    blockForKeys(c,BLOCKED_LIST,keys,numkeys,count,timeout,NULL,&pos,NULL);
}

t_list特性

相对来说,t_list中定义了大量GenericCommand,好猜到的有pushGenericCommand,popGenericCommand,特殊写的有mpopGenericCommand,lmoveGenericCommand等,大部分调用的Command函数基本上只有一行,拼装不同的from与to实现不同的命令。
t_list另一个特殊点在于blocking的通用类,实现redis中少见的阻塞命令比如blpop等。