redis源码学习-t_list篇
0 条评论redis源码学习-t_list篇
t_list介绍
t_list,负责处理与list相关的命令容器,比如rpush,lpush等命令。底层使用quicklist,不熟悉的同学可以看
t_list中负责处理的具体命令
- lpush:将一个或多个值插入到列表头部
- blpop:移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
- brpop:移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
- lindex:通过索引获取列表中的元素
- linsert:在列表的元素前或者后插入元素
- llen:获取列表长度
- lpop:移出并获取列表的第一个元素
- lpush:将一个或多个值插入到列表头部
- lpushx:将一个值插入到已存在的列表头部
- lrange:获取列表指定范围内的元素
- lrem:移除列表元素
- lset:通过索引设置列表元素的值
- ltrim:对一个列表进行修剪(trim),就是说,让列表只保留指定区间内的元素,不在指定区间之内的元素都将被删除。
- rpop:移除列表的最后一个元素,返回值为移除的元素。
- rpoplpush:移除列表的最后一个元素,并将该元素添加到另一个列表并返回
- rpush:在列表中添加一个或多个值
- rpushx:为已存在的列表添加值
其中,每个具体命令都对应着一个具体的函数,全部定义在/src/server.h下,如上所属的具体命令的具体实现全部在t_list.c文件中,函数名称为命令名称+Command,采用驼峰命名法,比如lpush对应的就是lpushCommand函数。
t_list所采用的底层数据结构
数据结构编码方式适用类型quicklistQUICKLISTall
t_list分析
与tstring不同,t_list定义了两个结构体 listTypeIterator 与listTypeEntry
/* Structure to hold list iteration abstraction. */
typedef struct {
robj *subject;
unsigned char encoding;
unsigned char direction; /* Iteration direction */
quicklistIter *iter;
} listTypeIterator;
/* Structure for an entry while iterating over a list. */
typedef struct {
listTypeIterator *li;
quicklistEntry entry; /* Entry in quicklist */
} listTypeEntry;
其中subject为redisObject,后续文章将会分析到。结合listTypeIterator和listTypeEntry会发现,listTypeEntry定义了list的头与一个listTypeIterator,listTypeIterator中负责保存迭代的方向以及list的编码方式。
t_list中也定义了通用的底层函数
pushGenericCommand:
/* Implements LPUSH/RPUSH/LPUSHX/RPUSHX.
* 'xx': push if key exists. */
void pushGenericCommand(client *c, int where, int xx) {
int j;
//查看redis中是否存在这个key,如果存在key但类型不符将直接返回
robj *lobj = lookupKeyWrite(c->db, c->argv[1]);
if (checkType(c,lobj,OBJ_LIST)) return;
if (!lobj) {
//xx默认为0,仅lpush与rpush使用到了
if (xx) {
//将值添加到输出缓存区中
addReply(c, shared.czero);
return;
}
//如果对象不存在,那么使用quicklist创建对象,并且初始化
lobj = createQuicklistObject();
quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,
server.list_compress_depth);
//将键添加到相对应的db中(常用的都是db0)
dbAdd(c->db,c->argv[1],lobj);
}
for (j = 2; j < c->argc; j++) {
// 真正的push操作
listTypePush(lobj,c->argv[j],where);
//设置当前为脏(dirty),每次修改一个key后,都会对脏键(dirty)增1
server.dirty++;
}
// 返回添加的节点数量
addReplyLongLong(c, listTypeLength(lobj));
char *event = (where == LIST_HEAD) ? "lpush" : "rpush";
// 发送键修改通知
signalModifiedKey(c,c->db,c->argv[1]);
//供订阅/消费模块使用
notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
}
从这里可以看到,t_list中全部为quicklist
popGenericCommand:
/* Implements the generic list pop operation for LPOP/RPOP.
* The where argument specifies which end of the list is operated on. An
* optional count may be provided as the third argument of the client's
* command. */
void popGenericCommand(client *c, int where) {
long count = 0;
robj *value;
//格式校验
if (c->argc > 3) {
addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
c->cmd->name);
return;
} else if (c->argc == 3) {
/* Parse the optional count argument. */
if (getPositiveLongFromObjectOrReply(c,c->argv[2],&count,NULL) != C_OK)
return;
if (count == 0) {
/* Fast exit path. */
addReplyNullArray(c);
return;
}
}
//查看key是否存在或类型是否正确
robj *o = lookupKeyWriteOrReply(c, c->argv[1], shared.null[c->resp]);
if (o == NULL || checkType(c, o, OBJ_LIST))
return;
if (!count) {
/* Pop a single element. This is POP's original behavior that replies
* with a bulk string. */
value = listTypePop(o,where);
serverAssert(value != NULL);
addReplyBulk(c,value);
decrRefCount(value);
//其中供订阅模块通知的函数写在了这里,作为remove后的收尾统一函数
listElementsRemoved(c,c->argv[1],where,o,1,NULL);
} else {
/* Pop a range of elements. An addition to the original POP command,
* which replies with a multi-bulk. */
long llen = listTypeLength(o);
long rangelen = (count > llen) ? llen : count;
long rangestart = (where == LIST_HEAD) ? 0 : -rangelen;
long rangeend = (where == LIST_HEAD) ? rangelen - 1 : -1;
int reverse = (where == LIST_HEAD) ? 0 : 1;
addListRangeReply(c,o,rangestart,rangeend,reverse);
listTypeDelRange(o,rangestart,rangelen);
//其中供订阅模块通知的函数写在了这里,作为remove后的收尾统一函数
listElementsRemoved(c,c->argv[1],where,o,rangelen,NULL);
}
}
listElementsRemoved(remove的收尾函数):
/* A housekeeping helper for list elements popping tasks.
*
* 'deleted' is an optional output argument to get an indication
* if the key got deleted by this function. */
void listElementsRemoved(client *c, robj *key, int where, robj *o, long count, int *deleted) {
char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
notifyKeyspaceEvent(NOTIFY_LIST, event, key, c->db->id);
if (listTypeLength(o) == 0) {
if (deleted) *deleted = 1;
dbDelete(c->db, key);
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id);
} else {
if (deleted) *deleted = 0;
}
// 发送键修改通知
signalModifiedKey(c, c->db, key);
//设置当前为脏(dirty),remove可能操作多个key,需对脏键(dirty)+count
server.dirty += count;
}
list中另外特殊的地方在于,blpop或brpop会阻塞, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。这个机制的具体实现在blockingPopGenericCommand中
blockingPopGenericCommand:
/* Blocking RPOP/LPOP/LMPOP
*
* 'numkeys' is the number of keys.
* 'timeout_idx' parameter position of block timeout.
* 'where' LIST_HEAD for LEFT, LIST_TAIL for RIGHT.
* 'count' is the number of elements requested to pop, or 0 for plain single pop.
*
* When count is 0, a reply of a single bulk-string will be used.
* When count > 0, an array reply will be used. */
void blockingPopGenericCommand(client *c, robj **keys, int numkeys, int where, int timeout_idx, long count) {
robj *o;
robj *key;
mstime_t timeout;
int j;
//获取timeout
if (getTimeoutFromObjectOrReply(c,c->argv[timeout_idx],&timeout,UNIT_SECONDS)
!= C_OK) return;
/* Traverse all input keys, we take action only based on one key. */
for (j = 0; j < numkeys; j++) {
key = keys[j];
//从这到紧接着的两个if都是校验,校验不通过则直接报错
o = lookupKeyWrite(c->db, key);
/* Non-existing key, move to next key. */
if (o == NULL) continue;
if (checkType(c, o, OBJ_LIST)) return;
// 当前列表为空,则跳过当前开始对下个key操作
long llen = listTypeLength(o);
/* Empty list, move to next key. */
if (llen == 0) continue;
//非空的话需要pop
if (count != 0) {
/* BLMPOP, non empty list, like a normal [LR]POP with count option.
* The difference here we pop a range of elements in a nested arrays way. */
listPopRangeAndReplyWithKey(c, o, key, where, count, NULL);
/* Replicate it as [LR]POP COUNT. */
robj *count_obj = createStringObjectFromLongLong((count > llen) ? llen : count);
rewriteClientCommandVector(c, 3,
(where == LIST_HEAD) ? shared.lpop : shared.rpop,
key, count_obj);
decrRefCount(count_obj);
return;
}
/* Non empty list, this is like a normal [LR]POP. */
robj *value = listTypePop(o,where);
serverAssert(value != NULL);
addReplyArrayLen(c,2);
addReplyBulk(c,key);
addReplyBulk(c,value);
decrRefCount(value);
listElementsRemoved(c,key,where,o,1,NULL);
/* Replicate it as an [LR]POP instead of B[LR]POP. */
rewriteClientCommandVector(c,2,
(where == LIST_HEAD) ? shared.lpop : shared.rpop,
key);
return;
}
/* If we are not allowed to block the client, the only thing
* we can do is treating it as a timeout (even with timeout 0). */
if (c->flags & CLIENT_DENY_BLOCKING) {
addReplyNullArray(c);
return;
}
/* If the keys do not exist we must block */
struct blockPos pos = {where};
//如果说key都不存在的话,那么开始阻塞
blockForKeys(c,BLOCKED_LIST,keys,numkeys,count,timeout,NULL,&pos,NULL);
}
t_list特性
相对来说,t_list中定义了大量GenericCommand,好猜到的有pushGenericCommand,popGenericCommand,特殊写的有mpopGenericCommand,lmoveGenericCommand等,大部分调用的Command函数基本上只有一行,拼装不同的from与to实现不同的命令。
t_list另一个特殊点在于blocking的通用类,实现redis中少见的阻塞命令比如blpop等。