redis算是緩存界的老大哥了,最近做的事情對redis依賴較多,使用了裡面的發佈訂閱功能,事務功能以及SortedSet等數據結構,後面準備好好學習總結一下redis的一些知識點。 原文地址:http://www.jianshu.com/p/8209554b36ce 先看下redis發佈訂閱的結構 ...
redis算是緩存界的老大哥了,最近做的事情對redis依賴較多,使用了裡面的發佈訂閱功能,事務功能以及SortedSet等數據結構,後面準備好好學習總結一下redis的一些知識點。
原文地址:http://www.jianshu.com/p/8209554b36ce
先看下redis發佈訂閱的結構:
其中發佈者跟訂閱者之間通過channel進行交互,channel分為兩種模式。
一、redis發佈訂閱命令簡介
redis中為發佈訂閱(pub/sub)功能提供了六個命令,分為兩種模式。
- 由subscribe,unsubscribe組成,它們是負責訂閱有確定名稱的channel,例如subscribe test表示訂閱名字為test的channel。
- 由psubscribe,punsubscribe組成,是負責訂閱模糊名字的channel,例如psubscribe test* 表示訂閱所有以test開頭的channel。
最後再加上發佈命令publish以及查看訂閱相關信息的pubsub命令組成。
二、redis發佈訂閱源碼分析
redis所有的命令及其處理函數都放在了server.c文件的開頭,從其中找出發佈訂閱功能相關的命令信息。
{"subscribe",subscribeCommand,-2,"pslt",0,NULL,0,0,0,0,0},
{"unsubscribe",unsubscribeCommand,-1,"pslt",0,NULL,0,0,0,0,0},
{"psubscribe",psubscribeCommand,-2,"pslt",0,NULL,0,0,0,0,0},
{"punsubscribe",punsubscribeCommand,-1,"pslt",0,NULL,0,0,0,0,0},
{"publish",publishCommand,3,"pltF",0,NULL,0,0,0,0,0},
{"pubsub",pubsubCommand,-2,"pltR",0,NULL,0,0,0,0,0},
這裡可以看出創建一條命令需要很多參數,我們這裡只需要關註前兩個參數,第一個參數表示命令的內容,第二個表示該命令對應的處理函數。
普通模式訂閱subscribe函數:
該命令支持多個參數,即subscribe channel1,channel2...
void subscribeCommand(client *c) {
int j;
//這裡挨個處理subscribe的參數,因為命令本身被作為參數0所以從1開始處理後面的參數
for (j = 1; j < c->argc; j++)
//訂閱每個頻道
pubsubSubscribeChannel(c,c->argv[j]);
//這裡設置客戶端的狀態,下麵會解釋這個狀態的作用
c->flags |= CLIENT_PUBSUB;
}
在server.c文件中,processCommand函數是在調用具體命令函數之前的判斷邏輯,其中有一段:
/* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
if (c->flags & CLIENT_PUBSUB &&
c->cmd->proc != pingCommand &&
c->cmd->proc != subscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand) {
addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
return C_OK;
}
這裡註釋也寫的很清楚,就是當client處於pub/sub上下文時,只接收訂閱相關命令以及一個ping命令,這就解釋了上面subscribeCommand函數中為什麼要設置客戶端flag欄位。
接下來看下訂閱的具體邏輯:
int pubsubSubscribeChannel(client *c, robj *channel) {
dictEntry *de;
list *clients = NULL;
int retval = 0;
//把指定channel加入到client的pubsub_channels哈希表中
//不成功說明已經訂閱了該頻道
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
retval = 1;
//這裡是把該channel加入到client的哈希表中,引用加1
incrRefCount(channel);
//在server的發佈訂閱哈希表中查找指定channel
de = dictFind(server.pubsub_channels,channel);
//如果該channel還不存在,則創建
if (de == NULL) {
//創建一個空list
clients = listCreate();
//把channel加入到server的哈希表中,value就是該channel的所有訂閱者
dictAdd(server.pubsub_channels,channel,clients);
//該channel引用加1
incrRefCount(channel);
} else {
clients = dictGetVal(de);
}
//把client加入到該channel的訂閱列表中
listAddNodeTail(clients,c);
}
//一系列通知客戶端的操作
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.subscribebulk);
addReplyBulk(c,channel);
addReplyLongLong(c,clientSubscriptionsCount(c));
return retval;
}
總結一下,訂閱其實就是把指定channel分別加入到client跟server的pub/sub哈希表中,然後在server端保存訂閱了該channle的所有client列表,如下圖:
下麵看一下publish發佈命令:
例如:publish channelName msg
void publishCommand(client *c) {
//發佈邏輯
int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
//這裡是關於集群或者AOF的操作
if (server.cluster_enabled)
clusterPropagatePublish(c->argv[1],c->argv[2]);
else
forceCommandPropagation(c,PROPAGATE_REPL);
//返回給client通知了的訂閱者數
addReplyLongLong(c,receivers);
}
重點看下發佈函數的源碼:
int pubsubPublishMessage(robj *channel, robj *message) {
int receivers = 0;
dictEntry *de;
listNode *ln;
listIter li;
//根據上面的訂閱源碼,這裡就是取出訂閱該channel的所有clients
de = dictFind(server.pubsub_channels,channel);
if (de) {
//獲取client的鏈表
list *list = dictGetVal(de);
listNode *ln;
listIter li;
//由client鏈表創建它的迭代器,c++代碼真是無力吐槽
listRewind(list,&li);
//遍歷所有client併發送消息
while ((ln = listNext(&li)) != NULL) {
client *c = ln->value;
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.messagebulk);
addReplyBulk(c,channel);
addReplyBulk(c,message);
receivers++;
}
}
//開始模糊匹配的邏輯處理,模糊模式使用的是鏈表而不是哈希表,後面會講
if (listLength(server.pubsub_patterns)) {
//創建模糊規則的迭代器li
listRewind(server.pubsub_patterns,&li);
channel = getDecodedObject(channel);
//遍歷所有的模糊模式,如果匹配成功則發送消息
while ((ln = listNext(&li)) != NULL) {
pubsubPattern *pat = ln->value;
//判斷當前channel是否可以匹配模糊規則
if (stringmatchlen((char*)pat->pattern->ptr,
sdslen(pat->pattern->ptr),
(char*)channel->ptr,
sdslen(channel->ptr),0)) {
addReply(pat->client,shared.mbulkhdr[4]);
addReply(pat->client,shared.pmessagebulk);
addReplyBulk(pat->client,pat->pattern);
addReplyBulk(pat->client,channel);
addReplyBulk(pat->client,message);
receivers++;
}
}
decrRefCount(channel);
}
return receivers;
}
從上面的publish處理函數可以看出每次進行消息發佈的時候,都會向普通模式跟模糊模式發佈消息,同時也能看出普通模式跟模糊模式使用的是兩種不同的數據結構,下麵看下模糊訂閱模式。
模糊模式訂閱psubscribe函數:
//psubscribe命令對應的處理函數
void psubscribeCommand(client *c) {
int j;
//挨個訂閱client指定的pattern
for (j = 1; j < c->argc; j++)
pubsubSubscribePattern(c,c->argv[j]);
//修改client狀態
c->flags |= CLIENT_PUBSUB;
}
int pubsubSubscribePattern(client *c, robj *pattern) {
int retval = 0;
//判斷client是否已經訂閱該pattern,這裡與普通模式不同,是個鏈表
if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
retval = 1;
pubsubPattern *pat;
//把指定pattern加入到client的pattern鏈表中
listAddNodeTail(c->pubsub_patterns,pattern);
//引用計數+1
incrRefCount(pattern);
//這裡是創建一個pattern對象,並指向該client,加入到server的pattern鏈表中
//從這裡可以看出,多個client訂閱同一個pattern會創建多個patter對象,與普通模式不同
pat = zmalloc(sizeof(*pat));
pat->pattern = getDecodedObject(pattern);
pat->client = c;
listAddNodeTail(server.pubsub_patterns,pat);
}
//通知客戶端
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.psubscribebulk);
addReplyBulk(c,pattern);
addReplyLongLong(c,clientSubscriptionsCount(c));
return retval;
}
通過分析上面的源碼可以總結一下模糊訂閱中的數據結構,如下圖:
註:正如上面提到的,模糊模式中,一個pat對象中包含一個pattern規則跟一個client指針,也就是說當多個client模糊訂閱同一個pattern時同樣會為每個client都創建一個節點。
普通模式取消訂閱unsubscribe函數:
取消就相對簡單了,說白了就是把上面鎖保存在server跟client端的數據刪除。
取消訂閱入口
void unsubscribeCommand(client *c) {
//如果該命令沒有參數,則把channel全部取消
if (c->argc == 1) {
pubsubUnsubscribeAllChannels(c,1);
} else {
int j;
//迭代取消置頂channel
for (j = 1; j < c->argc; j++)
pubsubUnsubscribeChannel(c,c->argv[j],1);
}
//如果channel被全部取消,則修改client狀態,這樣client就可以發送其他命令了
if (clientSubscriptionsCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
}
//一次性取消訂閱所有channel
int pubsubUnsubscribeAllChannels(client *c, int notify) {
//取出client端所有的channel
dictIterator *di = dictGetSafeIterator(c->pubsub_channels);
dictEntry *de;
int count = 0;
while((de = dictNext(di)) != NULL) {
robj *channel = dictGetKey(de);
//最終也是挨個取消channel
count += pubsubUnsubscribeChannel(c,channel,notify);
}
//如果client上面都沒有訂閱,依然返迴響應
if (notify && count == 0) {
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.unsubscribebulk);
addReply(c,shared.nullbulk);
addReplyLongLong(c,dictSize(c->pubsub_channels)+
listLength(c->pubsub_patterns));
}
//釋放空間
dictReleaseIterator(di);
return count;
}
//取消訂閱指定channel
int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
dictEntry *de;
list *clients;
listNode *ln;
int retval = 0;
//從client中刪除指定channel
if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
retval = 1;
//刪除服務端該channel中的指定client
de = dictFind(server.pubsub_channels,channel);
serverAssertWithInfo(c,NULL,de != NULL);
clients = dictGetVal(de);
ln = listSearchKey(clients,c);
serverAssertWithInfo(c,NULL,ln != NULL);
listDelNode(clients,ln);
if (listLength(clients) == 0) {
//如果刪除完以後channel沒有了訂閱者,則把channel也刪除
dictDelete(server.pubsub_channels,channel);
}
}
//返回client響應
if (notify) {
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.unsubscribebulk);
addReplyBulk(c,channel);
addReplyLongLong(c,dictSize(c->pubsub_channels)+
listLength(c->pubsub_patterns));
}
//引用計數-1
decrRefCount(channel);
return retval;
}
由於模糊模式的取消訂閱與普通模式類似,這裡就不再貼代碼了。
三、redis發佈訂閱總結
整個發佈訂閱的代碼比較簡單清晰,一個值得思考的問題時普通模式跟模糊模式中分別使用了哈希表跟鏈表兩種結構進行處理,而不是統一的,原因在於模糊模式不能精確匹配,需要遍歷挨個判斷,而哈希表的優勢在於快速定位查找,在需要遍歷跟模糊匹配的場景中並不適用。