關於Redis中交互的過程

来源:http://www.cnblogs.com/chenpingzhao/archive/2016/03/08/5215467.html
-Advertisement-
Play Games

一、Redis啟動 載入配置(命令行或者配置文件) 啟動TCP監聽,客戶端的列表保存在redisserver的clients中 啟動AE Event Loop事件,非同步處理客戶請求 事件處理器的主迴圈 aeMain void aeMain(aeEventLoop *eventLoop) { even


一、Redis啟動

  • 載入配置(命令行或者配置文件)

  • 啟動TCP監聽,客戶端的列表保存在redisserver的clients中

  • 啟動AE Event Loop事件,非同步處理客戶請求

事件處理器的主迴圈

aeMain

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        // 如果有需要在事件處理前執行的函數,那麼運行它
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);   // 開始處理事件
    }
}

事件處理框架非常簡單,從初始化、服務到結束,分別對應的函數:aeCreateEventLoop、aeMain、aeDeleteEventLoop 其中,aeMain是事件迴圈的主體函數,它又會調用 aeProcessEvents函數,三個主體函數會調用aeApiCreate、aeApiPool、aeApiFree三個介面函數進行處理。 這三個介面函數又會映射到具體的某一種網路模型中

aeDeleteEventLoop

void aeDeleteEventLoop(aeEventLoop *eventLoop) {
    aeApiFree(eventLoop);
    zfree(eventLoop->events);
    zfree(eventLoop->fired);
    zfree(eventLoop);
}

aeCreateEventLoop

aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;
 
    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->setsize = setsize;
    eventLoop->lastTime = time(NULL);
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    if (aeApiCreate(eventLoop) == -1) goto err;
    /* Events with mask == AE_NONE are not set. So let's initialize the
     * vector with it. */
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;
 
err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }                                                                                                                                                                                        
    return NULL;
}

處理事件的時候,aeMain函數調用aeProcessEvents函數,在一個迴圈中處理文件事件和到期的時間事件。 aeProcessEvents函數調用aeSearchNearestTimer函數來查詢事件迴圈中最先要過期的事件,時間複雜度為O(N)。先處理文件事件,然後再處理時間事件。

aeProcessEvents

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{    
    int processed = 0, numevents;
     
    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
     
    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;
    
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;
     
            /* Calculate the time missing for the nearest
             * timer to fire. */
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) {
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else {
                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }
            if (tvp->tv_sec < 0) tvp->tv_sec = 0;
            if (tvp->tv_usec < 0) tvp->tv_usec = 0;
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }
     
        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;
     
|       /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
     
    return processed; /* return the number of processed file/time events */
}

二、接受客戶端TCP連接流程

acceptTcpHandler()該函數會調用acceptCommonHander(),而acceptCommonHander()又會調用createClient()來為該client創建一個redisClient對象,最終,redis會根據用戶輸入的命令通過查找命令表調用已經寫好的命令執行函數

acceptTcpHandler

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[REDIS_IP_STR_LEN];
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);
 
    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                redisLog(REDIS_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }    
        redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(cfd,0);
    }
}

client輸入get命令,redis server最終會調用getCommand函數,client輸入set命令,redis最終會調用setCommand函數

redis執行完用戶的一個命令後,會將結果寫入到redisClient對象中的reply list中,而sendReplyToClient函數會不斷的從該list中數據,非同步地發送給client。需要註意的是,sendReplyToClient函數也是通過aeCreateFileEvent註冊的

三、處理客戶請求流程

通過processInputBuffer()來解析querybuf, 若c->querybuf存在多條命令,則依次解析並處理這些命令

processInputBuffer

void processInputBuffer(redisClient *c) {
    /* Keep processing while there is something in the input buffer */
    while(sdslen(c->querybuf)) {
        /* Return if clients are paused. */
        if (!(c->flags & REDIS_SLAVE) && clientsArePaused()) return;
      
        /* Immediately abort if the client is in the middle of something. */
        if (c->flags & REDIS_BLOCKED) return;
     
        /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is
         * written to the client. Make sure to not let the reply grow after
         * this flag has been set (i.e. don't process more commands). */
        if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;
      
        /* Determine request type when unknown. */
        if (!c->reqtype) {
            if (c->querybuf[0] == '*') {
                c->reqtype = REDIS_REQ_MULTIBULK;
            } else {
                c->reqtype = REDIS_REQ_INLINE;
            }
        }
      
        if (c->reqtype == REDIS_REQ_INLINE) {
            if (processInlineBuffer(c) != REDIS_OK) break;
        } else if (c->reqtype == REDIS_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != REDIS_OK) break;
        } else {
            redisPanic("Unknown request type");
        }
      
        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* Only reset the client when the command was executed. */
            if (processCommand(c) == REDIS_OK)
                resetClient(c);
        }
    } 
}
  • 如果是telnet發送的裸協議數據是沒有*打頭的表示參數個數的輔助信息,用processInlineBuffer()函數解析輸入

  • 其他則通過processMultibulkBuffer()函數解析

  • 若解析函數返回REDIS_ERR,則等待下一次read(),是因為客戶端緩存數據還沒構成一條命令即不滿足Redis協議格式;否則返回REDIS_OK, 處理命令

四、響應客戶流程

數據讀取 readQueryFromClient

調用系統函數read來讀取客戶端傳送過來的數據, 調用read後對讀取過程中所遇到的情況:

  • 系統中斷(nread == -1 && errno == EAGAIN)

  • 讀取出錯(nread == -1 && errno != EAGAIN) freeClient()

  • 客戶端關閉(nread == 0) freeClient()

  • 超過讀取數據限制(1GB)則報錯。 讀取完後進入processInputBuffer進行協議解析

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = (redisClient*) privdata;
    int nread, readlen;
    size_t qblen;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
      
    server.current_client = c;
    readlen = REDIS_IOBUF_LEN;
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= REDIS_MBULK_BIG_ARG)
    {
        int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);
      
        if (remaining < readlen) readlen = remaining;
    } 
      
    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = read(fd, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (errno == EAGAIN) {
            nread = 0;
        } else {
            redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
        redisLog(REDIS_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    } 
    if (nread) {
        sdsIncrLen(c->querybuf,nread);
        c->lastinteraction = server.unixtime;
        if (c->flags & REDIS_MASTER) c->reploff += nread;
        server.stat_net_input_bytes += nread;
    } else {
        server.current_client = NULL;
        return;
    } 
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
      
        bytes = sdscatrepr(bytes,c->querybuf,64);
        redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    } 
    processInputBuffer(c);
    server.current_client = NULL;
}  

數據解析

從readQueryFromClient()函數讀取客戶端傳過來的數據,進入processInputBuffer()函數進行協議解析,可以把processInputBuffer函數看作是輸入數據的協議解析器

Redis支持兩種協議,一種是inline,一種是multibulk。inline協議是老協議,現在一般只在命令行下的redis客戶端使用,其他情況一般是使用multibulk協議。

如果客戶端傳送的數據的第一個字元時‘*’,那麼傳送數據將被當做multibulk協議處理,否則將被當做inline協議處理。Inline協議的具體解析函數是processInlineBuffer(),multibulk協議的具體解析函數是processMultibulkBuffer()。 當協議解析完畢,即客戶端傳送的數據已經解析出命令欄位和參數欄位,接下來進行命令處理,命令處理函數是processCommand。

發送數據 sendReplyToClient

void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = privdata;
    int nwritten = 0, totwritten = 0, objlen;
    size_t objmem;
    robj *o;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
 
    while(c->bufpos > 0 || listLength(c->reply)) {
        if (c->bufpos > 0) {
            nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
            if (nwritten <= 0) break;
            c->sentlen += nwritten;
            totwritten += nwritten;
 
            /* If the buffer was sent, set bufpos to zero to continue with
             * the remainder of the reply. */
            if (c->sentlen == c->bufpos) {
                c->bufpos = 0;
                c->sentlen = 0;
            }
        } else {
            o = listNodeValue(listFirst(c->reply));
            objlen = sdslen(o->ptr);
            objmem = getStringObjectSdsUsedMemory(o);
 
            if (objlen == 0) {
                listDelNode(c->reply,listFirst(c->reply));
                c->reply_bytes -= objmem;
                continue;
            }
 
            nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);
            if (nwritten <= 0) break;
            c->sentlen += nwritten;
            totwritten += nwritten;
 
            /* If we fully sent the object on head go to the next one */
            if (c->sentlen == objlen) {
                listDelNode(c->reply,listFirst(c->reply));
                c->sentlen = 0;
                c->reply_bytes -= objmem;
            }
        }
        /* Note that we avoid to send more than REDIS_MAX_WRITE_PER_EVENT
         * bytes, in a single threaded server it's a good idea to serve
         * other clients as well, even if a very large request comes from
         * super fast link that is always able to accept data (in real world
         * scenario think about 'KEYS *' against the loopback interface).
         *
         * However if we are over the maxmemory limit we ignore that and
         * just deliver as much data as it is possible to deliver. */
        server.stat_net_output_bytes += totwritten;
        if (totwritten > REDIS_MAX_WRITE_PER_EVENT &&
            (server.maxmemory == 0 ||
             zmalloc_used_memory() < server.maxmemory)) break;
    }
    if (nwritten == -1) {
        if (errno == EAGAIN) {
            nwritten = 0;
        } else {
            redisLog(REDIS_VERBOSE,
                "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return;
        }
    }
    if (totwritten > 0) {
        /* For clients representing masters we don't count sending data
         * as an interaction, since we always send REPLCONF ACK commands
         * that take some time to just fill the socket output buffer.
         * We just rely on data / pings received for timeout detection. */
        if (!(c->flags & REDIS_MASTER)) c->lastinteraction = server.unixtime;
    }
    if (c->bufpos == 0 && listLength(c->reply) == 0) {
        c->sentlen = 0;
        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
 
        /* Close connection after entire reply has been sent. */
        if (c->flags & REDIS_CLOSE_AFTER_REPLY) freeClient(c);
    }
}

通過調用系統函數write給客戶端發送數據,如果緩衝區有數據就把緩衝區的數據發送給客戶端,緩衝區的數據發送完了,如果有排隊數據,則繼續發送。

  • 發送緩衝區(c->buf)的內容

  • 發送回覆鏈表(c->reply)的內容

寫入異常處理

  • 被系統中斷(nwritten == -1 && errno == EAGAIN)

  • 寫數據出錯(nwritten == -1 && errno != EAGAIN),釋放客戶端freeClient()

 

參考文章

http://arc8.riaos.com/?p=6061

https://github.com/microheart/annotated_memcached/blob/master/note/Redis/Redis_main_flow.md

http://blog.csdn.net/ordeder/article/details/12791359

http://blog.nosqlfan.com/tags/redis


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 改下build.gradle文件,將裡面的compileSdkVersion改為23即可 apply plugin: 'com.android.application' android { compileSdkVersion 23 buildToolsVersion "23.0.2" default
  • 該APP堪比陌陌、微信的即時通訊,功能齊全,運行正常,是一個做即時通訊軟體參考的好源碼。具體的功能有獲取好友,設置頭像,發表說說,查看好友說說,相冊,上傳圖片,與好友聊天,發送圖片,位置,文件,語音消息,表情等等功能,效果直接看下圖。測試賬號:[email protected]密碼:123456 詳細說明:
  • 我為何要封裝DialogFragment 最近在重構項目代碼,項目中創建對話框用的是Dialog,AlertDialog。但是官方推出了DialogFragment來代替Dialog。那我就去認真的瞭解下DialogFragment。 DialogFragment DialogFragment是在A
  • 仿大眾點評UI項目,實現了搜全城、餐廳排行榜頁面、按分類、地區展示、商品詳情頁面、留言點評頁面、單獨的分類頁面、登錄註冊頁面、簽到展示頁面、個人中心頁面、更多頁面等效果。 詳細說明:http://android.662p.com<ignore_js_op><ignore_js_op><ignore_
  • 一、簡介 可執行鏈接格式(Executable and Linking Format)最初是由 UNIX 系統實驗室(UNIX System Laboratories,USL)開發併發布的,作為應用程式二進位介面(Application Binary Interface,ABI)的一部分。工具介面標
  • 1 作為互聯網技術從業人 或者粗暴點說:作為一個程式猿、測試從業者 如果沒掉過一些坑,都不好意思說自己混過技術圈 2 今天重點講:mysql開啟遠程訪問許可權的那些坑~ 對於mysql開啟遠程訪問許可權 網上各種文章,一抓一大把 今天不重點講:百度搜索“mysql遠程訪問”,可以看到你需要的很多文章 -
  • 可以用來跟蹤執行的sql語句。安裝SqlServer之後SqlServerManagementStudio自帶一個SqlProfiler,但是如果安裝的SqlExpress,那就沒有了。 項目的主頁在https://expressprofiler.codeplex.com/,點右邊的“Downloa...
  • 一.amoeba介紹 Amoeba(變形蟲)項目,該開源框架於2008年 開始發佈一款 Amoeba for Mysql軟體。這個軟體致力於MySQL的分散式資料庫前端代理層,它主要在應用層訪問MySQL的 時候充當SQL路由功能,專註於分散式資料庫代理層(Database Proxy)開發。座落與
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...