redis源码学习-t_string篇

t_string介绍

redis中,/src/下以t_为开头的c文件可以统一理解为上层容器,与之前介绍的sds,dict等不同,上层容器不定义专门的数据结构,只负责处理具体的redis命令,然后解析命令并操纵sds,dict等最底层数据结构完成数据的内存’持久化’或更新。
而t_string,则是负责处理与string相关的命令容器,比如经典的set,get,mset,mget等命令

t_string中负责处理的具体命令

  1. set:设定键值对
  2. setnx:设定键值对(键不存在时才设置值)
  3. setex:设定带过期时间的键值对(秒)
  4. psetex:设定带过期时间的键值对(毫秒)
  5. setrange:从给定的位数之后全部替代
  6. mset:一次设定多个键值对
  7. msetnx::一次设定多个键值对(键不存在时才设置值)
  8. get:获取key对应的value
  9. mget:获取多个key对应的value
  10. getrange:根据起始和结位置获取key对应的值的子字符串
  11. getset:获取指定的键,如果存在则修改其值;反之不进行操作
  12. incr:值递增1操作
  13. decr:值递减1操作
  14. incrby:值增加操作
  15. decrby:值减少操作
  16. append:追加key对应的值
  17. strlen: 获取key对应值得长度
    其中,每个具体命令都对应着一个具体的函数,全部定义在/src/server.h下,如上所属的具体命令的具体实现全部在t_string.c文件中,函数名称为命令名称+Command,采用驼峰命名法,比如set对应的就是SetCommand函数。

t_string所采用的底层数据结构

数据结构编码方式适用类型longint可用long表示的整数值sdsraw大于39字节的字符串sdsembstr小于40字节的字符串

t_string源码分析

相对于底层数据结构,上层容器中的源码分析就相对于无脑一些了,有点像写业务逻辑,底层优化基本都在sds等底层数据结构中做了。因此这里只分析些常见command的函数
setGenericCommand(通用类,抽出来供set,setnx,setex,psetex等command使用)

void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
    long long milliseconds = 0; /* initialized to avoid any harmness warning */
    //如果定义了key的过期时间,那么根据&&的规则将会执行getExpireMillisecondsOrReply函数
    //getExpireMillisecondsOrReply中调用getLongLongFromObjectOrReply函数拿存当前key具体的过期时间(如果没拿到那么返回0)
    if (expire && getExpireMillisecondsOrReply(c, expire, flags, unit, &milliseconds) != C_OK) {
        return;
    }
    //下边的各种if都是兼容判断当前set的情况的,对于nx或者ex等都会走不同的分支
    if (flags & OBJ_SET_GET) {
        if (getGenericCommand(c) == C_ERR) return;
    }
    //lookupKeyWrite取出key的值
    if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
        (flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL))
    {
        if (!(flags & OBJ_SET_GET)) {
            addReply(c, abort_reply ? abort_reply : shared.null[c->resp]);
        }
        return;
    }

    genericSetKey(c,c->db,key, val,flags & OBJ_KEEPTTL,1);
    //设置当前为脏(dirty),每次修改一个key后,都会对脏键(dirty)增1
    server.dirty++;
    notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);

    if (expire) {
        setExpire(c,c->db,key,milliseconds);
        /* Propagate as SET Key Value PXAT millisecond-timestamp if there is
         * EX/PX/EXAT/PXAT flag. */
        robj *milliseconds_obj = createStringObjectFromLongLong(milliseconds);
        rewriteClientCommandVector(c, 5, shared.set, key, val, shared.pxat, milliseconds_obj);
        decrRefCount(milliseconds_obj);
        //发送"set"事件的通知,处理发布订阅(pub/sub)
        notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id);
    }

    if (!(flags & OBJ_SET_GET)) {
        addReply(c, ok_reply ? ok_reply : shared.ok);
    }

    /* Propagate without the GET argument (Isn't needed if we had expire since in that case we completely re-written the command argv) */
    if ((flags & OBJ_SET_GET) && !expire) {
        int argc = 0;
        int j;
        robj **argv = zmalloc((c->argc-1)*sizeof(robj*));
        for (j=0; j < c->argc; j++) {
            char *a = c->argv[j]->ptr;
            /* Skip GET which may be repeated multiple times. */
            if (j >= 3 &&
                (a[0] == 'g' || a[0] == 'G') &&
                (a[1] == 'e' || a[1] == 'E') &&
                (a[2] == 't' || a[2] == 'T') && a[3] == '\0')
                continue;
            argv[argc++] = c->argv[j];
            incrRefCount(c->argv[j]);
        }
        replaceClientCommandVector(c, argc, argv);
    }
}

其中,真正负责set操作的setKey在/src/db.c下,后续将单独分析。setKey底层使用的函数genericSetKey是具体负责的函数。如注释所说(All the new keys in the database should be created via this interface.),这个函数是负责全部set的统一接口。
如果还记得sds数据结构的话,结合从这里看到的,就会好理解redis中的过期策略。拿简单的set来说,如果设置了过期时间,而sds或者long在底层结构上并无字段来保存这个值,所以会将这个信息单独维护起来,这也是redis中的‘被动过期策略’。无论是定时,定期,或者懒汉式等等,都由一个引信来通过另外维护的过期队列进行操作sds或者long。这也是为什么redis在aof或rdb持久化重启后,比之前占用内存小的原因了。
set:

/* SET key value [NX] [XX] [KEEPTTL] [GET] [EX <seconds>] [PX <milliseconds>]
 *     [EXAT <seconds-timestamp>][PXAT <milliseconds-timestamp>] */
void setCommand(client *c) {
    robj *expire = NULL;
    int unit = UNIT_SECONDS;
    int flags = OBJ_NO_FLAGS;
    //这个函数是取args并且正确的将set类型放置到flags里
    if (parseExtendedStringArgumentsOrReply(c,&flags,&unit,&expire,COMMAND_SET) != C_OK) {
        return;
    }
    // 判断value是否可以编码成整数,如果能则编码;反之不做处理。
    c->argv[2] = tryObjectEncoding(c->argv[2]);
    //通用set方法
    setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}
/*
 * The parseExtendedStringArgumentsOrReply() function performs the common validation for extended
 * string arguments used in SET and GET command.
 *
 * Get specific commands - PERSIST/DEL
 * Set specific commands - XX/NX/GET
 * Common commands - EX/EXAT/PX/PXAT/KEEPTTL
 *
 * Function takes pointers to client, flags, unit, pointer to pointer of expire obj if needed
 * to be determined and command_type which can be COMMAND_GET or COMMAND_SET.
 *
 * If there are any syntax violations C_ERR is returned else C_OK is returned.
 *
 * Input flags are updated upon parsing the arguments. Unit and expire are updated if there are any
 * EX/EXAT/PX/PXAT arguments. Unit is updated to millisecond if PX/PXAT is set.
 */
int parseExtendedStringArgumentsOrReply(client *c, int *flags, int *unit, robj **expire, int command_type) {

    int j = command_type == COMMAND_GET ? 2 : 3;
    //这里是看是否带有ex/px和nx/xx的参数,如果有的话则在flags中标记好类型
    for (; j < c->argc; j++) {
        char *opt = c->argv[j]->ptr;
        robj *next = (j == c->argc-1) ? NULL : c->argv[j+1];

        if ((opt[0] == 'n' || opt[0] == 'N') &&
            (opt[1] == 'x' || opt[1] == 'X') && opt[2] == '\0' &&
            !(*flags & OBJ_SET_XX) && (command_type == COMMAND_SET))
        {
            *flags |= OBJ_SET_NX;
        } else if ((opt[0] == 'x' || opt[0] == 'X') &&
                   (opt[1] == 'x' || opt[1] == 'X') && opt[2] == '\0' &&
                   !(*flags & OBJ_SET_NX) && (command_type == COMMAND_SET))
        {
            *flags |= OBJ_SET_XX;
        } else if ((opt[0] == 'g' || opt[0] == 'G') &&
                   (opt[1] == 'e' || opt[1] == 'E') &&
                   (opt[2] == 't' || opt[2] == 'T') && opt[3] == '\0' &&
                   (command_type == COMMAND_SET))
        {
            *flags |= OBJ_SET_GET;
        } else if (!strcasecmp(opt, "KEEPTTL") && !(*flags & OBJ_PERSIST) &&
            !(*flags & OBJ_EX) && !(*flags & OBJ_EXAT) &&
            !(*flags & OBJ_PX) && !(*flags & OBJ_PXAT) && (command_type == COMMAND_SET))
        {
            *flags |= OBJ_KEEPTTL;
        } else if (!strcasecmp(opt,"PERSIST") && (command_type == COMMAND_GET) &&
               !(*flags & OBJ_EX) && !(*flags & OBJ_EXAT) &&
               !(*flags & OBJ_PX) && !(*flags & OBJ_PXAT) &&
               !(*flags & OBJ_KEEPTTL))
        {
            *flags |= OBJ_PERSIST;
        } else if ((opt[0] == 'e' || opt[0] == 'E') &&
                   (opt[1] == 'x' || opt[1] == 'X') && opt[2] == '\0' &&
                   !(*flags & OBJ_KEEPTTL) && !(*flags & OBJ_PERSIST) &&
                   !(*flags & OBJ_EXAT) && !(*flags & OBJ_PX) &&
                   !(*flags & OBJ_PXAT) && next)
        {
            *flags |= OBJ_EX;
            *expire = next;
            j++;
        } else if ((opt[0] == 'p' || opt[0] == 'P') &&
                   (opt[1] == 'x' || opt[1] == 'X') && opt[2] == '\0' &&
                   !(*flags & OBJ_KEEPTTL) && !(*flags & OBJ_PERSIST) &&
                   !(*flags & OBJ_EX) && !(*flags & OBJ_EXAT) &&
                   !(*flags & OBJ_PXAT) && next)
        {
            *flags |= OBJ_PX;
            *unit = UNIT_MILLISECONDS;
            *expire = next;
            j++;
        } else if ((opt[0] == 'e' || opt[0] == 'E') &&
                   (opt[1] == 'x' || opt[1] == 'X') &&
                   (opt[2] == 'a' || opt[2] == 'A') &&
                   (opt[3] == 't' || opt[3] == 'T') && opt[4] == '\0' &&
                   !(*flags & OBJ_KEEPTTL) && !(*flags & OBJ_PERSIST) &&
                   !(*flags & OBJ_EX) && !(*flags & OBJ_PX) &&
                   !(*flags & OBJ_PXAT) && next)
        {
            *flags |= OBJ_EXAT;
            *expire = next;
            j++;
        } else if ((opt[0] == 'p' || opt[0] == 'P') &&
                   (opt[1] == 'x' || opt[1] == 'X') &&
                   (opt[2] == 'a' || opt[2] == 'A') &&
                   (opt[3] == 't' || opt[3] == 'T') && opt[4] == '\0' &&
                   !(*flags & OBJ_KEEPTTL) && !(*flags & OBJ_PERSIST) &&
                   !(*flags & OBJ_EX) && !(*flags & OBJ_EXAT) &&
                   !(*flags & OBJ_PX) && next)
        {
            *flags |= OBJ_PXAT;
            *unit = UNIT_MILLISECONDS;
            *expire = next;
            j++;
        } else {
            //说明参数有问题,需要上报异常
            addReplyErrorObject(c,shared.syntaxerr);
            return C_ERR;
        }
    }
    return C_OK;
}

其余的函数就不全部放出来了,与setCommand几乎一致,基本都是业务逻辑了,

t_string.c中隐含的redis机制

过期策略
redis中的k/v与expire为分开保存的,redis中无主动过期。被动过期策略中定时,定期完全都从expire中入手的
订阅发布
拿set来说,真正支持订阅与发布的机制统一在notifyKeyspaceEvent函数中,每个具体的例如setGenericCommand通用类中负责调用发布订阅的接口