redis对象编码源码阅读——有序集合编码和转码
redis对象编码源码阅读——有序集合编码和转码
1. 有序集合对象的编码类型
类型 | 编码 | 对象 |
---|---|---|
REDIS_ZSET | REDIS_ENCODING_ZIPLIST | 使用压缩列表对象实现的有序集合对象 |
REDIS_ZSET | REDIS_ENCODING_SKIPLIST | 使用跳跃表和字典实现的有序集合对象 |
2. 两种编码的转换
void zsetConvert(robj *zobj, int encoding) {
zset *zs;
zskiplistNode *node, *next;
robj *ele;
double score;
if (zobj->encoding == encoding) return;
2.1 压缩列表转跳表
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
unsigned char *zl = zobj->ptr;
unsigned char *eptr, *sptr;
unsigned char *vstr;
unsigned int vlen;
long long vlong;
if (encoding != REDIS_ENCODING_SKIPLIST)
redisPanic("Unknown target encoding");
- 重新申请空间
zs = zmalloc(sizeof(*zs));
zs->dict = dictCreate(&zsetDictType,NULL);
zs->zsl = zslCreate();
- 得到
eptr = ziplist[0]
和sptr = ziplist[1] = ziplist[0]->next
指针
eptr = ziplistIndex(zl,0);
redisAssertWithInfo(NULL,zobj,eptr != NULL);
sptr = ziplistNext(zl,eptr);
redisAssertWithInfo(NULL,zobj,sptr != NULL);
ziplistIndex
: 通过下标index
,得到ziplist[index]
/* Returns an offset to use for iterating with ziplistNext. When the given
* index is negative, the list is traversed back to front. When the list
* doesn't contain an element at the provided index, NULL is returned. */
unsigned char *ziplistIndex(unsigned char *zl, int index) {
unsigned char *p;
unsigned int prevlensize, prevlen = 0;
// 当 index < 0 的时候,反向遍历找到对应位置
// 比如 index = -2, 就是倒数第二个节点
// 注意:倒数第一个节点是 ZIP_END
if (index < 0) {
index = (-index)-1;
// 得到压缩列表的尾节点
p = ZIPLIST_ENTRY_TAIL(zl);
if (p[0] != ZIP_END) {
// 得到前一个节点的长度
ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
while (prevlen > 0 && index--) {
p -= prevlen;
ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
}
}
// 正向遍历
} else {
// 得到压缩列表的头节点
p = ZIPLIST_ENTRY_HEAD(zl);
while (p[0] != ZIP_END && index--) {
p += zipRawEntryLength(p);
}
}
return (p[0] == ZIP_END || index > 0) ? NULL : p;
}
下面部分内容请结合《redis的设计与实现》第53-56页阅读
-
宏
ZIPLIST_ENTRY_TAIL
: 这里是用zl
首地址加上zltail
,直接指向了尾节点。(第53页) -
宏
ZIPLIST_ENTRY_HEAD
: 直接用zl
首地址加上zlbytes
,zltail
,zllen
的长度。(第53页)
#define ZIPLIST_ENTRY_TAIL(zl) ((zl)+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl)))
#define ZIPLIST_ENTRY_HEAD(zl) ((zl)+ZIPLIST_HEADER_SIZE)
#define ZIPLIST_HEADER_SIZE (sizeof(uint32_t)*2+sizeof(uint16_t))
- 宏
ZIP_DECODE_PREVLENSIZE
,ZIP_DECODE_PREVLEN
: 先看节点的第一个字节(ptr)[0]
,当它小于ZIP_BIGLEN: 254
时,则(ptr)[0]
直接表示前一个节点的长度previous_entry_length
。否则,后面的四个字节(ptr)[1-4]
来表示前一个节点的长度previous_entry_length
。所以先用ZIP_DECODE_PREVLENSIZE
来得到实际的previous_entry_length
。(第54页) - 这里的
previous_entry_length
主要就是用来反向遍历用的。 - 不同的
previous_entry_length
编码是为了节省空间
/* Decode the number of bytes required to store the length of the previous
* element, from the perspective of the entry pointed to by 'ptr'. */
#define ZIP_DECODE_PREVLENSIZE(ptr, prevlensize) do { \
if ((ptr)[0] < ZIP_BIGLEN) { \
(prevlensize) = 1; \
} else { \
(prevlensize) = 5; \
} \
} while(0);
/* Decode the length of the previous element, from the perspective of the entry
* pointed to by 'ptr'. */
#define ZIP_DECODE_PREVLEN(ptr, prevlensize, prevlen) do { \
ZIP_DECODE_PREVLENSIZE(ptr, prevlensize); \
if ((prevlensize) == 1) { \
(prevlen) = (ptr)[0]; \
} else if ((prevlensize) == 5) { \
assert(sizeof((prevlensize)) == 4); \
memcpy(&(prevlen), ((char*)(ptr)) + 1, 4); \
memrev32ifbe(&prevlen); \
} \
} while(0);
zipRawEntryLength
: 得到当前节点的长度
/* Return the total number of bytes used by the entry pointed to by 'p'. */
static unsigned int zipRawEntryLength(unsigned char *p) {
unsigned int prevlensize, encoding, lensize, len;
ZIP_DECODE_PREVLENSIZE(p, prevlensize);
ZIP_DECODE_LENGTH(p + prevlensize, encoding, lensize, len);
return prevlensize + lensize + len;
}
- 宏
ZIP_ENTRY_ENCODING
: 得到content
的编码encoding
。(第56页) - 宏
ZIP_DECODE_LENGTH
:
/* Extract the encoding from the byte pointed by 'ptr' and set it into
* 'encoding'. */
#define ZIP_ENTRY_ENCODING(ptr, encoding) do { \
(encoding) = (ptr[0]); \
if ((encoding) < ZIP_STR_MASK) (encoding) &= ZIP_STR_MASK; \
} while(0)
/* Decode the length encoded in 'ptr'. The 'encoding' variable will hold the
* entries encoding, the 'lensize' variable will hold the number of bytes
* required to encode the entries length, and the 'len' variable will hold the
* entries length. */
#define ZIP_DECODE_LENGTH(ptr, encoding, lensize, len) do { \
ZIP_ENTRY_ENCODING((ptr), (encoding)); \
// 字节数组编码
if ((encoding) < ZIP_STR_MASK) { \
// 00bbbbbb
if ((encoding) == ZIP_STR_06B) { \
(lensize) = 1; \
(len) = (ptr)[0] & 0x3f; \
// 01bbbbbb xxxxxxxx
} else if ((encoding) == ZIP_STR_14B) { \
(lensize) = 2; \
(len) = (((ptr)[0] & 0x3f) << 8) | (ptr)[1]; \
// 10______ aaaaaaaa bbbbbbbb cccccccc dddddddd
} else if (encoding == ZIP_STR_32B) { \
(lensize) = 5; \
(len) = ((ptr)[1] << 24) | \
((ptr)[2] << 16) | \
((ptr)[3] << 8) | \
((ptr)[4]); \
} else { \
assert(NULL); \
} \
// 整数编码
} else { \
(lensize) = 1; \
(len) = zipIntSize(encoding); \
} \
} while(0);
/* Return bytes needed to store integer encoded by 'encoding' */
static unsigned int zipIntSize(unsigned char encoding) {
switch(encoding) {
case ZIP_INT_8B: return 1;
case ZIP_INT_16B: return 2;
case ZIP_INT_24B: return 3;
case ZIP_INT_32B: return 4;
case ZIP_INT_64B: return 8;
default: return 0; /* 4 bit immediate */
}
assert(NULL);
return 0;
}
ziplistNext
: 把指针指向下一个节点
/* Return pointer to next entry in ziplist.
*
* zl is the pointer to the ziplist
* p is the pointer to the current element
*
* The element after 'p' is returned, otherwise NULL if we are at the end. */
unsigned char *ziplistNext(unsigned char *zl, unsigned char *p) {
((void) zl);
/* "p" could be equal to ZIP_END, caused by ziplistDelete,
* and we should return NULL. Otherwise, we should return NULL
* when the *next* element is ZIP_END (there is no next entry). */
if (p[0] == ZIP_END) {
return NULL;
}
p += zipRawEntryLength(p);
if (p[0] == ZIP_END) {
return NULL;
}
return p;
}
- 取出
score
和节点的值ele
score
的位置在ele
的后面。所以zzlGetScore
传递的是后面的节点sptr
。
while (eptr != NULL) {
score = zzlGetScore(sptr);
redisAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
zzlGetScore
其实也是直接调用的ziplistGet
。只不过调用了之后针对score
的double
类型进行了一些处理。
double zzlGetScore(unsigned char *sptr) {
unsigned char *vstr;
unsigned int vlen;
long long vlong;
char buf[128];
double score;
redisAssert(sptr != NULL);
redisAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));
if (vstr) {
memcpy(buf,vstr,vlen);
buf[vlen] = '\0';
score = strtod(buf,NULL);
} else {
score = vlong;
}
return score;
}
而ziplistGet
就是把一个节点中的值(可能是字符串,也可能是整数)取出来,如果是字符串则把字符串存在sstr
中,字符串长度存在slen
中;如果是整数则把值存在sval
中。
/* Get entry pointed to by 'p' and store in either '*sstr' or 'sval' depending
* on the encoding of the entry. '*sstr' is always set to NULL to be able
* to find out whether the string pointer or the integer value was set.
* Return 0 if 'p' points to the end of the ziplist, 1 otherwise. */
unsigned int ziplistGet(unsigned char *p, unsigned char **sstr, unsigned int *slen, long long *sval) {
zlentry entry;
if (p == NULL || p[0] == ZIP_END) return 0;
if (sstr) *sstr = NULL;
// 把节点相关的数据提取到 zlentry 数据结构中,方便操作
// 其中有 encoding, 各种长度
entry = zipEntry(p);
if (ZIP_IS_STR(entry.encoding)) {
if (sstr) {
*slen = entry.len;
*sstr = p+entry.headersize;
}
} else {
if (sval) {
*sval = zipLoadInteger(p+entry.headersize,entry.encoding);
}
}
return 1;
}
把取出来的值,重新再转换成 sds
类型的 ele
if (vstr == NULL)
ele = createStringObjectFromLongLong(vlong);
else
ele = createStringObject((char*)vstr,vlen);
- 插入到跳表中
跳表的具体内容可参考Redis(2)——跳跃表
/* Has incremented refcount since it was just created. */
node = zslInsert(zs->zsl,score,ele);
redisAssertWithInfo(NULL,zobj,dictAdd(zs->dict,ele,&node->score) == DICT_OK);
incrRefCount(ele); /* Added to dictionary. */
// 注意这里的 zzlNext 和 ziplistNext 是不同的
// zzlNext 是给有序集合用的,因为有序集合每一个值都伴随一个 score
// 相当于两个压缩列表节点才是一个有序集合的节点
zzlNext(zl,&eptr,&sptr);
}
- 释放空间,调整编码
zfree(zobj->ptr);
zobj->ptr = zs;
zobj->encoding = REDIS_ENCODING_SKIPLIST;
2.2 跳表转压缩列表
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
unsigned char *zl = ziplistNew();
if (encoding != REDIS_ENCODING_ZIPLIST)
redisPanic("Unknown target encoding");
释放掉部分跳表的空间
/* Approach similar to zslFree(), since we want to free the skiplist at
* the same time as creating the ziplist. */
zs = zobj->ptr;
dictRelease(zs->dict);
node = zs->zsl->header->level[0].forward;
zfree(zs->zsl->header);
zfree(zs->zsl);
循环把节点插入到压缩列表中
while (node) {
ele = getDecodedObject(node->obj);
// 第二个参数为 NULL 时表示压入表尾
// 因为之前跳表中的数据已经是有序的了
zl = zzlInsertAt(zl,NULL,ele,node->score);
decrRefCount(ele);
next = node->level[0].forward;
zslFreeNode(node);
node = next;
}
zfree(zs);
zobj->ptr = zl;
zobj->encoding = REDIS_ENCODING_ZIPLIST;
} else {
redisPanic("Unknown sorted set encoding");
}
}
TODO: 最后这里压缩列表的
insert
也挺复杂的,以后再补吧。。。
3. 为什么有了ziplistIndex
还需要ziplistNext
因为ziplistIndex
每次都需要从头开始循环,一直循环到ziplist[index]
。ziplistNext
可以避免这种情况。