前言

这篇文章是上一篇SDS的后续,也是讲的Redis的底层数据结构。SDS是String的底层,这个次讲的是Zset(sorted set)的底层实现,跳表SkipList。SkipList是一种很好的数据结构。他的查找速度是O(logn)的时间复杂度。而且存储空间开销也还好,可以接受。简单点说SkipList就是一种通过关键节点(类似索引的方式)。将数据通过多层来快速划分和查找的过程。而且这个跳表好不用像树一样要经常的翻转。

SkipList

SkipList: 跳表即跳跃链表。他的查找和插入时间复杂度是O(logn),优于数组O(n)插入和查找的时间复杂度。跳表实现快速查询时通过多层次链表做索引,然后通过不同层次节点大小来快速判断底层数据在那个范围存储,概率性跳过不需要查找的元素,来实现快速定位查找。

跳表时个有层次的数据结构,它越上层的链表越稀疏(类似漏斗,越稀疏过滤元素越多),对比队列中的值,然后判断跳跃范围,通过层层跳跃找到最后的搜索元素。

跳表的最底层时存储所有的数据有序的链表,它的每个上层都是最底层的索引。层中元素i出现在索引层,也就是当前层的上一层的概率时p.(P值一般时1/2或者1/4)。平均每个元素出现在列表的平均数$\frac{1}{1-p}$。我们来解释一下这个平均次数。
首先肯定大于1,因为最底层是n个。,第二层是$\frac{n}{p}$,第三层是$\frac{n}{p^2}$,第四层是$\frac{n}{p^3}$。所以最后的公式是: n是元素总个数。
$\frac{n+\frac{n}{p}+\frac{n}{p^2}+….\frac{n}{p^m}}{n}$
这是一个约掉n,就变成$1+\frac{1}{p}+\frac{1}{p^2}+…..\frac{1}{p^m}$ 这个是等比数列求和最后根据等比求和公式Sm=$\frac{a*(1-q^m)}{1-q}$当m趋近无穷大的时候($\frac{1}{q^m}$趋近无穷小)。所以Sm=$\frac{a*1}{1-q}$然后首相a=1所以最后平均数就是$\frac{1}{1-p}$
如果与不明白等比数列求和的请自行百度,老铁这个是高中数学。(大学数学忘完了)而最高层的元素(通常是在跳跃列表前端的一个特殊的头元素)在$log_\frac{1}{p}n$个列表中出现。
也就是最头上的那个。

SkipList查找过程

上面说了那么多,我们来看下跳表的查找数据的过程。跳表的查找数据是在最上层然后一层一层找到最底层。当然如果直接找到了节点的值可以直接返回。下面是跳表查找的过程,根据在跳表中查找路径。

首先数据是10,所以高度是4层,这里p=1/2。查找,数据为12的节点,根据头节点来到第4层数据6节点。12大于6,走向后置节点,发现后置节点指向null,向下层查找数据;第3层数据6节点的后置节点是25大于12,向下层查找数据;第二层数据6的节点的后置节点值是9小于12,向后查找第三层数据9的后置节点是17大于12,向下层查找;在第1层数据9节点的后置节点等于12,查找结束。

从最上层查找,你发现每层最多查两边(p=1/2),如果超过2个节点说明上你这个不是有序的。所以时间复杂度

O(n)=$2*log_2n$常数2可以省去等于O(logn),也就是和红黑树,AVL树查找性能差不多,不过这个正常的情况下,最差的情况下是SkipList退化成链表是O(n)。

SkipList的插入过程

插入其实和查找差不多,因为SkipList是一个有序的链表,所以每次的插入也要找到对应的位置,然后插入到底层列表中,然后在变动上层的索引。

插入新元素80,先与第一层头元素对比,对比30,大于30元素节点,然后向后查找;30元素节点的后置为null,向下查找;查找第二层30元素节点的后节点元素为50,与50元素节点比较80大于50,向后查找;50元素节点的后置节点是null,向下查找;查找第三层,第三层50元素的后置节点是70,与70元素比较80大于70,向后查找70元素节点是null;,向下查找;查找第四层,第四层70节点的后置节点90,90大于80,应该是插在70元素节点和90元素节点中建,向下查找是不是存储节点,第四层是最后一层,然后插入70点的后续。到此为止是插入节点,后续是调整上层索引。根据p为1/2的限制。调整个数原来是6节点,上层应该是3,现在是7个需要在最后80和90元素中随机建立第三层索引,然后向第二层查找,发现第二层符合,不需要建立索引,插入结束。后面45插入我就不详细说了,大家估计也可看出来,反正有动图。

从上面插入的过程,我们能看出来,插入大概分几个过程。

1、查找对比,找到要插入元素的区间,保证大小顺序。

2、如果是最底层直接插入,否则根据链接找到底层插入数据。

3、插入数据完之后逐级区对比,创建各个层次的索引。

上面简单额讲述了链表中的一些概念了流程,真正Redis在使用SkipList作为链表存储的结构也加入了一些自己需要的数据元素。下面我就看一下Redis是怎么实现跳跃链表。

Redis的SkipList的实现

Redis的对SkipList的声明是在server.h的头文件里。然后对SkipList的使用是在t_zset.c的文件里,即对SkipLis操作进行封装,然后抽象成zSet的方法。使用,下面我们来看一下Redis是怎么定义一个SkipList的过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {// 跳表数据节点结构的定义
sds ele; //存放着该节点对于的成员对象,一般指向一个sds结构
double score; //分数
struct zskiplistNode *backward;//后置指针,单向链表。相当于before
struct zskiplistLevel {
struct zskiplistNode *forward; //层次节点的前进指针,指向跳表,后面的元素,相当于next
unsigned long span; //跨度
} level[];//这个是个数组
} zskiplistNode;

typedef struct zskiplist { //跳表
struct zskiplistNode *header, *tail; //双向链表,头部尾部指针
unsigned long length; //链表长度
int level; //表中层数最大的节点层数
} zskiplist;

typedef struct zset { //zet的数据结构
dict *dict;
zskiplist *zsl;
} zset;

  • ele 存放着该节点对于的成员对象,一般指向一个sds结构
  • score 表示该节点你的分值,跳跃表按照分值大小进行顺序排列
  • backward 指向跳跃表的前一个节点
  • level[] 这个属性至关重要,是跳跃表的核心所在,初始化一个跳跃表节点的时候会为其随机生成一个层大小,每个节点的每一层以链表的形式连接起来。
  • span跨度,只非底层元素的的跳跃范围(即从两个相邻节点间隔了多少节点。譬如 level 1,-1 的 span 就是 1;level 2,-1 的 span 为 2。 因为新出入数据的层数是随机的)。

上面数据结构的大致实现方式。从图中我们能看出来,SkipList的最大高度是32层,肯定有人会问为什么是32层。32层能装多少数据。上面的公式我们可以根据p的大小就可以推算出数据的量。32=$log_\frac{1}{p}n$—>n=$\frac{1}{p}^32$如果p=1/2,那么n=$2^32$如果p=1/4,那么n=$2^64$;这些元素基本上就够用了。除了表头节点,其他节点的层数都是随机生成,1~32之间。

1
2
3
#redis 里跳表的参数   server.h
#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^64 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */

常见的Redis操作skipList的函数(redis如果操作这个函数)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
//创建一个新的跳表
/* Create a new skiplist. */
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;

zsl = zmalloc(sizeof(*zsl)); //申请空间
zsl->level = 1; //设置层数为1
zsl->length = 0;//初始化长度为0
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);//创建头节点,指定节点最大层数32,分数为0,元素为null
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {//循环32次指定32层的头节点
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;//头节点和后置节点指向bull
zsl->tail = NULL; //尾部节点指向null
return zsl; //返回指针
}

//创建一个跳表节点
/* Create a skiplist node with the specified number of levels.
* The SDS string 'ele' is referenced by the node after the call. */
zskiplistNode *zslCreateNode(int level, double score, sds ele) {
//申请空间,zskiplistLevel这个server.h有声明主要是存的是跨度和后续指针
zskiplistNode *zn =
zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
zn->score = score; //赋值分数
zn->ele = ele; //赋值元素值
return zn; //返回节点指针(引用)
}

//创建一个节点,同时插入一个元素,返回节点信息。要插入的跳表是zsl
/* Insert a new node in the skiplist. Assumes the element does not already
* exist (up to the caller to enforce that). The skiplist takes ownership
* of the passed SDS string 'ele'. */
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
// update 是插入节点所在位置的前一个节点。我们在学习链表插入的时候,需要找到插入
// 位置的前一个节点。因为在跳表中一个节点是有多个前驱指针的,所以这里需要保存的
// 是多个节点,而不是一个节点
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;//x要插入的值,update是插入值的后驱节点数组
unsigned int rank[ZSKIPLIST_MAXLEVEL]; //用于存储排序。后面计算跨度
int i, level;

serverAssert(!isnan(score)); //校验分数信息
x = zsl->header; //拿到头节点信息
// 遍历skiplist 中所有的层,找到数据将要插入的位置,并保存在update 中。遍历zsl
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the */
//每次使用的都是后继值去比较
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
//逐步循环,找到大于的大于节点跳出,大于节点的上一次循环的后继值,就是这个节点的前驱节点
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;//记录了新数据项的前驱,
}
//解决重复插入问题,在表里相同分数和相同元素不会存在。下面是翻译
//自从允许重复分数存在,重新插入相同的元素也不会发生。我们假定元素是不存在跳表里,每次插入会调用zsInsert()方法去测试hash表里是否存在这个值
/* we assume the element is not already inside, since we allow duplicated
* scores, reinserting the same element should never happen since the
* caller of zslInsert() should test in the hash table if the element is
* already inside or not. */
level = zslRandomLevel();//生成插入元素的层级
if (level > zsl->level) {// random level 比原有的zsl->level 大,需要增加skiplist 的level
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
x = zslCreateNode(level,score,ele);//根据获取的level创建插入节点
for (i = 0; i < level; i++) {
// 新节点项插到update[i] 的后面
x->level[i].forward = update[i]->level[i].forward;//x前驱节点的后继指针,赋予x节点的后继指针
update[i]->level[i].forward = x;//x的前驱节点的后继指针赋值x

/* update span covered by update[i] as x is inserted here */
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);//计算x节点的跨度
update[i]->level[i].span = (rank[0] - rank[i]) + 1;//x的前置节点的跨度+1,插入一个值
}
//增加跨度,对于没有更新的几点,也就是影响节点的跨度。也是+1(只插入了一个值)
/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
// 调整新节点的后继指针
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++; // 调整skiplist 的长度
return x;
}

//删除一个元素的匹配(分数)节点元素,在跳表上,如果存在就返回要删除的元素节点,否则返回0。通过**node传递删除节点的信息。zsl跳表信息,score和ele是删除元素的信息
/* Delete an element with matching score/element from the skiplist.
* The function returns 1 if the node was found and deleted, otherwise
* 0 is returned.
*
* If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
* it is not freed (but just unlinked) and *node is set to the node pointer,
* so that it is possible for the caller to reuse the node (including the
* referenced SDS string at node->ele). */
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; //update删除的前驱节点数组,x删除的节点
int i;

x = zsl->header;//拿到头节点信息,开始遍历,根据元素获取前驱节点的信息。
for (i = zsl->level-1; i >= 0; i--) {//找到分数比他大,或者分数相同但是排序在x查找值的后面的节点。跳出循环最后一次的元素就是删除元素前驱节点
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
x = x->level[i].forward; //获取删除的前驱节点,用后继节点指针比较所以赋值也是一样
}
update[i] = x;
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
x = x->level[0].forward; //获取最底层的前驱节点指针
if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
zslDeleteNode(zsl, x, update); //删除节点
if (!node)
zslFreeNode(x); //删除后释放内存,毕竟没有内存回收,如果是null的话
else
*node = x;//将删除的元素信息返回
return 1;
}
return 0; /* not found */
}

// x 是需要删除的节点 update 是每一个层x 的后驱数组
/* Internal function used by zslDelete, zslDeleteRangeByScore and
* zslDeleteRangeByRank. */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
// 调整span 和forward 指针
for (i = 0; i < zsl->level; i++) {
if (update[i]->level[i].forward == x) { //如果前驱的后继指针有x,改变指针
update[i]->level[i].span += x->level[i].span - 1;//跨度减一
update[i]->level[i].forward = x->level[i].forward;//将x的前驱后继指针,赋值x节点的后继指针。
} else {
update[i]->level[i].span -= 1;//如果没有跨度减一
}
}
//调整后继指针
if (x->level[0].forward) {
//
x->level[0].forward->backward = x->backward;
} else {//如果没有直接指向尾部
zsl->tail = x->backward;
}
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--;如果有需要,层数也减一
zsl->length--;//长度减一
}

//生成一个随机的层次节点,对于一个新的元素节点。它返回额是1与最大层级的中间值、类似于幂律分布(幂律分布我也不太懂)。
/* Returns a random level for the new skiplist node we are going to create.
* The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
* (both inclusive), with a powerlaw-alike distribution where higher
* levels are less likely to be returned. */
int zslRandomLevel(void) {
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

Redis为什么选择了跳表没有选择树

1、在做范围插值的时候,平衡树(AVL)比跳表要复杂(更没有B+树方便,b+树可以把说有节点串联起来做范围查找很方便)。在平衡树上我们查找到最小的节点,然后通过中序查找到不超过大小的节点。如果想要实现要改造,这里面中序遍历不容易实现。不过如果是跳表就很容易实现,它找到最小节点后,直接来到最底层然后逐个对比,然后找到最大值,获取完整的数据。(如果使用zrange 0,-1这种直接把最底层的返回,不用去中序遍历。)

2、插入和删除数据后的平衡复杂度;平衡树插入和删除节点的时候要查找,而且还要进行树的反转(反转的次数和高度有关系,越高反转次数越多,IO就越频繁,性能越差),跳表是没有反转的,它只需要查找后,在临近节点插入,同时构建索引就可以了。

3、从内存占用上来说,skiplist比平衡树更灵活一些。一般来说,平衡树每个节点包含2个指针(分别指向左右子树),而skiplist每个节点包含的指针数目平均为1/(1-p),具体取决于参数p的大小。如果像Redis里的实现一样,取p=1/4,那么平均每个节点包含1.33个指针,比平衡树更有优势。

4、从算法实现的角度来讲,跳表也比树更容易实现。在查找单个key的时候两种数据结构都是的时间复杂度都是O(logn)时间差距不大。

Redis中的跳表与传统跳表做了那些优化

1、Redis的最底层链表是个双向链表,而不是单向链表,这样方便正序和倒序两种查找方式。

2、score(分数)是可以重复的,即跳表节点的key。经典链表是不允许重复的。

3、在比较时,不仅比较分数(相当于skiplist的key),还比较数据本身。在Redis的skiplist实现中,数据本身的内容唯一标识这份数据,而不是由key来唯一标识。另外,当多个元素分数相同的时候,还需要根据数据内容来进字典排序。

4、redis的skipList可以很方便计算出每个元素的rank(排名)。