整数集合 IntSet

2021-07-24 Redis Redis 原理

整数集合「IntSet」是一个有序的、存储整型数据的结构。当 Redis 集合类型的元素都是整数并且都处在 64 位有符号整数范围之内时,使用该结构体存储。

在以下两种情况时,底层编码会发生转换:

  1. 元素个数超过一定数量之后(默认值为 512)会转换为 hashtable,配置项为 set-max-intset-entries;
  2. 增加非整型变量时会转换为 hashtable。

# 一、数据存储

整数集合在 Redis 中可以保存 int16_t、int32_t、int64_t 类型的整型数据,并且可以保证集合中不会出现重复数据。每个整数集合使用一个 intset 类型的数据结构表示,定义如下。

typedef struct intset {
    uint32_t encoding;  // 编码类型
    uint32_t length;  // 元素个数
    int8_t contents[];  // 柔性数组,根据 encoding 字段决定几个字节表示一个元素
} intset;
1
2
3
4
5

encoding 的取值有以下几种:

  1. INTSET_ENC_INT16:当元素值都位于 INT16_MIN 和 INT16_MAX 之间时使用。该编码方式下每个元素占用 2 个字节;
  2. INTSET_ENC_INT32:当元素值位于 INT16_MAX 到 INT32_MAX 或者 INT32_MIN 到 INT16_MIN 之间时使用。该编码方式下每个元素占用 4 个字节;
  3. INTSET_ENC_INT64:当元素值位于 INT32_MAX 到 INT64_MAX 或者 INT64_MIN 到 INT32_MIN 之间时使用。该编码方式下每个元素占用 8 个字节。

具体如下图所示:

intset 结构体会根据待插入的值决定是否需要进行扩容操作。扩容会修改 encoding 字段,而 encoding 字段决定了一个元素在 contents 柔性数组中占用几个字节。所以当修改 encoding 字段之后,intset 中原来的元素也需要在 contents 中进行相应的扩展。

注意

根据上图能得到一个简单的结论,只要待插入的值导致了扩容,则该值在待插入的 intset 中不是最大值就是最小值。这个结论在下文插入元素时会用到。

encoding 字段在 Redis 中使用宏来表示,其定义如下:

#define INTSET_ENC_INT16 (sizeof(int16_t))  // 2
#define INTSET_ENC_INT32 (sizeof(int32_t))  // 4
#define INTSET_ENC_INT64 (sizeof(int64_t))  // 8
1
2
3

因为 encoding 字段实际取值为 2、4、8,所以 encoding 字段可以直接比较大小。当待插入值的 encoding 大于待插入 intset 的 encoding 时,说明需要进行扩容操作,并且也能表明该待插入值在该 intset 中肯定不存在。

# 二、基本操作

# 2.1 查询元素

查询元素的入口函数是 intsetFind,该函数首先进行一些防御性判断,如果没有通过判断则直接返回。

intset 是按从小到大有序排列的,所以通过防御性判断之后使用二分法进行元素的查找,以下是 Redis 中 intset 查询代码的实现:

/* Determine whether a value belongs to this set */
uint8_t intsetFind(intset *is, int64_t value) {
    // 获取查找元素的编码方式
    uint8_t valenc = _intsetValueEncoding(value);
    // 满足编码方式再进行搜索
    return valenc <= intrev32ifbe(is->encoding) && intsetSearch(is,value,NULL);
}

/* Search for the position of "value". Return 1 when the value was found and
 * sets "pos" to the position of the value within the intset. Return 0 when
 * the value is not present in the intset and sets "pos" to the position
 * where "value" can be inserted. */
static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) {
    int min = 0, max = intrev32ifbe(is->length)-1, mid = -1;
    int64_t cur = -1;

    /* The value can never be found when the set is empty */
    if (intrev32ifbe(is->length) == 0) {  //如果 intset 中没有元素,直接返回 0
        if (pos) *pos = 0;
        return 0;
    } else {  // 如果元素大于最大值或者小于最小值,直接返回 0
        /* Check for the case where we know we cannot find the value,
         * but do know the insert position. */
        if (value > _intsetGet(is,intrev32ifbe(is->length)-1)) {
            if (pos) *pos = intrev32ifbe(is->length);
            return 0;
        } else if (value < _intsetGet(is,0)) {
            if (pos) *pos = 0;
            return 0;
        }
    }

    while(max >= min) {  // 二分查找
        mid = ((unsigned int)min + (unsigned int)max) >> 1;
        cur = _intsetGet(is,mid);
        if (value > cur) {
            min = mid+1;
        } else if (value < cur) {
            max = mid-1;
        } else {
            break;
        }
    }

    if (value == cur) {  // 找到返回 1,未找到返回 0
        if (pos) *pos = mid;
        return 1;
    } else {
        if (pos) *pos = min;
        return 0;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

可以看到,在实际查询前做了一些防御性检查,具体为:

  1. 待查找的值需要的 encoding 不能超过该 intset 的 encoding
  2. intset 不能为空,若有值则待查找值必须介于该 intset 的最大值与最小值之间;

最后,在通过检查后,使用二分查找返回查询结果。

# 2.2 添加元素

添加元素的入口函数是 intsetAdd,该函数根据插入值的 encoding 和当前 intset 的 encoding 决定是直接插入还是先进行 intset 升级再执行插入(升级插入的函数为 intsetUpgradeAndAdd),如下是 Redis 中 intset 添加元素的代码实现:

/* Insert an integer in the intset */
intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {
    uint8_t valenc = _intsetValueEncoding(value);  // 获取添加元素的编码值
    uint32_t pos;
    if (success) *success = 1;

    /* Upgrade encoding if necessary. If we need to upgrade, we know that
     * this value should be either appended (if > 0) or prepended (if < 0),
     * because it lies outside the range of existing values. */
    if (valenc > intrev32ifbe(is->encoding)) {  // 如果大于当前 intset 编码,则进行升级插入
        /* This always succeeds, so we don't need to curry *success. */
        return intsetUpgradeAndAdd(is,value);
    } else {
        /* Abort if the value is already present in the set.
         * This call will populate "pos" with the right position to insert
         * the value when it cannot be found. */
        if (intsetSearch(is,value,&pos)) {  // 否则先进行查重,如果已存在,则插入失败,直接返回
            if (success) *success = 0;
            return is;
        }

        // 如果元素不存在,则插入元素
        is = intsetResize(is,intrev32ifbe(is->length)+1);  // 在集合中为元素分配空间
        // 如果插入元素在 intset 中间位置,调用 intsetMoveTail 给元素挪出空间
        if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);
    }

    _intsetSet(is,pos,value);  // 保存元素
    is->length = intrev32ifbe(intrev32ifbe(is->length)+1);  // 修改 intset 长度
    return is;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

具体过程如下:

  1. 判断是否能按现有的编码方式插入,若不能,则调用升级插入函数 intsetUpgradeAndAdd 并返回;
  2. 调用搜索函数,若找到,则直接返回(集合中不能有重复的值);
  3. 调用 intsetResize 为插入元素分配空间;
  4. 调用 intsetMoveTail 为插入元素挪出位置;
  5. 插入新值并将 intset 的长度加 1。

注意

intsetMoveTail 函数中使用的是 memmove 函数,而非 memcpy 函数。

  • memcpy 函数的目的地址和源地址不能有重叠,否则会发生数据覆盖;
  • memmove 地址之间可以有重叠,其实现原理为先将源地址数据拷贝到一个临时的缓冲区中,然后再从缓冲区中逐字节拷贝到目的地址。

以上是不升级插入元素的过程。当 intsetAdd 函数判断当前 encoding 不能存放需要添加的元素时,会调用 intsetUpgradeAndAdd 函数以先升级当前的 encoding,并且按新 encoding 重新存储现有数据,然后将新的元素添加进去。代码流程如下:

/* Upgrades the intset to a larger encoding and inserts the given integer. */
static intset *intsetUpgradeAndAdd(intset *is, int64_t value) {
    uint8_t curenc = intrev32ifbe(is->encoding);
    uint8_t newenc = _intsetValueEncoding(value);
    int length = intrev32ifbe(is->length);

    // 如果待插入元素小于 0,说明需要插入到 intset 的头部位置
    // 如果待插入元素大于 0,说明需要插入到 intset 的尾部位置
    int prepend = value < 0 ? 1 : 0;

    /* First set new encoding and resize */
    is->encoding = intrev32ifbe(newenc);  // 更新集合的编码方式
    is = intsetResize(is,intrev32ifbe(is->length)+1);  // 根据新编码对集合(的底层数组)进行空间调整

    /* Upgrade back-to-front so we don't overwrite values.
     * Note that the "prepend" variable is used to make sure we have an empty
     * space at either the beginning or the end of the intset. */
    // 根据集合原来的编码方式,从底层数组中取出集合元素
    // 然后再将元素以新编码的方式添加到集合中
    // 当完成了这个步骤之后,集合中所有原有的元素就完成了从旧编码到新编码的转换
    // 因为新分配的空间都放在数组的后端,所以程序先从后端向前端移动元素
    // 举个例子,假设原来有 curenc 编码的三个元素,它们在数组中排列如下:
    // | x | y | z | 
    // 当程序对数组进行重分配之后,数组就被扩容了(符号 ? 表示未使用的内存):
    // | x | y | z | ? |   ?   |   ?   |
    // 这时程序从数组后端开始,重新插入元素:
    // | x | y | z | ? |   z   |   ?   |
    // | x | y |   y   |   z   |   ?   |
    // |   x   |   y   |   z   |   ?   |
    // 最后,程序可以将新元素添加到最后 ? 号标示的位置中:
    // |   x   |   y   |   z   |  new  |
    // 上面演示的是新元素比原来的所有元素都大的情况,也即是 prepend == 0
    // 当新元素比原来的所有元素都小时(prepend == 1),调整的过程如下:
    // | x | y | z | ? |   ?   |   ?   |
    // | x | y | z | ? |   ?   |   z   |
    // | x | y | z | ? |   y   |   z   |
    // | x | y |   x   |   y   |   z   |
    // 当添加新值时,原本的 | x | y | 的数据将被新值代替
    // |  new  |   x   |   y   |   z   |
    // T = O(N)
    while(length--)
        _intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc));

    /* Set the value at the beginning or the end. */
    if (prepend)
        _intsetSet(is,0,value);
    else
        _intsetSet(is,intrev32ifbe(is->length),value);
    is->length = intrev32ifbe(intrev32ifbe(is->length)+1);  // 修改 intset 长度
    return is;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

具体过程如下:

  1. 根据新的编码方式调用 intsetResize 重新申请空间;
  2. 移动并扩容原来的元素。注意扩容原来的元素时,按照从后往前的顺序依次扩容,从而避免数据被覆盖;
    • 如果待插入值是正数,则该值是最大值,在最后位置;
    • 如果待插入值是负数,则该值为最小值,在开始位置;
  3. 根据新插入值是正数还是负数,将值插入相应的位置;
  4. 插入新值并将 intset 的长度字段 length 加 1。

# 2.3 删除元素

intset 删除元素的入口函数是 intsetRemove,该函数查找需要删除的元素然后通过内存地址的移动直接将该元素覆盖掉。删除元素的代码流程如下:

/* Delete integer from intset */
intset *intsetRemove(intset *is, int64_t value, int *success) {
    uint8_t valenc = _intsetValueEncoding(value);  // 获取待删除元素编码
    uint32_t pos;
    if (success) *success = 0;

    // 待删除元素编码必须小于等于 intset 编码并且查找到该元素,才会执行删除操作
    if (valenc <= intrev32ifbe(is->encoding) && intsetSearch(is,value,&pos)) {
        uint32_t len = intrev32ifbe(is->length);

        /* We know we can delete */
        if (success) *success = 1;

        /* Overwrite value with tail and update length */
        // 如果待删除元素不在 intset 末尾,则调用 intsetMoveTail 直接覆盖掉该元素
        if (pos < (len-1)) intsetMoveTail(is,pos+1,pos);
        // 如果待删除元素位于 intset 末尾,则 intset 收缩内存后直接将其丢弃
        is = intsetResize(is,len-1);
        is->length = intrev32ifbe(len-1);  // intset 长度减 1
    }
    return is;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

具体过程如下:

  1. 首先判断编码是否小于等于当前编码,若不是,直接返回;
  2. 调用 intsetSearch 查找该值是否存在,不存在则直接返回;存在则获取该值所在位置 position;
  3. 通过 intsetMoveTailintsetResize 将元素覆盖或丢弃;
  4. 最后将 intset 长度减 1。

# 三、总结

intset 用于Redis 中集合类型的数据。当集合元素都是整型并且元素不多时使用 intset 保存,并且元素按从小到大顺序保存。本文介绍了 intset 的存储结构,以及 intset 添加、查找元素等常用方法。

# 四、参考资料

  • 《Redis 5 设计与源码分析》– 陈雷
  • 《Redis 设计与实现》– 黄健宏
Last Updated: 2023-01-28 4:31:25