Redis hash table expansion, shrinking, and incremental rehash

In Redis, key-value pairs are stored in a dictionary (dict), which is implemented on top of a hash table; each hash table node holds one of the dictionary's key-value pairs. As we know, when a Java HashMap's load factor crosses a threshold as hash collisions accumulate, it resizes to keep its collision chains short. Redis does the same.

In its concrete implementation, Redis uses a mechanism called incremental rehashing to make resizing the dictionary efficient and to keep rehashing from hurting server performance. Incremental rehash takes a divide-and-conquer approach: the computation needed to rehash all the key-value pairs is amortized over every add, delete, lookup, and update performed on the dictionary, avoiding the huge one-shot cost of a centralized rehash.


1. The dict data structure definitions

/* Hash table node */
typedef struct dictEntry {
    // key
    void *key;
    // value
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
    } v;
    // pointer to the next node, chaining entries into a linked list
    struct dictEntry *next;
} dictEntry;

/* This is our hash table structure. Every dictionary has two of this as we
 * implement incremental rehashing, for the old to the new table. */
/* Hash table.
 * Every dictionary uses two of these, to implement incremental rehash. */
typedef struct dictht {
    // bucket array
    // think of it as an array whose every slot is the head of an entry
    // list (separate chaining resolves hash collisions)
    dictEntry **table;
    // table size
    unsigned long size;
    // size mask, used to compute bucket indexes;
    // always equals size - 1
    unsigned long sizemask;
    // number of nodes the table currently holds
    unsigned long used;
} dictht;

/* Dictionary */
typedef struct dict {
    // type-specific callback functions
    dictType *type;
    // private data
    void *privdata;
    // the two hash tables
    dictht ht[2];
    // rehash index
    // -1 when no rehash is in progress
    int rehashidx; /* rehashing not in progress if rehashidx == -1 */
    // number of safe iterators currently running
    int iterators; /* number of iterators currently running */
} dict;
  • dictht::table: the table inside the hash table resolves collisions with separate chaining. At first glance it puzzled me: why would this be a two-dimensional array? It is actually a pointer to an array, and every slot of that array is the head of a linked list of entries.
  • dictht ht[2]: internally, a dict maintains two hash tables, which work like a pair of rolling arrays: one is the old table, the other the new. When the hash table needs to change size, elements of the old table migrate into the freshly allocated new table; at the next resize, the current new table becomes the old one, so the two slots are reused and resizing stays efficient.
  • rehashidx: because the rehash is incremental, the migration does not complete in one step, so an index is needed to record the current rehash progress. When rehashidx is -1, no rehash is in progress.
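A side note on sizemask: because a table's size is always a power of two (see _dictNextPower in the next section), sizemask = size - 1 turns the expensive modulo into a single bitwise AND when mapping a hash to a bucket. A minimal standalone sketch (idx_for is our name for illustration, not a Redis function):

#include <stdint.h>
#include <stdio.h>

/* With size a power of two, h % size == h & (size - 1). */
static unsigned long idx_for(uint64_t h, unsigned long sizemask) {
    return h & sizemask;
}

int main(void) {
    unsigned long size = 8;            /* always a power of two */
    unsigned long sizemask = size - 1; /* binary 111 */
    printf("%lu\n", idx_for(12345, sizemask)); /* 12345 % 8 == 1 */
    return 0;
}

This is exactly the expression dictRehash uses later: h = dictHashKey(d, de->key) & d->ht[1].sizemask.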

2. Shrinking and expanding

As Redis commands execute, the number of key-value pairs held in the hash table gradually grows or shrinks. To keep the table's load factor (ratio) within a reasonable range, the program expands or shrinks the hash table accordingly when it holds too many or too few key-value pairs.

ratio = ht[0].used / ht[0].size

For example, if the table's size is 4 and 4 key-value pairs have been inserted, the ratio is 1.

Redis's default load-factor threshold is 1, but the load factor may be allowed to climb as high as 5: persistence requires a fork, and Redis avoids allocating memory while the child exists, so the source contains a check that forces an expansion only once the number of entries exceeds 5 times the number of buckets (used/size > 5).
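These two thresholds live as globals in dict.c; in the 6.x source they look like this (resizing is toggled off while a BGSAVE/BGREWRITEAOF child exists, but the ratio-5 escape hatch still forces growth):

/* From dict.c: resizing can be disabled globally, but even when disabled,
 * an expansion is forced once used/buckets exceeds this ratio. */
static int dict_can_resize = 1;
static unsigned int dict_force_resize_ratio = 5;

Both show up again in _dictExpandIfNeeded below.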

Expanding or shrinking the hash table is accomplished by performing a rehash. Redis's strategy for when to rehash a dictionary's hash tables is as follows.

Shrinking

If the ratio drops below 0.1 (less than 10% of buckets in use), the hash table is shrunk:

#define HASHTABLE_MIN_FILL        10      /* Minimal hash table fill 10% */

/* Expand or create the hash table,
 * when malloc_failed is non-NULL, it'll avoid panic if malloc fails (in which case it'll be set to 1).
 * Returns DICT_OK if expand was performed, and DICT_ERR if skipped. */
int _dictExpand(dict *d, unsigned long size, int* malloc_failed)
{
    if (malloc_failed) *malloc_failed = 0;

    /* the size is invalid if it is smaller than the number of
     * elements already inside the hash table */
    if (dictIsRehashing(d) || d->ht[0].used > size)
        return DICT_ERR;

    dictht n; /* the new hash table */
    unsigned long realsize = _dictNextPower(size);

    /* Rehashing to the same table size is not useful. */
    if (realsize == d->ht[0].size) return DICT_ERR;

    /* Allocate the new hash table and initialize all pointers to NULL */
    n.size = realsize;
    n.sizemask = realsize-1;
    if (malloc_failed) {
        n.table = ztrycalloc(realsize*sizeof(dictEntry*));
        *malloc_failed = n.table == NULL;
        if (*malloc_failed)
            return DICT_ERR;
    } else
        n.table = zcalloc(realsize*sizeof(dictEntry*));

    n.used = 0;

    /* Is this the first initialization? If so it's not really a rehashing
     * we just set the first hash table so that it can accept keys. */
    if (d->ht[0].table == NULL) {
        d->ht[0] = n;
        return DICT_OK;
    }

    /* Prepare a second hash table for incremental rehashing */
    d->ht[1] = n;
    d->rehashidx = 0;
    return DICT_OK;
}

/* return DICT_ERR if expand was not performed */
int dictExpand(dict *d, unsigned long size) {
    return _dictExpand(d, size, NULL);
}

/* Resize the table to the minimal size that contains all the elements,
 * but with the invariant of a USED/BUCKETS ratio near to <= 1 */
int dictResize(dict *d)
{
    unsigned long minimal;

    if (!dict_can_resize || dictIsRehashing(d)) return DICT_ERR;
    minimal = d->ht[0].used;
    if (minimal < DICT_HT_INITIAL_SIZE)
        minimal = DICT_HT_INITIAL_SIZE;
    return dictExpand(d, minimal);
}

int htNeedsResize(dict *dict) {
    long long size, used;

    size = dictSlots(dict);
    used = dictSize(dict);
    return (size > DICT_HT_INITIAL_SIZE &&
            (used*100/size < HASHTABLE_MIN_FILL));
}

/* When the hash table's fill ratio drops to 10%, shrink it by starting a rehash. */
/* If the percentage of used slots in the HT reaches HASHTABLE_MIN_FILL
 * we resize the hash table to save memory */
void tryResizeHashTables(int dbid) {
    if (htNeedsResize(server.db[dbid].dict))
        dictResize(server.db[dbid].dict);
    if (htNeedsResize(server.db[dbid].expires))
        dictResize(server.db[dbid].expires);
}

Expanding

  • If the server is not currently running a BGSAVE or BGREWRITEAOF command and the table's load factor is >= 1, the table is expanded; the size requested is the current ht[0].used * 2.
  • If the server is currently running a BGSAVE or BGREWRITEAOF command, the table is only expanded once the load factor reaches 5, again with a requested size of ht[0].used * 2.
  • The statement above is slightly imprecise: whatever is passed in is only a lower bound. With ht[0].used equal to 5, the value passed in is 10, but the actual new size is rounded up to the next power of two, 2^4 = 16; the real capacity is always 2^n (see the sketch after the _dictNextPower listing below). Note also that the 6.x code quoted below requests ht[0].used + 1 rather than used * 2, as older versions did; the power-of-two rounding works the same way either way.
/* Our hash table capability is a power of two */
static unsigned long _dictNextPower(unsigned long size)
{
    unsigned long i = DICT_HT_INITIAL_SIZE;

    if (size >= LONG_MAX) return LONG_MAX + 1LU;
    while(1) {
        if (i >= size)
            return i;
        i *= 2; /* the new capacity is not the requested size itself,
                 * but the nearest power of two that is >= size */
    }
}
#define dictIsRehashing(d) ((d)->rehashidx != -1)

/* Because we may need to allocate huge memory chunk at once when dict
 * expands, we will check this allocation is allowed or not if the dict
 * type has expandAllowed member function. */
static int dictTypeExpandAllowed(dict *d) {
    if (d->type->expandAllowed == NULL) return 1;
    return d->type->expandAllowed(
                    _dictNextPower(d->ht[0].used + 1) * sizeof(dictEntry*),
                    (double)d->ht[0].used / d->ht[0].size);
}

/* Expand the hash table if needed */
static int _dictExpandIfNeeded(dict *d)
{
    /* Incremental rehashing already in progress. Return. */
    if (dictIsRehashing(d)) return DICT_OK;

    /* If the hash table is empty expand it to the initial size. */
    if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

    /* If we reached the 1:1 ratio, and we are allowed to resize the hash
     * table (global setting) or we should avoid it but the ratio between
     * elements/buckets is over the "safe" threshold, we resize doubling
     * the number of buckets. */
    if (d->ht[0].used >= d->ht[0].size &&
        (dict_can_resize ||
         d->ht[0].used/d->ht[0].size > dict_force_resize_ratio) &&
        dictTypeExpandAllowed(d))
    {
        return dictExpand(d, d->ht[0].used + 1);
    }
    return DICT_OK;
}

Expansion steps

  1. Allocate space of a suitable size for the dictionary's ht[1] hash table.
  2. Rehash every key-value pair in ht[0] into ht[1]: rehashing means recomputing each key's hash and index, then placing the pair into its slot in ht[1].
  3. Once every key-value pair in ht[0] has migrated to ht[1] (ht[0] is now an empty table), free ht[0], make ht[1] the new ht[0], and create a fresh empty table at ht[1], ready for the next rehash. (This is exactly the tail of dictRehash, shown in section 3.)

3. Incremental rehash

Expanding or shrinking requires rehashing every key-value pair from ht[0] into ht[1]; but this does not happen all at once in one centralized pass. It happens gradually, over many steps.

The reason: if ht[0] holds four key-value pairs, the server can rehash all of them into ht[1] in an instant. But if the table holds not four but four million, forty million, or even four hundred million pairs, rehashing them all in one shot would require so much computation that the server could stop serving requests for a noticeable period.

So, to keep rehashing from hurting server performance, the server does not move everything in ht[0] into ht[1] at once; it migrates the key-value pairs from ht[0] to ht[1] gradually, over many operations.

Detailed steps of incremental rehash

  1. Allocate space for ht[1], so that the dictionary holds both ht[0] and ht[1] at the same time.
  2. Maintain an index counter rehashidx in the dictionary and set its value to 0, marking the official start of the rehash.
  3. While the rehash is in progress, every add, delete, lookup, or update on the dictionary, besides performing its own operation, also migrates all key-value pairs in bucket rehashidx of ht[0] over to ht[1], then advances rehashidx.
  4. As dictionary operations keep executing, at some point every key-value pair of ht[0] has been rehashed into ht[1]; the program then sets rehashidx back to -1, marking the rehash complete.

The beauty of incremental rehash is its divide-and-conquer approach: the work of rehashing all the key-value pairs is amortized over every CRUD operation on the dictionary, and the server even drives it from a background timer that spends only about one millisecond of each event-loop cycle on migration, avoiding the huge cost of a centralized rehash.
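That timer path is dictRehashMilliseconds in dict.c, called with ms = 1 from the server cron (via incrementallyRehash in server.c); the 6.x version is reproduced below, so read it as a reference sketch of that source:

/* Rehash in ms+"delta" milliseconds. The value of "delta" is larger
 * than 0, and is smaller than 1 in most cases. The exact upper bound
 * depends on the running time of dictRehash(d,100). */
int dictRehashMilliseconds(dict *d, int ms) {
    long long start = timeInMilliseconds();
    int rehashes = 0;

    while(dictRehash(d,100)) {
        rehashes += 100;
        if (timeInMilliseconds()-start > ms) break;
    }
    return rehashes;
}

Each pass migrates up to 100 buckets before the clock is checked, so the cron never burns much more than the requested millisecond.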

For the piggybacked path, take dictAddRaw as an example:

#define dictIsRehashing(d) ((d)->rehashidx != -1)

/* Low level add or find:
 * This function adds the entry but instead of setting a value returns the
 * dictEntry structure to the user, that will make sure to fill the value
 * field as they wish.
 *
 * This function is also directly exposed to the user API to be called
 * mainly in order to store non-pointers inside the hash value, example:
 *
 * entry = dictAddRaw(dict,mykey,NULL);
 * if (entry != NULL) dictSetSignedIntegerVal(entry,1000);
 *
 * Return values:
 *
 * If key already exists NULL is returned, and "*existing" is populated
 * with the existing entry if existing is not NULL.
 *
 * If key was added, the hash entry is returned to be manipulated by the caller.
 */
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
    long index;
    dictEntry *entry;
    dictht *ht;

    if (dictIsRehashing(d)) _dictRehashStep(d); // if a rehash is still unfinished, perform one step of it

    /* Get the index of the new element, or -1 if
     * the element already exists. */
    if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
        return NULL;

    /* Allocate the memory and store the new entry.
     * Insert the element in top, with the assumption that in a database
     * system it is more likely that recently added entries are accessed
     * more frequently. */
    ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0]; // during a rehash, new entries always go into ht[1]
    entry = zmalloc(sizeof(*entry));
    entry->next = ht->table[index];
    ht->table[index] = entry;
    ht->used++;

    /* Set the hash entry fields. */
    dictSetKey(d, entry, key);
    return entry;
}
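Writes land in ht[1] while a rehash is running, which raises the question of how reads stay correct in the meantime. dictFind (same file; quoted here from the 6.x source) answers it: every lookup probes ht[0] first and, only if a rehash is in progress, falls through to ht[1] before declaring the key absent:

dictEntry *dictFind(dict *d, const void *key)
{
    dictEntry *he;
    uint64_t h, idx, table;

    if (dictSize(d) == 0) return NULL; /* dict is empty */
    if (dictIsRehashing(d)) _dictRehashStep(d);
    h = dictHashKey(d, key);
    for (table = 0; table <= 1; table++) {
        idx = h & d->ht[table].sizemask;
        he = d->ht[table].table[idx];
        while(he) {
            if (key==he->key || dictCompareKeys(d, key, he->key))
                return he;
            he = he->next;
        }
        if (!dictIsRehashing(d)) return NULL;
    }
    return NULL;
}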

How many elements does each rehash step move?

As the add path above shows, each operation first checks whether an unfinished rehash exists and, if so, performs one step of it on the side. So how much does each step actually move? (One version note: in the code quoted below, the dict struct's iterators counter from section 1 has been replaced by a pauserehash field; both exist to pause migration while safe iterators are active.)

/* This function performs just a step of rehashing, and only if hashing has
 * not been paused for our hash table. When we have iterators in the
 * middle of a rehashing we can't mess with the two hash tables otherwise
 * some element can be missed or duplicated.
 *
 * This function is called by common lookup or update operations in the
 * dictionary so that the hash table automatically migrates from H1 to H2
 * while it is actively used. */
static void _dictRehashStep(dict *d) {
    if (d->pauserehash == 0) dictRehash(d,1);
}

/* Performs N steps of incremental rehashing. Returns 1 if there are still
 * keys to move from the old to the new hash table, otherwise 0 is returned.
 *
 * Note that a rehashing step consists in moving a bucket (that may have more
 * than one key as we use chaining) from the old to the new hash table, however
 * since part of the hash table may be composed of empty spaces, it is not
 * guaranteed that this function will rehash even a single bucket, since it
 * will visit at max N*10 empty buckets in total, otherwise the amount of
 * work it does would be unbound and the function may block for a long time. */
int dictRehash(dict *d, int n) {
    int empty_visits = n*10; /* Max number of empty buckets to visit. */
    if (!dictIsRehashing(d)) return 0;

    while(n-- && d->ht[0].used != 0) {
        dictEntry *de, *nextde;

        /* Note that rehashidx can't overflow as we are sure there are more
         * elements because ht[0].used != 0 */
        assert(d->ht[0].size > (unsigned long)d->rehashidx);
        while(d->ht[0].table[d->rehashidx] == NULL) {
            d->rehashidx++;
            if (--empty_visits == 0) return 1;
        }
        de = d->ht[0].table[d->rehashidx];
        /* Move all the keys in this bucket from the old to the new hash HT */
        while(de) {
            uint64_t h;

            nextde = de->next;
            /* Get the index in the new hash table */
            h = dictHashKey(d, de->key) & d->ht[1].sizemask;
            de->next = d->ht[1].table[h];
            d->ht[1].table[h] = de;
            d->ht[0].used--;
            d->ht[1].used++;
            de = nextde;
        }
        d->ht[0].table[d->rehashidx] = NULL;
        d->rehashidx++;
    }

    /* Check if we already rehashed the whole table... */
    if (d->ht[0].used == 0) {
        zfree(d->ht[0].table);  // once everything in ht[0] has been rehashed into ht[1], free ht[0]'s bucket array
        d->ht[0] = d->ht[1];    // promote the old ht[1] to be the new ht[0]
        _dictReset(&d->ht[1]);  // reset ht[1] to an empty table
        d->rehashidx = -1;
        return 0;
    }

    /* More to rehash... */
    return 1;
}

From the code we can see that each piggybacked step calls dictRehash(d,1), which migrates at most one bucket per step (together with the whole chain of keys hanging off that bucket), and gives up after visiting at most n*10 = 10 empty buckets, so a single step on a sparse table can never do unbounded work.