Redis常用數據類型有字元串String、字典disct、列表List、集合Set、有序集合SortedSet,List常用於獲取最新topN條新聞等類似問題和生產者消費者模式,集合set可以求對象的共同標簽,而有序集合SortedSet用於游戲中的分數排名,SortedSet底層採用壓縮列表zi... ...
Redis常用數據類型有字元串String、字典dict、列表List、集合Set、有序集合SortedSet,本文將簡單介紹各數據類型及其使用場景,並重點剖析有序集合SortedSet的實現。
List的底層實現是類似Linked List雙端鏈表的結構,而不是數組,插入速度快,不需要節點的移動,但不支持隨機訪問,需要順序遍歷到索引所在節點。List有兩個主要的使用場景:
- 記住用戶最新發表的博文,每次用戶發表了文章,將文章id使用LPUSH加入到列表中,用戶訪問自己的主頁時,使用LRANGE 0 9獲取最新10條博文(使用LTRIM 0 9可以取出最新10條文章的同時,刪除舊的文章),而不用使用order by sql語句去後端資料庫取數據。
- 生產者/消費者模式,生產者往List中加入數據,消費者從List中取數據。當List為空時,消費者rpop返回為NULL,這是會進行輪詢,等待一段時間繼續去取。輪詢模式有如下缺點:
- 客戶端和redis耗費cpu和網路帶寬等資源執行無效命令。
- 取回NULL後,sleep會使有新數據時,客戶端消費不夠及時。
為瞭解決輪詢的問題,Redis提供了brpop和blpop實現Blocking讀,當List為空時,等待一段時間再返回,當有數據時,按請求順序返回給各客戶端。(當List為空時,可以將請求Blocking讀命令的客戶端加入此List的Blocking讀列表中,有數據時按列表序返回)
集合Set的底層實現是類似Hash,不過value全為NULL,set有求並、交、差集及隨機取的功能。使用場景如下:
- 表示對象之間的聯繫,比如求擁有標簽1、2、10的新聞,使用sinter tag:1:news tag:2:news tag:10:news。
- 隨機發牌,使用spop,spop隨機返回集合中的元素,比如給每位玩家發五張牌,每位玩家調用五次spop即可,為了下次發牌不需要再將牌加入set,可以在這次發牌前調用sunionstore將牌複製。
有序集合SortedSet(t_zset.c),集合中的每個值都帶有分數,按分數排序,底層實現較為複雜,用到了ziplist、skiplist和dict數據結構,後文將進行詳細介紹。使用場景如下:
- 排行榜問題,比如游戲排行榜,按用戶分數排序,並取top N個用戶。
在redis中,所有數據類型都被封裝在一個redisObject結構中,用於提供統一的介面,結構如下表1:
表1 redisObject
redisObject源碼(server.h) |
typedef struct redisObject { unsigned type:4;//對象類型,用於分辨字元串、列表、
|
有序列表有壓縮列表ziplist和跳錶skiplist兩種實現方式,通過encoding識別,當數據項數目小於zset_max_ziplist_entries(預設為128),且保存的所有元素長度不超過zset_max_ziplist_value(預設為64)時,則用ziplist實現有序集合,否則使用zset結構,zset底層使用skiplist跳錶和dict字典。創建有序集合的關鍵代碼如下表2:
表2 創建有序集合
zaddGenericCommand函數 |
if (server.zset_max_ziplist_entries == 0 || server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr)) { zobj = createZsetObject(); //創建zset } else { zobj = createZsetZiplistObject();//創建ziplist }
|
ziplist是一個記憶體連續的特殊雙向鏈表LinkList,減少了記憶體碎片和指針的占用,用於節省記憶體,但對ziplist進行操作會導致記憶體的重新分配,影響性能,故在元素較少時用ziplist。ziplist記憶體佈局如下:
<zlbytes> <zltail> <zllen> <entry> <entry> ... <entry> <zlend>
表3 ziplist在記憶體中各位元組含義
Field |
含義 |
zlbytes(uint32_t) |
ziplist占用的記憶體位元組數,包括zlbytes本身 |
zltail(uint32_t) |
最後一個entry的offset偏移值 |
zllen(uint16_t) |
數據項entry的個數 |
entry(變長) |
數據項 |
zlend(uint8_t) |
標識ziplist的結束,值為255 |
數據項entry的記憶體結構如下:<prevlen> <encoding> <entry-data>,當保存的是小整型數據時,entry沒有entry-data域, encoding本身包含了整型元素值。Entry各位元組含義如下表4:
表4 entry各Field含義
Field |
含義 |
prevlen |
上一個數據項entry的長度。當長度小於254位元組,則prevlen占1位元組,當長度大於或等於254位元組,則prevlen占5位元組,首位元組值為254,剩下4位元組表示上一entry長度。 |
encoding |
encoding的值依賴於數據entry-data。首位元組的前兩個bit為00、01、10,標識entry-data為字元串,同時表示encoding的長度分別為1、2、5位元組,除前兩個bit,剩下的bit表示字元串長度;前兩個bit為11,表示entry-data為整型,接下來的2 bit表示整數類型。entry-data不同類型及encoding如下: 1) |00pppppp| - 1 byte,字元串且長度小於等於63位元組(6bit) 2) |01pppppp|qqqqqqqq| - 2 bytes,字元串且長度小於等於16383位元組(14bit) 3) |10000000|qqqqqqqq|rrrrrrrr|ssssssss|tttttttt| - 5 bytes,字元串且長度大於等於16384(後面四個位元組表示長度,首位元組的低位6bit設為0) 4) |11000000| - 1 bytes,len欄位為1位元組,後面的entry-data為整型且類型為int16_t (2 bytes) 5) |11010000| - 1 bytes, entry-data為整型且類型為int32_t (4 bytes) 6) |11100000| - 1 bytes, entry-data為整型且類型為int64_t (8 bytes) 7) |11110000| - 1 bytes, entry-data為整型且占3 bytes 8) |11111110| - 1 bytes, entry-data為整型且占1 bytes 9) |1111xxxx| - (with xxxx between 0000 and 1101),xxxx的值從1到13,可用於表示entry-data(1到12),encoding包含entry-data的值,從而不需要entry-data域 10) |11111111| - 用於標識ziplist的結束 |
entry-data |
具體的數據 |
ziplist在記憶體中的實例如圖1,zibytes占4位元組(小端存儲),值為15,表示此ziplist占用記憶體15位元組;zltail占4位元組,值為12,表示最後一個數據項entry(這裡是5所在的entry),距離ziplist的開頭offset為12位元組;entries占2位元組,表示數據項數目為2; "00 f3"表示第一個entry(值為2),”00”表示前一個entry的長度為0(prevlen),”f3”對應encoding中的第9種情況(“11110011”),表示數據為整型且值為2;”02 f6”表示第二個entry,”02”表示前一個entry的長度為2(prevlen),”f6”也對應encoding的第9種情況(“11110110”),表示數據為整型且值為6.
圖1 ziplist在記憶體中的實例
ziplist在redis中插入數據的源碼及註釋如表5:
表5 ziplist插入數據源碼
ziplist插入邏輯源碼(ziplist.c) |
/* Insert item at "p". */ unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) { size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen; unsigned int prevlensize, prevlen = 0; size_t offset; int nextdiff = 0; unsigned char encoding = 0; long long value = 123456789; /* initialized to avoid warning. Using a value that is easy to see if for some reason we use it uninitialized. */ zlentry tail; /* Find out prevlen for the entry that is inserted. */ //插入位置前面一個entry節點占用的位元組數prevlen if (p[0] != ZIP_END) {//插入節點不在末尾節點,直接從p的前面位元組讀 ZIP_DECODE_PREVLEN(p, prevlensize, prevlen); } else {//插入節點在末尾位置,找到末尾節點 unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl); if (ptail[0] != ZIP_END) { prevlen = zipRawEntryLength(ptail); } } /* See if the entry can be encoded */ if (zipTryEncoding(s,slen,&value,&encoding)) {//判斷s是否可以轉化為整數,並將整數值和enconding分別存在value和encoding指針 /* 'encoding' is set to the appropriate integer encoding */ reqlen = zipIntSize(encoding);//整數值長度 } else { /* 'encoding' is untouched, however zipStoreEntryEncoding will use the * string length to figure out how to encode it. */ reqlen = slen;//字元串長度 } /* We need space for both the length of the previous entry and * the length of the payload. */ //得出新插入節點占用的總位元組數reqlen reqlen += zipStorePrevEntryLength(NULL,prevlen); reqlen += zipStoreEntryEncoding(NULL,encoding,slen); /* When the insert position is not equal to the tail, we need to * make sure that the next entry can hold this entry's length in * its prevlen field. */ //插入新節點不在末尾位置,則插入位置p所指向的entry節點的prevlen, //值會變成新插入節點的總長度,且prevlen所占用的位元組數可能會變化, //nextdiff表示新插入節點下一節點的prevlen需要空間的變化,負值表示變小, //正值表示擴大 int forcelarge = 0; nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0; if (nextdiff == -4 && reqlen < 4) { nextdiff = 0; forcelarge = 1; } /* Store offset because a realloc may change the address of zl. */ offset = p-zl; zl = ziplistResize(zl,curlen+reqlen+nextdiff);//重新分配空間,並將zl的每位元組都填充到新分配的記憶體中 p = zl+offset; //將p後面的數據項進行移動 /* Apply memory move when necessary and update tail offset. */ if (p[0] != ZIP_END) { /* Subtract one because of the ZIP_END bytes */ memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff); /* Encode this entry's raw length in the next entry. */ if (forcelarge)//設置下一個節點的prevlen zipStorePrevEntryLengthLarge(p+reqlen,reqlen); else zipStorePrevEntryLength(p+reqlen,reqlen); /* Update offset for tail */ ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen); /* When the tail contains more than one entry, we need to take * "nextdiff" in account as well. Otherwise, a change in the * size of prevlen doesn't have an effect on the *tail* offset. */ zipEntry(p+reqlen, &tail); if (p[reqlen+tail.headersize+tail.len] != ZIP_END) { ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff); } } else { /* This element will be the new tail. */ ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl); } /* When nextdiff != 0, the raw length of the next entry has changed, so * we need to cascade the update throughout the ziplist */ if (nextdiff != 0) { offset = p-zl; zl = __ziplistCascadeUpdate(zl,p+reqlen); p = zl+offset; } /* Write the entry */ //將新數據項放入插入位置 p += zipStorePrevEntryLength(p,prevlen); p += zipStoreEntryEncoding(p,encoding,slen); if (ZIP_IS_STR(encoding)) { memcpy(p,s,slen); } else { zipSaveInteger(p,value,encoding); } ZIPLIST_INCR_LENGTH(zl,1); return zl; }
|
zset在redis中的定義如表6:
表6 zset源碼
zset定義(server.h) |
typedef struct zset { dict *dict;//字典 zskiplist *zsl;//跳錶 } zset;
|
zset同時使用dict和zskiplist實現有序集合的功能,dict是為了快速獲得指定元素的分值(zscore命令,時間複雜度為O(1)),zskiplist是為了快速範圍查詢(zrank、zrange命令)。本文重點講解跳錶的知識。
skiplist是在有序鏈表的基礎上發展而來,在有序鏈表中進行查找,需要進行順序遍歷,時間複雜度為O(n),同樣,進行插入也需要順序遍歷到插入位置,時間複雜度也為O(n)。
圖2 有序鏈表
利用有序的性質,每兩個節點多加一個指針,指向下下個節點,如圖3所示,新增加的指針可以構成一個新的有序鏈表,新鏈表節點個數只有下層鏈表的一半,當查找元素時,可以從新鏈表開始向右查找,碰到比查找元素大的節點,則回到下一層鏈表查找,比如查找元素20,查找路徑如下圖中標記為紅的路徑(head->8->17->23,23比20大,到下一層查找,17->20),由於新增的指針,查找元素時不需要和每個節點進行比較,需要比較的節點大概為原來的一半。
圖3 雙層有序鏈表
可以在新產生的鏈表之上,每隔兩個節點,再增加一個指針,從而產生第三層鏈表,如圖4所示,紅色箭頭代表查找路徑,從最上層鏈表開始查找,一次可以跳過四個節點,進一步加快了查找速度。
圖4 多層有向鏈表
skiplist借鑒了多層鏈表的思想,但多層鏈表這種嚴格的2:1關係,會導致插入和刪除節點破壞上下層之間的2:1關係,導致插入位置和刪除位置及後續的所有節點都需要進行調整。skiplist並不採用這種嚴格的2:1對應關係,每個節點的層數採用隨機生成的方法,節點插入例子如下圖5所示,插入節點不會影響其它節點的層數,且只需調整插入節點前後的指針,不需要對所有節點進行調整,降低了插入的複雜度。
圖5 skiplist插入節點過程
skiplist隨機生成層數level的的代碼如表7:
表7 隨機生成節點層數
zslRandomLevel函數(t_zset.c) |
int zslRandomLevel(void) { //隨機生成節點層數,當第i層節點存在時,第i+1節點存在的概率為ZSKIPLIST_P = 1/4 //ZSKIPLIST_MAXLEVEL 64,表示節點的最大層數 int level = 1; while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF)) level += 1; return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL; }
|
skiplist時間複雜度為o(),所占用空間的大小依賴於插入元素隨機生成的層數,每個元素level至少為1,層數越高,生成的概率越低,節點的層數服從一定的概率分佈,如下:
- 節點恰好只有一層的概率為1-p
- 節點層數大於等於2的概率為p,恰好等於2的概率為p(1-p)
- 節點層數大於等於k的概率為pk-1,恰好等於k的概率為pk-1(1-p)
每個節點的平均層數計算如下:
平均層數代表每個節點的平均指針數目,在redis中,p=1/4,因此平均指針數目為1.33。
在redis中skiplist的定義代碼如表8,zskiplist表示跳錶, zskiplistNode表示跳錶中的節點, zskiplistNode包含了分值,每個節點按分值排序,且節點包含後退指針,用於雙向遍歷。
表8 redis中跳錶結構
zskiplist及zskiplistNode(server.h) |
/* ZSETs use a specialized version of Skiplists */ typedef struct zskiplistNode { sds ele;//實際存儲的數據 double score;//分值 struct zskiplistNode *backward;//後退指針,指向前一個節點 struct zskiplistLevel { struct zskiplistNode *forward;//前進指針,指向下一個節點 unsigned long span;//跨度,表示該層鏈表的這一節點到下一節點跨越的節點數,用於計算rank } level[];//層級數組,每個層級都有到下一個節點的指針和跨度 } zskiplistNode;//跳錶節點 typedef struct zskiplist { struct zskiplistNode *header, *tail;//跳錶頭節點和尾節點 unsigned long length;//跳錶元素個數 int level;//跳錶的最高層數(不包括頭節點,頭節點實際上並不存儲數據) } zskiplist;
|
redis中,zskiplist插入元素的代碼如表9,在查找插入位置的過程中,記下每層需要更新的前一節點在update數組中。
表9 跳錶插入節點源代碼
zslInsert(t_zset.c) |
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) { zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; unsigned int rank[ZSKIPLIST_MAXLEVEL]; int i, level; serverAssert(!isnan(score)); x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { /* store rank that is crossed to reach the insert position */ //rank[i]初始化為rank[i+1],所以rank[i]-rank[i+1]表示在i層走過的節點數 rank[i] = i == (zsl->level-1) ? 0 : rank[i+1]; while (x->level[i].forward && (x->level[i].forward->score < score || (x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele,ele) < 0))) { rank[i] += x->level[i].span; x = x->level[i].forward; } // 記錄將要和新節點相連接的節點,x表示新節點在i層連接的上一節點 update[i] = x; } /* we assume the element is not already inside, since we allow duplicated * scores, reinserting the same element should never happen since the * caller of zslInsert() should test in the hash table if the element is * already inside or not. */ level = zslRandomLevel();//隨機生成此節點的層數 if (level > zsl->level) { for (i = zsl->level; i < level; i++) { rank[i] = 0; update[i] = zsl->header; update[i]->level[i].span = zsl->length; } zsl->level = level; } x = zslCreateNode(level,score,ele); for (i = 0; i < level; i++) { x->level[i].forward = update[i]->level[i].forward; update[i]->level[i].forward = x; /* update span covered by update[i] as x is inserted here */ //rank[0]表示0層鏈表,插入節點x左邊的節點數 //rank[i]表示i層鏈表,插入節點x左邊的節點數 //rank[0] - rank[i]+1表示i層鏈表,x前一節點到x的跨度 x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]); update[i]->level[i].span = (rank[0] - rank[i]) + 1; } /* increment span for untouched levels */ //在level及之上的每層,update[i]到下一節點的距離由於插入了x節點而加1 for (i = level; i < zsl->level; i++) { update[i]->level[i].span++; } //更新後退指針 x->backward = (update[0] == zsl->header) ? NULL : update[0]; if (x->level[0].forward) x->level[0].forward->backward = x; else zsl->tail = x; zsl->length++; return x; }
|
與平衡樹(AVL、紅黑樹)比,skiplist有如下優點,這也是redis使用跳錶做有序集合底層結構而不選用平衡樹的原因。
- 占用記憶體少。通過調節概率p,可以使每個節點的平均指針數發生變化,redis中為1.33,而二叉樹每個節點都有兩個指針。
- ZRANGE or ZREVRANGE等範圍查詢更簡單。Skiplist可以看作特殊的雙向鏈表,只需找到範圍中的最小節點,順序遍歷即可,而平衡樹找到範圍中的最小節點,仍需中序遍歷。
- 和紅黑樹等比,skiplist實現和調試簡單。
參考文獻
- An introduction to Redis data types and abstractions.
- Redis內部數據結構詳解(4)——ziplist.
- Pugh W. Skip lists: a probabilistic alternative to balanced trees[J]. Communications of the ACM, 1990, 33(6): 668-677.
- Redis為什麼用跳錶而不用平衡樹?
- Is there any particular reason you chose skip list instead of btrees except for simplicity?