redis对象编码源码阅读——有序集合编码和转码

redis对象编码源码阅读——有序集合编码和转码

1. 有序集合对象的编码类型

类型编码对象
REDIS_ZSETREDIS_ENCODING_ZIPLIST使用压缩列表对象实现的有序集合对象
REDIS_ZSETREDIS_ENCODING_SKIPLIST使用跳跃表和字典实现的有序集合对象

2. 两种编码的转换

void zsetConvert(robj *zobj, int encoding) {
    zset *zs;
    zskiplistNode *node, *next;
    robj *ele;
    double score;

    if (zobj->encoding == encoding) return;

2.1 压缩列表转跳表

    if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;
        unsigned char *vstr;
        unsigned int vlen;
        long long vlong;

        if (encoding != REDIS_ENCODING_SKIPLIST)
            redisPanic("Unknown target encoding");
  1. 重新申请空间
        zs = zmalloc(sizeof(*zs));
        zs->dict = dictCreate(&zsetDictType,NULL);
        zs->zsl = zslCreate();
  1. 得到 eptr = ziplist[0]sptr = ziplist[1] = ziplist[0]->next 指针
        eptr = ziplistIndex(zl,0);
        redisAssertWithInfo(NULL,zobj,eptr != NULL);
        sptr = ziplistNext(zl,eptr);
        redisAssertWithInfo(NULL,zobj,sptr != NULL);
  • ziplistIndex: 通过下标index,得到ziplist[index]
/* Returns an offset to use for iterating with ziplistNext. When the given
 * index is negative, the list is traversed back to front. When the list
 * doesn't contain an element at the provided index, NULL is returned. */
unsigned char *ziplistIndex(unsigned char *zl, int index) {
    unsigned char *p;
    unsigned int prevlensize, prevlen = 0;
    // 当 index < 0 的时候,反向遍历找到对应位置
    // 比如 index = -2, 就是倒数第二个节点
    // 注意:倒数第一个节点是 ZIP_END
    if (index < 0) {
        index = (-index)-1;
        // 得到压缩列表的尾节点
        p = ZIPLIST_ENTRY_TAIL(zl);
        if (p[0] != ZIP_END) {
            // 得到前一个节点的长度
            ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
            while (prevlen > 0 && index--) {
                p -= prevlen;
                ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
            }
        }
    // 正向遍历
    } else {
        // 得到压缩列表的头节点
        p = ZIPLIST_ENTRY_HEAD(zl);
        while (p[0] != ZIP_END && index--) {
            p += zipRawEntryLength(p);
        }
    }
    return (p[0] == ZIP_END || index > 0) ? NULL : p;
}

下面部分内容请结合《redis的设计与实现》第53-56页阅读

  • ZIPLIST_ENTRY_TAIL: 这里是用zl首地址加上zltail,直接指向了尾节点。(第53页)

  • ZIPLIST_ENTRY_HEAD: 直接用zl首地址加上zlbytes, zltail, zllen的长度。(第53页)

#define ZIPLIST_ENTRY_TAIL(zl)  ((zl)+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl)))
#define ZIPLIST_ENTRY_HEAD(zl)  ((zl)+ZIPLIST_HEADER_SIZE)
#define ZIPLIST_HEADER_SIZE     (sizeof(uint32_t)*2+sizeof(uint16_t))
  • ZIP_DECODE_PREVLENSIZE, ZIP_DECODE_PREVLEN: 先看节点的第一个字节(ptr)[0],当它小于ZIP_BIGLEN: 254时,则(ptr)[0]直接表示前一个节点的长度previous_entry_length。否则,后面的四个字节(ptr)[1-4]来表示前一个节点的长度previous_entry_length。所以先用ZIP_DECODE_PREVLENSIZE来得到实际的previous_entry_length。(第54页)
  • 这里的previous_entry_length主要就是用来反向遍历用的。
  • 不同的previous_entry_length编码是为了节省空间
/* Decode the number of bytes required to store the length of the previous
 * element, from the perspective of the entry pointed to by 'ptr'. */
#define ZIP_DECODE_PREVLENSIZE(ptr, prevlensize) do {                          \
    if ((ptr)[0] < ZIP_BIGLEN) {                                               \
        (prevlensize) = 1;                                                     \
    } else {                                                                   \
        (prevlensize) = 5;                                                     \
    }                                                                          \
} while(0);

/* Decode the length of the previous element, from the perspective of the entry
 * pointed to by 'ptr'. */
#define ZIP_DECODE_PREVLEN(ptr, prevlensize, prevlen) do {                     \
    ZIP_DECODE_PREVLENSIZE(ptr, prevlensize);                                  \
    if ((prevlensize) == 1) {                                                  \
        (prevlen) = (ptr)[0];                                                  \
    } else if ((prevlensize) == 5) {                                           \
        assert(sizeof((prevlensize)) == 4);                                    \
        memcpy(&(prevlen), ((char*)(ptr)) + 1, 4);                             \
        memrev32ifbe(&prevlen);                                                \
    }                                                                          \
} while(0);
  • zipRawEntryLength: 得到当前节点的长度
/* Return the total number of bytes used by the entry pointed to by 'p'. */
static unsigned int zipRawEntryLength(unsigned char *p) {
    unsigned int prevlensize, encoding, lensize, len;
    ZIP_DECODE_PREVLENSIZE(p, prevlensize);
    ZIP_DECODE_LENGTH(p + prevlensize, encoding, lensize, len);
    return prevlensize + lensize + len;
}
  • ZIP_ENTRY_ENCODING: 得到content的编码encoding。(第56页)
  • ZIP_DECODE_LENGTH:
/* Extract the encoding from the byte pointed by 'ptr' and set it into
 * 'encoding'. */
#define ZIP_ENTRY_ENCODING(ptr, encoding) do {  \
    (encoding) = (ptr[0]); \
    if ((encoding) < ZIP_STR_MASK) (encoding) &= ZIP_STR_MASK; \
} while(0)

/* Decode the length encoded in 'ptr'. The 'encoding' variable will hold the
 * entries encoding, the 'lensize' variable will hold the number of bytes
 * required to encode the entries length, and the 'len' variable will hold the
 * entries length. */
#define ZIP_DECODE_LENGTH(ptr, encoding, lensize, len) do {                    \
    ZIP_ENTRY_ENCODING((ptr), (encoding));                                     \
    // 字节数组编码
    if ((encoding) < ZIP_STR_MASK) {                                           \
        // 00bbbbbb
        if ((encoding) == ZIP_STR_06B) {                                       \
            (lensize) = 1;                                                     \
            (len) = (ptr)[0] & 0x3f;                                           \
        // 01bbbbbb xxxxxxxx
        } else if ((encoding) == ZIP_STR_14B) {                                \
            (lensize) = 2;                                                     \
            (len) = (((ptr)[0] & 0x3f) << 8) | (ptr)[1];                       \
        // 10______ aaaaaaaa bbbbbbbb cccccccc dddddddd
        } else if (encoding == ZIP_STR_32B) {                                  \
            (lensize) = 5;                                                     \
            (len) = ((ptr)[1] << 24) |                                         \
                    ((ptr)[2] << 16) |                                         \
                    ((ptr)[3] <<  8) |                                         \
                    ((ptr)[4]);                                                \
        } else {                                                               \
            assert(NULL);                                                      \
        }                                                                      \
    // 整数编码
    } else {                                                                   \
        (lensize) = 1;                                                         \
        (len) = zipIntSize(encoding);                                          \
    }                                                                          \
} while(0);

/* Return bytes needed to store integer encoded by 'encoding' */
static unsigned int zipIntSize(unsigned char encoding) {
    switch(encoding) {
    case ZIP_INT_8B:  return 1;
    case ZIP_INT_16B: return 2;
    case ZIP_INT_24B: return 3;
    case ZIP_INT_32B: return 4;
    case ZIP_INT_64B: return 8;
    default: return 0; /* 4 bit immediate */
    }
    assert(NULL);
    return 0;
}
  • ziplistNext: 把指针指向下一个节点
/* Return pointer to next entry in ziplist.
 *
 * zl is the pointer to the ziplist
 * p is the pointer to the current element
 *
 * The element after 'p' is returned, otherwise NULL if we are at the end. */
unsigned char *ziplistNext(unsigned char *zl, unsigned char *p) {
    ((void) zl);

    /* "p" could be equal to ZIP_END, caused by ziplistDelete,
     * and we should return NULL. Otherwise, we should return NULL
     * when the *next* element is ZIP_END (there is no next entry). */
    if (p[0] == ZIP_END) {
        return NULL;
    }

    p += zipRawEntryLength(p);
    if (p[0] == ZIP_END) {
        return NULL;
    }

    return p;
}
  1. 取出score和节点的值ele

score的位置在ele的后面。所以zzlGetScore传递的是后面的节点sptr

        while (eptr != NULL) {
            score = zzlGetScore(sptr);
            redisAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));

zzlGetScore其实也是直接调用的ziplistGet。只不过调用了之后针对scoredouble类型进行了一些处理。

double zzlGetScore(unsigned char *sptr) {
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    char buf[128];
    double score;

    redisAssert(sptr != NULL);
    redisAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));

    if (vstr) {
        memcpy(buf,vstr,vlen);
        buf[vlen] = '\0';
        score = strtod(buf,NULL);
    } else {
        score = vlong;
    }

    return score;
}

ziplistGet就是把一个节点中的值(可能是字符串,也可能是整数)取出来,如果是字符串则把字符串存在sstr中,字符串长度存在slen中;如果是整数则把值存在sval中。

/* Get entry pointed to by 'p' and store in either '*sstr' or 'sval' depending
 * on the encoding of the entry. '*sstr' is always set to NULL to be able
 * to find out whether the string pointer or the integer value was set.
 * Return 0 if 'p' points to the end of the ziplist, 1 otherwise. */
unsigned int ziplistGet(unsigned char *p, unsigned char **sstr, unsigned int *slen, long long *sval) {
    zlentry entry;
    if (p == NULL || p[0] == ZIP_END) return 0;
    if (sstr) *sstr = NULL;

    // 把节点相关的数据提取到 zlentry 数据结构中,方便操作
    // 其中有 encoding, 各种长度
    entry = zipEntry(p);
    if (ZIP_IS_STR(entry.encoding)) {
        if (sstr) {
            *slen = entry.len;
            *sstr = p+entry.headersize;
        }
    } else {
        if (sval) {
            *sval = zipLoadInteger(p+entry.headersize,entry.encoding);
        }
    }
    return 1;
}

把取出来的值,重新再转换成 sds类型的 ele

            if (vstr == NULL)
                ele = createStringObjectFromLongLong(vlong);
            else
                ele = createStringObject((char*)vstr,vlen);
  1. 插入到跳表中

跳表的具体内容可参考Redis(2)——跳跃表

            /* Has incremented refcount since it was just created. */
            node = zslInsert(zs->zsl,score,ele);
            redisAssertWithInfo(NULL,zobj,dictAdd(zs->dict,ele,&node->score) == DICT_OK);
            incrRefCount(ele); /* Added to dictionary. */
            // 注意这里的 zzlNext 和 ziplistNext 是不同的
            // zzlNext 是给有序集合用的,因为有序集合每一个值都伴随一个 score
            // 相当于两个压缩列表节点才是一个有序集合的节点
            zzlNext(zl,&eptr,&sptr);
        }
  1. 释放空间,调整编码
        zfree(zobj->ptr);
        zobj->ptr = zs;
        zobj->encoding = REDIS_ENCODING_SKIPLIST;

2.2 跳表转压缩列表

    } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
        unsigned char *zl = ziplistNew();

        if (encoding != REDIS_ENCODING_ZIPLIST)
            redisPanic("Unknown target encoding");

释放掉部分跳表的空间

        /* Approach similar to zslFree(), since we want to free the skiplist at
         * the same time as creating the ziplist. */
        zs = zobj->ptr;
        dictRelease(zs->dict);
        node = zs->zsl->header->level[0].forward;
        zfree(zs->zsl->header);
        zfree(zs->zsl);

循环把节点插入到压缩列表中

        while (node) {
            ele = getDecodedObject(node->obj);
            // 第二个参数为 NULL 时表示压入表尾
            // 因为之前跳表中的数据已经是有序的了
            zl = zzlInsertAt(zl,NULL,ele,node->score);
            decrRefCount(ele);

            next = node->level[0].forward;
            zslFreeNode(node);
            node = next;
        }

        zfree(zs);
        zobj->ptr = zl;
        zobj->encoding = REDIS_ENCODING_ZIPLIST;
    } else {
        redisPanic("Unknown sorted set encoding");
    }
}

TODO: 最后这里压缩列表的insert也挺复杂的,以后再补吧。。。

3. 为什么有了ziplistIndex还需要ziplistNext

因为ziplistIndex每次都需要从头开始循环,一直循环到ziplist[index]ziplistNext可以避免这种情况。