redis源碼分析之發佈訂閱(pub/sub)

来源:http://www.cnblogs.com/lfls/archive/2017/11/05/7788310.html
-Advertisement-
Play Games

redis算是緩存界的老大哥了,最近做的事情對redis依賴較多,使用了裡面的發佈訂閱功能,事務功能以及SortedSet等數據結構,後面準備好好學習總結一下redis的一些知識點。 原文地址:http://www.jianshu.com/p/8209554b36ce 先看下redis發佈訂閱的結構 ...


redis算是緩存界的老大哥了,最近做的事情對redis依賴較多,使用了裡面的發佈訂閱功能,事務功能以及SortedSet等數據結構,後面準備好好學習總結一下redis的一些知識點。

原文地址:http://www.jianshu.com/p/8209554b36ce

先看下redis發佈訂閱的結構:

redis發佈訂閱結構

其中發佈者跟訂閱者之間通過channel進行交互,channel分為兩種模式。

一、redis發佈訂閱命令簡介

redis中為發佈訂閱(pub/sub)功能提供了六個命令,分為兩種模式。

  1. 由subscribe,unsubscribe組成,它們是負責訂閱有確定名稱的channel,例如subscribe test表示訂閱名字為test的channel。
  2. 由psubscribe,punsubscribe組成,是負責訂閱模糊名字的channel,例如psubscribe test* 表示訂閱所有以test開頭的channel。

最後再加上發佈命令publish以及查看訂閱相關信息的pubsub命令組成。

二、redis發佈訂閱源碼分析

redis所有的命令及其處理函數都放在了server.c文件的開頭,從其中找出發佈訂閱功能相關的命令信息。

    {"subscribe",subscribeCommand,-2,"pslt",0,NULL,0,0,0,0,0},
    {"unsubscribe",unsubscribeCommand,-1,"pslt",0,NULL,0,0,0,0,0},
    {"psubscribe",psubscribeCommand,-2,"pslt",0,NULL,0,0,0,0,0},
    {"punsubscribe",punsubscribeCommand,-1,"pslt",0,NULL,0,0,0,0,0},
    {"publish",publishCommand,3,"pltF",0,NULL,0,0,0,0,0},
    {"pubsub",pubsubCommand,-2,"pltR",0,NULL,0,0,0,0,0},

這裡可以看出創建一條命令需要很多參數,我們這裡只需要關註前兩個參數,第一個參數表示命令的內容,第二個表示該命令對應的處理函數。

普通模式訂閱subscribe函數:
該命令支持多個參數,即subscribe channel1,channel2...

void subscribeCommand(client *c) {
    int j;
    //這裡挨個處理subscribe的參數,因為命令本身被作為參數0所以從1開始處理後面的參數
    for (j = 1; j < c->argc; j++)
        //訂閱每個頻道
        pubsubSubscribeChannel(c,c->argv[j]);
    //這裡設置客戶端的狀態,下麵會解釋這個狀態的作用
    c->flags |= CLIENT_PUBSUB;
}

在server.c文件中,processCommand函數是在調用具體命令函數之前的判斷邏輯,其中有一段:

/* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
    if (c->flags & CLIENT_PUBSUB &&
        c->cmd->proc != pingCommand &&
        c->cmd->proc != subscribeCommand &&
        c->cmd->proc != unsubscribeCommand &&
        c->cmd->proc != psubscribeCommand &&
        c->cmd->proc != punsubscribeCommand) {
        addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
        return C_OK;
    }

這裡註釋也寫的很清楚,就是當client處於pub/sub上下文時,只接收訂閱相關命令以及一個ping命令,這就解釋了上面subscribeCommand函數中為什麼要設置客戶端flag欄位。

接下來看下訂閱的具體邏輯:

int pubsubSubscribeChannel(client *c, robj *channel) {
    dictEntry *de;
    list *clients = NULL;
    int retval = 0;

    //把指定channel加入到client的pubsub_channels哈希表中
    //不成功說明已經訂閱了該頻道
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        retval = 1;
        //這裡是把該channel加入到client的哈希表中,引用加1
        incrRefCount(channel);
        //在server的發佈訂閱哈希表中查找指定channel
        de = dictFind(server.pubsub_channels,channel);
        //如果該channel還不存在,則創建
        if (de == NULL) {
            //創建一個空list
            clients = listCreate();
            //把channel加入到server的哈希表中,value就是該channel的所有訂閱者
            dictAdd(server.pubsub_channels,channel,clients);
            //該channel引用加1
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }
        //把client加入到該channel的訂閱列表中
        listAddNodeTail(clients,c);
    }
    //一系列通知客戶端的操作
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.subscribebulk);
    addReplyBulk(c,channel);
    addReplyLongLong(c,clientSubscriptionsCount(c));
    return retval;
}

總結一下,訂閱其實就是把指定channel分別加入到client跟server的pub/sub哈希表中,然後在server端保存訂閱了該channle的所有client列表,如下圖:

普通模式發佈訂閱數據結構

下麵看一下publish發佈命令:
例如:publish channelName msg

void publishCommand(client *c) {
    //發佈邏輯
    int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
    //這裡是關於集群或者AOF的操作
    if (server.cluster_enabled)
        clusterPropagatePublish(c->argv[1],c->argv[2]);
    else
        forceCommandPropagation(c,PROPAGATE_REPL);
    //返回給client通知了的訂閱者數
    addReplyLongLong(c,receivers);
}

重點看下發佈函數的源碼:

int pubsubPublishMessage(robj *channel, robj *message) {
    int receivers = 0;
    dictEntry *de;
    listNode *ln;
    listIter li;

    //根據上面的訂閱源碼,這裡就是取出訂閱該channel的所有clients
    de = dictFind(server.pubsub_channels,channel);
    if (de) {
        //獲取client的鏈表
        list *list = dictGetVal(de);
        listNode *ln;
        listIter li;
        //由client鏈表創建它的迭代器,c++代碼真是無力吐槽
        listRewind(list,&li);
        //遍歷所有client併發送消息
        while ((ln = listNext(&li)) != NULL) {
            client *c = ln->value;

            addReply(c,shared.mbulkhdr[3]);
            addReply(c,shared.messagebulk);
            addReplyBulk(c,channel);
            addReplyBulk(c,message);
            receivers++;
        }
    }
    
    //開始模糊匹配的邏輯處理,模糊模式使用的是鏈表而不是哈希表,後面會講
    if (listLength(server.pubsub_patterns)) {
        //創建模糊規則的迭代器li
        listRewind(server.pubsub_patterns,&li);
        channel = getDecodedObject(channel);
        //遍歷所有的模糊模式,如果匹配成功則發送消息
        while ((ln = listNext(&li)) != NULL) {
            pubsubPattern *pat = ln->value;
            //判斷當前channel是否可以匹配模糊規則
            if (stringmatchlen((char*)pat->pattern->ptr,
                                sdslen(pat->pattern->ptr),
                                (char*)channel->ptr,
                                sdslen(channel->ptr),0)) {
                addReply(pat->client,shared.mbulkhdr[4]);
                addReply(pat->client,shared.pmessagebulk);
                addReplyBulk(pat->client,pat->pattern);
                addReplyBulk(pat->client,channel);
                addReplyBulk(pat->client,message);
                receivers++;
            }
        }
        decrRefCount(channel);
    }
    return receivers;
}

從上面的publish處理函數可以看出每次進行消息發佈的時候,都會向普通模式跟模糊模式發佈消息,同時也能看出普通模式跟模糊模式使用的是兩種不同的數據結構,下麵看下模糊訂閱模式。

模糊模式訂閱psubscribe函數:

//psubscribe命令對應的處理函數
void psubscribeCommand(client *c) {
    int j;
    //挨個訂閱client指定的pattern
    for (j = 1; j < c->argc; j++)
        pubsubSubscribePattern(c,c->argv[j]);
    //修改client狀態
    c->flags |= CLIENT_PUBSUB;
}

int pubsubSubscribePattern(client *c, robj *pattern) {
    int retval = 0;
    //判斷client是否已經訂閱該pattern,這裡與普通模式不同,是個鏈表
    if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
        retval = 1;
        pubsubPattern *pat;
        //把指定pattern加入到client的pattern鏈表中
        listAddNodeTail(c->pubsub_patterns,pattern);
        //引用計數+1
        incrRefCount(pattern);
        //這裡是創建一個pattern對象,並指向該client,加入到server的pattern鏈表中
        //從這裡可以看出,多個client訂閱同一個pattern會創建多個patter對象,與普通模式不同
        pat = zmalloc(sizeof(*pat));
        pat->pattern = getDecodedObject(pattern);
        pat->client = c;
        listAddNodeTail(server.pubsub_patterns,pat);
    }
    //通知客戶端
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.psubscribebulk);
    addReplyBulk(c,pattern);
    addReplyLongLong(c,clientSubscriptionsCount(c));
    return retval;
}

通過分析上面的源碼可以總結一下模糊訂閱中的數據結構,如下圖:

模糊發佈訂閱模式數據結構

註:正如上面提到的,模糊模式中,一個pat對象中包含一個pattern規則跟一個client指針,也就是說當多個client模糊訂閱同一個pattern時同樣會為每個client都創建一個節點。

普通模式取消訂閱unsubscribe函數:
取消就相對簡單了,說白了就是把上面鎖保存在server跟client端的數據刪除。

取消訂閱入口
void unsubscribeCommand(client *c) {
    //如果該命令沒有參數,則把channel全部取消
    if (c->argc == 1) {
        pubsubUnsubscribeAllChannels(c,1);
    } else {
        int j;
        //迭代取消置頂channel
        for (j = 1; j < c->argc; j++)
            pubsubUnsubscribeChannel(c,c->argv[j],1);
    }
    //如果channel被全部取消,則修改client狀態,這樣client就可以發送其他命令了
    if (clientSubscriptionsCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
}

//一次性取消訂閱所有channel
int pubsubUnsubscribeAllChannels(client *c, int notify) {
     //取出client端所有的channel
    dictIterator *di = dictGetSafeIterator(c->pubsub_channels);
    dictEntry *de;
    int count = 0;

    while((de = dictNext(di)) != NULL) {
        robj *channel = dictGetKey(de);
        //最終也是挨個取消channel
        count += pubsubUnsubscribeChannel(c,channel,notify);
    }
    
    //如果client上面都沒有訂閱,依然返迴響應
    if (notify && count == 0) {
        addReply(c,shared.mbulkhdr[3]);
        addReply(c,shared.unsubscribebulk);
        addReply(c,shared.nullbulk);
        addReplyLongLong(c,dictSize(c->pubsub_channels)+
                       listLength(c->pubsub_patterns));
    }
    //釋放空間
    dictReleaseIterator(di);
    return count;
}

//取消訂閱指定channel
int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
    dictEntry *de;
    list *clients;
    listNode *ln;
    int retval = 0;
    //從client中刪除指定channel
    if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
        retval = 1;
        //刪除服務端該channel中的指定client
        de = dictFind(server.pubsub_channels,channel);
        serverAssertWithInfo(c,NULL,de != NULL);
        clients = dictGetVal(de);
        ln = listSearchKey(clients,c);
        serverAssertWithInfo(c,NULL,ln != NULL);
        listDelNode(clients,ln);
        if (listLength(clients) == 0) {
            //如果刪除完以後channel沒有了訂閱者,則把channel也刪除
            dictDelete(server.pubsub_channels,channel);
        }
    }
    //返回client響應
    if (notify) {
        addReply(c,shared.mbulkhdr[3]);
        addReply(c,shared.unsubscribebulk);
        addReplyBulk(c,channel);
        addReplyLongLong(c,dictSize(c->pubsub_channels)+
                       listLength(c->pubsub_patterns));

    }
    //引用計數-1
    decrRefCount(channel); 
    return retval;
}

由於模糊模式的取消訂閱與普通模式類似,這裡就不再貼代碼了。

三、redis發佈訂閱總結

整個發佈訂閱的代碼比較簡單清晰,一個值得思考的問題時普通模式跟模糊模式中分別使用了哈希表跟鏈表兩種結構進行處理,而不是統一的,原因在於模糊模式不能精確匹配,需要遍歷挨個判斷,而哈希表的優勢在於快速定位查找,在需要遍歷跟模糊匹配的場景中並不適用。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • db.getCollection('product').update({status:"offline"},{$set:{status:"online"}},false,true) update更新,把所有status為offline的都改為online,註意加引號 db.collection.up... ...
  • MySQL資料庫的性能的影響分析及其優化 MySQL資料庫的性能的影響 一. 伺服器的硬體的限制 二. 伺服器所使用的操作系統 三. 伺服器的所配置的參數設置不同 四. 資料庫存儲引擎的選擇 五. 資料庫的參數配置的不同 六. (重點)資料庫的結構的設計和SQL語句 1). 伺服器的配置和設置(cp ...
  • 避免自己遺忘,在這裡做個筆記: SET XACT_ABORT ON:強制事務回滾,如果不加這句的話事務有可能回滾失敗。 ...
  • 先看一下Redis是一個什麼東西。官方簡介解釋到:Redis是一個基於BSD開源的項目,是一個把結構化的數據放在記憶體中的一個存儲系統,你可以把它作為資料庫,緩存和消息中間件來使用。同時支持strings,lists,hashes,sets,sorted sets,bitmaps,hyperloglo ...
  • create database test default charset utf8 collate utf8_general_ci; ...
  • 最近在工作中接到了一個需求,要求統計當月以10天為一個周期,每個周期的數據彙總信息。假設有一張表如下: 表table_test中 ID AMOUNT CREATE_DATE 1 50 2017-01-01 2 50 2017-01-09 3 50 2017-01-11 4 50 2017-01-19 ...
  • NoSQL資料庫 1、NoSQL簡介 最初表示“反SQL”運動,用新型的非關係型資料庫取代關係資料庫;現在表示“Not only SQL”關係和非關係型資料庫各有優缺點,彼此都無法互相取代。 通常,NoSQL資料庫具有以下幾個特點: (1)靈活的可擴展性 (2)靈活的數據模型 (3)與雲計算近緊密融 ...
  • 方法一: a.第一步:在job中載入兩個文件所在的位置 FileInputFormat.setInputPaths(job, new Path[] { new Path("hdfs://192.168.9.13:8020/gradeMarking"), new Path("hdfs://192.16 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...