redis源码学习-ziplist篇

ziplist介绍

ziplist翻译为压缩链表,是一个经过特殊编码的用于存储字符串或整数的双向链表。ziplist整体为连续内存块组成,达到压缩效果。但对于ziplist底层实现看,并没有真正的链表结构,连续两个节点之间内存是连续的,next和last的操作为移动地址偏移量得到的。ziplist对于内存利用十分高效,也因此有着较大的复杂性。

ziplist分析

ziplist定义相对于其他redis的底层数据结构较为特殊,无明确struct,创建时仅返回ziplist的首地址。ziplist的其余信息则通过连续内存块获得。
ziplist图示结构如下:

其中可以手动将ziplist分为三部分,header,entrys,end
header:

  1. zlbytes:既为ziplist的起始地址,又负责记录ziplist的字节长度,zlbytes固定4字节,也就代表了一个ziplist最长为(2^32)-1字节
  2. zltail:ziplist尾元素相对于起始地址的偏移量,可以直接定位到ziplist的尾部,快速地执行push或pop操作,固定为4字节
  3. zlen:记录ziplist中的entry个数
    entry:
    entry负责存储ziplist中的每个节点的值。entry中也是由特殊的结构构成,每个entry也由图示加说明介绍:

    每个entry也可以分为三部分,分别为entry-header,entry-encoding,entry-data
    entry-header:
    prevlength:每个entry的prevlength默认都为1字节,记录的是上一个节点的长度。
  4. 假如上一个节点的长度<254则entry中的header仅仅为占据一个字节的prevlength。
  5. 假如上一个节点的长>=254的话,prevlength则变形为固定的254(一个字节)+占据三个字节的prevlength。
  6. 其中prevlength变形的前提为大于254而非大于等于254,原因在于255被预留作为zlend的值,它用于判断ziplist是否到达尾部。254则被预留作为判断prevlength变形的标志。
    entry-encoding:每个ziplist的entry都可以用来保存整数或字符串,
  7. 对于整数来说,encoding固定为1字节即8位,高2位用来区分整数节点和字符串节点(高2位为11时是整数节点),低6位用来区分整数节点的类型,分为INT_8,INT_16,INT_24,INT_32,INT_64。比较特殊的是,整数值1~13的节点没有data,采用encoding的低四位用来表示data。
  8. 对于字符串来说,encoding根据data的长度,分为8位、16位、40位三种长度。data小于64字节时(26),encoding为8位,高2位为00,低6位表示data的长度。data在64-16383字节时(214),encoding为16位,高2位为01,低14位表示data的长度。data在16383-4294967296字节时(2^32),encoding为40位,高2位为10,低6为不使用,这8位后的连续32位表示data的长度
    entry-data:真正存储的内容,其中较为特殊的是,如果string本身长度已经超过32,那么不进行操作,否则将字符串转为long long类型的结果进行压缩编码。
    entry部分结束
    end:
    为ziplist的结尾,恒等于0xFF
    如上介绍,不难看出ziplist为内存连续的,首先定义header,预留空间用来记录首尾地址,其中每个具体entry中也记录上个节点偏移量与自身所占长度,没有使用链表,也实现了next与tail操作。整个ziplist唯一空间浪费的地方在于字符串中encoding在40位情况下的第3-8位。

ziplist源码分析

create:
/n/n/n/n/n

/* Create a new empty ziplist. */
unsigned char *ziplistNew(void) {
    unsigned int bytes = ZIPLIST_HEADER_SIZE+ZIPLIST_END_SIZE;
    unsigned char *zl = zmalloc(bytes);
    ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);
    ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);
    ZIPLIST_LENGTH(zl) = 0;
    zl[bytes-1] = ZIP_END;
    return zl;
}/n/n/n/n/n

create不复杂,采用zmalloc分配堆内存,初始化好header与end共11字节。
insert:
/n/n/n/n/n

/* Insert item at "p". */
unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen, newlen;
    unsigned int prevlensize, prevlen = 0;
    size_t offset;
    int nextdiff = 0;
    unsigned char encoding = 0;
    long long value = 123456789; /* initialized to avoid warning. Using a value
                                    that is easy to see if for some reason
                                    we use it uninitialized. */
    zlentry tail;
    //根据插入的位置得到前一个entry节点的长度。
    /* Find out prevlen for the entry that is inserted. */
    if (p[0] != ZIP_END) {
        ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
    } else {
        unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);
        if (ptail[0] != ZIP_END) {
            //获取entryN元素的长度
            prevlen = zipRawEntryLengthSafe(zl, curlen, ptail);
        }
    }
    //获取到上一个节点长度后尝试对需要存放的数据进行编码(对于小于32位的字符串转long long)
    /* See if the entry can be encoded */
    if (zipTryEncoding(s,slen,&value,&encoding)) {
        /* 'encoding' is set to the appropriate integer encoding */
        reqlen = zipIntSize(encoding);
    } else {
        /* 'encoding' is untouched, however zipStoreEntryEncoding will use the
         * string length to figure out how to encode it. */
        reqlen = slen;
    }
    //将会计算前一节点长度与当前数据的字节大小之和,作为新插入节点下一个节点的起始位置
    /* We need space for both the length of the previous entry and
     * the length of the payload. */
    reqlen += zipStorePrevEntryLength(NULL,prevlen);
    reqlen += zipStoreEntryEncoding(NULL,encoding,slen);

    /* When the insert position is not equal to the tail, we need to
     * make sure that the next entry can hold this entry's length in
     * its prevlen field. */
    int forcelarge = 0;
    nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;
    if (nextdiff == -4 && reqlen < 4) {
        nextdiff = 0;
        forcelarge = 1;
    }

    /* Store offset because a realloc may change the address of zl. */
    offset = p-zl;
    newlen = curlen+reqlen+nextdiff;
    zl = ziplistResize(zl,newlen);
    //重新计算偏移量,重新申请长度,通过新的地址与偏移量算出插入位置。
    p = zl+offset;
     
    /* Apply memory move when necessary and update tail offset. */
    if (p[0] != ZIP_END) {
        /* Subtract one because of the ZIP_END bytes */
        //p+reqlen为下一个节点的起始位置,将新节点后的数据移动到目标位置
        memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);
        //假如说forcelarge=1,那么说明新的节点大小需要强制扩展为4个字节,以适应原本的节点表示上一节点的长度大小
        /* Encode this entry's raw length in the next entry. */
        if (forcelarge)
            zipStorePrevEntryLengthLarge(p+reqlen,reqlen);
        else
            zipStorePrevEntryLength(p+reqlen,reqlen);

        /* Update offset for tail */
        ZIPLIST_TAIL_OFFSET(zl) =
            intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);

        /* When the tail contains more than one entry, we need to take
         * "nextdiff" in account as well. Otherwise, a change in the
         * size of prevlen doesn't have an effect on the *tail* offset. */
        assert(zipEntrySafe(zl, newlen, p+reqlen, &tail, 1));
        if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
            ZIPLIST_TAIL_OFFSET(zl) =
                intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
        }
    } else {
        /* This element will be the new tail. */
        ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);
    }
    
    /* When nextdiff != 0, the raw length of the next entry has changed, so
     * we need to cascade the update throughout the ziplist */
    if (nextdiff != 0) {
        offset = p-zl;
        //连锁反应,如果下一个节点的由于当前节点的插入需要增加的长度超过了254,那么也需要将其下一个节点连锁增加,直到不需要增加长度的节点出现为止。
        zl = __ziplistCascadeUpdate(zl,p+reqlen);
        p = zl+offset;
    }

    /* Write the entry */
    p += zipStorePrevEntryLength(p,prevlen);
    p += zipStoreEntryEncoding(p,encoding,slen);
    if (ZIP_IS_STR(encoding)) {
        memcpy(p,s,slen);
    } else {
        zipSaveInteger(p,value,encoding);
    }
    ZIPLIST_INCR_LENGTH(zl,1);
    return zl;
}/n/n/n/n/n

insert或delete引出的连锁更新:
/n/n/n/n/n

unsigned char *__ziplistCascadeUpdate(unsigned char *zl, unsigned char *p) {
    zlentry cur;
    size_t prevlen, prevlensize, prevoffset; /* Informat of the last changed entry. */
    size_t firstentrylen; /* Used to handle insert at head. */
    size_t rawlen, curlen = intrev32ifbe(ZIPLIST_BYTES(zl));
    size_t extra = 0, cnt = 0, offset;
    size_t delta = 4; /* Extra bytes needed to update a entry's prevlen (5-1). */
    unsigned char *tail = zl + intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl));

    /* Empty ziplist */
    if (p[0] == ZIP_END) return zl;

    zipEntry(p, &cur); /* no need for "safe" variant since the input pointer was validated by the function that returned it. */
    firstentrylen = prevlen = cur.headersize + cur.len;
    prevlensize = zipStorePrevEntryLength(NULL, prevlen);
    prevoffset = p - zl;
    p += prevlen;

    /* Iterate ziplist to find out how many extra bytes do we need to update it. */
    while (p[0] != ZIP_END) {
        assert(zipEntrySafe(zl, curlen, p, &cur, 0));

        /* Abort when "prevlen" has not changed. */
        if (cur.prevrawlen == prevlen) break;

        /* Abort when entry's "prevlensize" is big enough. */
        if (cur.prevrawlensize >= prevlensize) {
            if (cur.prevrawlensize == prevlensize) {
                zipStorePrevEntryLength(p, prevlen);
            } else {
                /* This would result in shrinking, which we want to avoid.
                 * So, set "prevlen" in the available bytes. */
                zipStorePrevEntryLengthLarge(p, prevlen);
            }
            break;
        }

        /* cur.prevrawlen means cur is the former head entry. */
        assert(cur.prevrawlen == 0 || cur.prevrawlen + delta == prevlen);

        /* Update prev entry's info and advance the cursor. */
        rawlen = cur.headersize + cur.len;
        prevlen = rawlen + delta; 
        prevlensize = zipStorePrevEntryLength(NULL, prevlen);
        prevoffset = p - zl;
        p += rawlen;
        extra += delta;
        cnt++;
    }

    /* Extra bytes is zero all update has been done(or no need to update). */
    if (extra == 0) return zl;

    /* Update tail offset after loop. */
    if (tail == zl + prevoffset) {
        /* When the the last entry we need to update is also the tail, update tail offset
         * unless this is the only entry that was updated (so the tail offset didn't change). */
        if (extra - delta != 0) {
            ZIPLIST_TAIL_OFFSET(zl) =
                intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+extra-delta);
        }
    } else {
        /* Update the tail offset in cases where the last entry we updated is not the tail. */
        ZIPLIST_TAIL_OFFSET(zl) =
            intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+extra);
    }

    /* Now "p" points at the first unchanged byte in original ziplist,
     * move data after that to new ziplist. */
    offset = p - zl;
    zl = ziplistResize(zl, curlen + extra);
    p = zl + offset;
    memmove(p + extra, p, curlen - offset - 1);
    p += extra;

    /* Iterate all entries that need to be updated tail to head. */
    while (cnt) {
        zipEntry(zl + prevoffset, &cur); /* no need for "safe" variant since we already iterated on all these entries above. */
        rawlen = cur.headersize + cur.len;
        /* Move entry to tail and reset prevlen. */
        memmove(p - (rawlen - cur.prevrawlensize), 
                zl + prevoffset + cur.prevrawlensize, 
                rawlen - cur.prevrawlensize);
        p -= (rawlen + delta);
        if (cur.prevrawlen == 0) {
            /* "cur" is the previous head entry, update its prevlen with firstentrylen. */
            zipStorePrevEntryLength(p, firstentrylen);
        } else {
            /* An entry's prevlen can only increment 4 bytes. */
            zipStorePrevEntryLength(p, cur.prevrawlen+delta);
        }
        /* Forward to previous entry. */
        prevoffset -= cur.prevrawlen;
        cnt--;
    }
    return zl;
}/n/n/n/n/n

连锁扩容代码并不复杂,出现的原因在于新节点的加入,假如插入了一个新的节点p,他的下一个节点p+1本身存储了节点p的上个节点p-1的length。如果说p-1节点是个长度小于254个字节的数,p+1中将会一个字节的大小来存放entry中的数据,但是当节点p插入后,且长度大于等于254,p+1就会申请更大的空间来存放prevlength数据。p+1新申请了空间,有可能导致p+2存储的长度不够用,直到p+n不用新申请长度就能更新prevlength或者到达ziplist的末端为止。最坏的时间复杂度为O(n^2)
Delete:
/n/n/n/n/n

/* Delete "num" entries, starting at "p". Returns pointer to the ziplist. */
unsigned char *__ziplistDelete(unsigned char *zl, unsigned char *p, unsigned int num) {
    unsigned int i, totlen, deleted = 0;
    size_t offset;
    int nextdiff = 0;
    zlentry first, tail;
    size_t zlbytes = intrev32ifbe(ZIPLIST_BYTES(zl));

    zipEntry(p, &first); /* no need for "safe" variant since the input pointer was validated by the function that returned it. */
    for (i = 0; p[0] != ZIP_END && i < num; i++) {
        p += zipRawEntryLengthSafe(zl, zlbytes, p);
        deleted++;
    }

    assert(p >= first.p);
    totlen = p-first.p; /* Bytes taken by the element(s) to delete. */
    if (totlen > 0) {
        uint32_t set_tail;
        if (p[0] != ZIP_END) {
            /* Storing `prevrawlen` in this entry may increase or decrease the
             * number of bytes required compare to the current `prevrawlen`.
             * There always is room to store this, because it was previously
             * stored by an entry that is now being deleted. */
            nextdiff = zipPrevLenByteDiff(p,first.prevrawlen);

            /* Note that there is always space when p jumps backward: if
             * the new previous entry is large, one of the deleted elements
             * had a 5 bytes prevlen header, so there is for sure at least
             * 5 bytes free and we need just 4. */
            p -= nextdiff;
            assert(p >= first.p && p<zl+zlbytes-1);
            zipStorePrevEntryLength(p,first.prevrawlen);

            /* Update offset for tail */
            set_tail = intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))-totlen;

            /* When the tail contains more than one entry, we need to take
             * "nextdiff" in account as well. Otherwise, a change in the
             * size of prevlen doesn't have an effect on the *tail* offset. */
            assert(zipEntrySafe(zl, zlbytes, p, &tail, 1));
            if (p[tail.headersize+tail.len] != ZIP_END) {
                set_tail = set_tail + nextdiff;
            }

            /* Move tail to the front of the ziplist */
            /* since we asserted that p >= first.p. we know totlen >= 0,
             * so we know that p > first.p and this is guaranteed not to reach
             * beyond the allocation, even if the entries lens are corrupted. */
            size_t bytes_to_move = zlbytes-(p-zl)-1;
            memmove(first.p,p,bytes_to_move);
        } else {
            /* The entire tail was deleted. No need to move memory. */
            set_tail = (first.p-zl)-first.prevrawlen;
        }

        /* Resize the ziplist */
        offset = first.p-zl;
        zlbytes -= totlen - nextdiff;
        zl = ziplistResize(zl, zlbytes);
        p = zl+offset;

        /* Update record count */
        ZIPLIST_INCR_LENGTH(zl,-deleted);

        /* Set the tail offset computed above */
        assert(set_tail <= zlbytes - ZIPLIST_END_SIZE);
        ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(set_tail);

        /* When nextdiff != 0, the raw length of the next entry has changed, so
         * we need to cascade the update throughout the ziplist */
        if (nextdiff != 0)
            zl = __ziplistCascadeUpdate(zl,p);
    }
    return zl;
}

删除操作与insert操作类似,看懂insert即可,需注意,delete中也会导致节点p的上一个节点的length发生变化,因此也需要引入连锁更新修复ziplist的结构。
find:
/n/n/n/n/n

/* Find pointer to the entry equal to the specified entry. Skip 'skip' entries
 * between every comparison. Returns NULL when the field could not be found. */
unsigned char *ziplistFind(unsigned char *zl, unsigned char *p, unsigned char *vstr, unsigned int vlen, unsigned int skip) {
    int skipcnt = 0;
    unsigned char vencoding = 0;
    long long vll = 0;
    size_t zlbytes = ziplistBlobLen(zl);

    while (p[0] != ZIP_END) {
        struct zlentry e;
        unsigned char *q;

        assert(zipEntrySafe(zl, zlbytes, p, &e, 1));
        q = p + e.prevrawlensize + e.lensize;

        if (skipcnt == 0) {
            /* Compare current entry with specified entry */
            if (ZIP_IS_STR(e.encoding)) {
                if (e.len == vlen && memcmp(q, vstr, vlen) == 0) {
                    return p;
                }
            } else {
                /* Find out if the searched field can be encoded. Note that
                 * we do it only the first time, once done vencoding is set
                 * to non-zero and vll is set to the integer value. */
                if (vencoding == 0) {
                    if (!zipTryEncoding(vstr, vlen, &vll, &vencoding)) {
                        /* If the entry can't be encoded we set it to
                         * UCHAR_MAX so that we don't retry again the next
                         * time. */
                        vencoding = UCHAR_MAX;
                    }
                    /* Must be non-zero by now */
                    assert(vencoding);
                }

                /* Compare current entry with specified entry, do it only
                 * if vencoding != UCHAR_MAX because if there is no encoding
                 * possible for the field it can't be a valid integer. */
                if (vencoding != UCHAR_MAX) {
                    long long ll = zipLoadInteger(q, e.encoding);
                    if (ll == vll) {
                        return p;
                    }
                }
            }

            /* Reset skip count */
            skipcnt = skip;
        } else {
            /* Skip entry */
            skipcnt--;
        }

        /* Move to next entry */
        p = q + e.len;
    }

    return NULL;
}/n/n/n/n/n

find相对实现起来就更加无脑了,循环遍历。需要注意的是,find函数需传一个skip的值。skip的引入主要是上层容器的使用。假如说上层容器是一个hash结构,使用了ziplist,那么会按照顺序先存储key,再存储value,那么find的时候只需要查key即可,通过skip控制多跳一下,提升查找效率。

ziplist总结

  1. ziplist在空间利用率上极高,每个entry最多只有6字节的浪费。
  2. ziplist底层结构无链表,通过内存偏移量获取next或last节点位置
  3. ziplist在插入和删除的时候有相当大的概率出现连锁更新,因此在使用时尽量保证所存value位数相同,否则最坏会出现O(n^2)的操作,虽然整体来说性能还是很强。这也是大多数同学在使用redis时都不会注意优化的地方