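Introduction - The Big Picture
Much has been written online about the redis ae event-driven model. Yet after reading article after article, however pleasant the reading, you may still be hazy on why the author wrote it this way: the motivation, the benefits, the drawbacks. Isn't that so?
This post assumes the reader already understands the general flow of IO multiplexing and has typed out the entire ae code by hand. Good, let's begin. I hope the pointers below give you a moment of sudden clarity.
Let's first look at the design of ae.h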
/* A simple event-driven programming library. Originally I wrote this code
 * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated
 * it in form of a library for easy reuse.
 *
 * Copyright (c) 2006-2012, Salvatore Sanfilippo <antirez at gmail dot com>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#ifndef __AE_H__
#define __AE_H__

#include <time.h>

#define AE_OK 0
#define AE_ERR -1

#define AE_NONE 0       /* No events registered. */
#define AE_READABLE 1   /* Fire when descriptor is readable. */
#define AE_WRITABLE 2   /* Fire when descriptor is writable. */
#define AE_BARRIER 4    /* With WRITABLE, never fire the event if the
                           READABLE event already fired in the same event
                           loop iteration. Useful when you want to persist
                           things to disk before sending replies, and want
                           to do that in a group fashion. */

#define AE_FILE_EVENTS 1
#define AE_TIME_EVENTS 2
#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)
#define AE_DONT_WAIT 4
#define AE_CALL_AFTER_SLEEP 8

#define AE_NOMORE -1
#define AE_DELETED_EVENT_ID -1

/* Macros */
#define AE_NOTUSED(V) ((void) V)

struct aeEventLoop;

/* Types and data structures */
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);

/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;

/* Time event structure */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *prev;
    struct aeTimeEvent *next;
} aeTimeEvent;

/* A fired event */
typedef struct aeFiredEvent {
    int fd;
    int mask;
} aeFiredEvent;

/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
    int flags;
} aeEventLoop;

/* Prototypes */
aeEventLoop *aeCreateEventLoop(int setsize);
void aeDeleteEventLoop(aeEventLoop *eventLoop);
void aeStop(aeEventLoop *eventLoop);
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData);
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
int aeGetFileEvents(aeEventLoop *eventLoop, int fd);
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc);
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);
int aeProcessEvents(aeEventLoop *eventLoop, int flags);
int aeWait(int fd, int mask, long long milliseconds);
void aeMain(aeEventLoop *eventLoop);
char *aeGetApiName(void);
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep);
int aeGetSetSize(aeEventLoop *eventLoop);
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
void aeSetDontWait(aeEventLoop *eventLoop, int noWait);

#endif
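Many people, on first reading or first hand-copying the ae.h design file, are left with an impression that is about 60% blur. Perhaps you can vaguely sense that these macros probably have something to do with IO events ...
Let me give away a little here and walk you quickly through the intent behind the library's structure design. In C, reading the structures first gets you to the core information more easily than reading the interface first. The four most important structures in the code above are
aeFileEvent, aeTimeEvent, aeFiredEvent, aeEventLoop
aeFileEvent is a file descriptor event registered in aeEventLoop; when it triggers, an aeFiredEvent record is produced for later processing. aeTimeEvent is a timer event, likewise registered in aeEventLoop, used to trigger timed events. (I was too lazy to draw a diagram; interested readers can sketch one that suits their own understanding.) As for the design of the fields inside aeEventLoop, no more spoilers for now; the main text below discusses some of them.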
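To make the relationship between the registered events and the fired events concrete, here is a minimal sketch. It is not the ae code: every `mini*` name and the fixed capacity `MINI_SETSIZE` are hypothetical, and the poller is faked by filling `fired` by hand. It only illustrates the idea that `events` is indexed by fd and holds what we are interested in, while `fired` holds what was reported in one iteration.

```c
#include <stddef.h>

#define MINI_NONE     0
#define MINI_READABLE 1
#define MINI_WRITABLE 2
#define MINI_SETSIZE  16   /* hypothetical fixed capacity */

typedef void miniFileProc(int fd, void *clientData, int mask);

typedef struct miniFileEvent {   /* interest, stored at index fd */
    int mask;
    miniFileProc *rfileProc;
    miniFileProc *wfileProc;
    void *clientData;
} miniFileEvent;

typedef struct miniFiredEvent {  /* what the poller reported this round */
    int fd;
    int mask;
} miniFiredEvent;

typedef struct miniLoop {
    miniFileEvent events[MINI_SETSIZE];
    miniFiredEvent fired[MINI_SETSIZE];
} miniLoop;

static void miniInit(miniLoop *l) {
    for (int i = 0; i < MINI_SETSIZE; i++) l->events[i].mask = MINI_NONE;
}

/* Register interest in fd; returns 0 on success, -1 if fd is out of range. */
static int miniRegister(miniLoop *l, int fd, int mask,
                        miniFileProc *proc, void *data) {
    if (fd < 0 || fd >= MINI_SETSIZE) return -1;
    miniFileEvent *fe = &l->events[fd];
    fe->mask |= mask;
    if (mask & MINI_READABLE) fe->rfileProc = proc;
    if (mask & MINI_WRITABLE) fe->wfileProc = proc;
    fe->clientData = data;
    return 0;
}

/* Walk the first numfired entries of fired[] and call the handlers. */
static int miniDispatch(miniLoop *l, int numfired) {
    int called = 0;
    for (int j = 0; j < numfired; j++) {
        int fd = l->fired[j].fd;
        int mask = l->fired[j].mask;
        miniFileEvent *fe = &l->events[fd];
        if (fe->mask & mask & MINI_READABLE) {
            fe->rfileProc(fd, fe->clientData, mask);
            called++;
        }
        if (fe->mask & mask & MINI_WRITABLE) {
            fe->wfileProc(fd, fe->clientData, mask);
            called++;
        }
    }
    return called;
}

/* Demo handler: counts how many times it was invoked. */
static void miniCountHit(int fd, void *clientData, int mask) {
    (void)fd; (void)mask;
    (*(int *)clientData)++;
}

static int miniDemo(void) {
    miniLoop l;
    int hits = 0;
    miniInit(&l);
    miniRegister(&l, 5, MINI_READABLE, miniCountHit, &hits);
    l.fired[0].fd = 5;                 /* pretend the poller reported fd 5 */
    l.fired[0].mask = MINI_READABLE;
    miniDispatch(&l, 1);
    return hits;
}
```

In the sketch the handler lookup is a plain array index on the fd, which is the property that makes the `setsize` and `maxfd` fields discussed later matter.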
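Preface - Demystifying the Lower Layer
The overall layout of the ae files is as follows
You can clearly see the core wrappers around the epoll, evport, kqueue, and select IO multiplexing APIs. But after writing out all of ae.c, you find that what shaped its design most deeply is probably the select-compatible approach in ae_select.c.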
/* Select()-based ae.c module.
 *
 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include <sys/select.h>
#include <string.h>

typedef struct aeApiState {
    fd_set rfds, wfds;
    /* We need to have a copy of the fd sets as it's not safe to reuse
     * FD sets after select(). */
    fd_set _rfds, _wfds;
} aeApiState;

static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    FD_ZERO(&state->rfds);
    FD_ZERO(&state->wfds);
    eventLoop->apidata = state;
    return 0;
}

static int aeApiResize(aeEventLoop *eventLoop, int setsize) {
    /* Just ensure we have enough room in the fd_set type. */
    if (setsize >= FD_SETSIZE) return -1;
    return 0;
}

static void aeApiFree(aeEventLoop *eventLoop) {
    zfree(eventLoop->apidata);
}

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;

    if (mask & AE_READABLE) FD_SET(fd,&state->rfds);
    if (mask & AE_WRITABLE) FD_SET(fd,&state->wfds);
    return 0;
}

static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;

    if (mask & AE_READABLE) FD_CLR(fd,&state->rfds);
    if (mask & AE_WRITABLE) FD_CLR(fd,&state->wfds);
}

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, j, numevents = 0;

    memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
    memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));

    retval = select(eventLoop->maxfd+1,
                &state->_rfds,&state->_wfds,NULL,tvp);
    if (retval > 0) {
        for (j = 0; j <= eventLoop->maxfd; j++) {
            int mask = 0;
            aeFileEvent *fe = &eventLoop->events[j];

            if (fe->mask == AE_NONE) continue;
            if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))
                mask |= AE_READABLE;
            if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
                mask |= AE_WRITABLE;
            eventLoop->fired[numevents].fd = j;
            eventLoop->fired[numevents].mask = mask;
            numevents++;
        }
    }
    return numevents;
}

static char *aeApiName(void) {
    return "select";
}
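The author's select implementation is not great: he designed ae_select.c as a private fragment file, with no intention of letting it stand on its own. Also, the fourth parameter of select, the exceptfds set, is not handled (whereas ae_epoll.c does handle EPOLLHUP and EPOLLERR). At the implementation level aeApiPoll is not good enough either; I recommend the following implementation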
#include "ae.h"

#include <string.h>
#include <sys/select.h>

static int aeApiPoll(aeEventLoop * eventLoop, struct timeval * tvp) {
    aeApiState * state = eventLoop->apidata;
    int retval, j, numevents = 0;

    memcpy(&state->_rfds, &state->rfds, sizeof(fd_set));
    memcpy(&state->_wfds, &state->wfds, sizeof(fd_set));

    retval = select(eventLoop->maxfd+1, &state->_rfds, &state->_wfds, NULL, tvp);
    for (j = 0; j <= eventLoop->maxfd && numevents < retval; j++) {
        int mask = AE_NONE;
        aeFileEvent * fe = &eventLoop->events[j];

        if (fe->mask == AE_NONE) continue;

        if (fe->mask & AE_READABLE && FD_ISSET(j, &state->_rfds))
            mask |= AE_READABLE;
        if (fe->mask & AE_WRITABLE && FD_ISSET(j, &state->_wfds))
            mask |= AE_WRITABLE;

        if (mask == AE_NONE) continue;

        eventLoop->fired[numevents].fd = j;
        eventLoop->fired[numevents].mask = mask;
        numevents++;
    }

    return numevents;
}
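This cuts down on AE_NONE empty events that need no processing. The epoll and kqueue backends that follow are much the same (I am not familiar with evport; interested readers can skip it too).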
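On the point about the fourth argument of select, here is a small standalone sketch of what passing an exceptfds set might look like. It is not part of ae: `demoWait`, `demoPipe`, and the `DEMO_*` flags are hypothetical names. One caveat, stated with some caution: as far as I know, on Linux exceptfds mostly reports exceptional conditions such as TCP out-of-band data, while errors and hang-ups tend to surface through the read and write sets, so this is not a one-to-one equivalent of EPOLLERR and EPOLLHUP.

```c
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

#define DEMO_READABLE 1
#define DEMO_WRITABLE 2
#define DEMO_EXCEPT   4   /* hypothetical flag, not defined in ae.h */

/* Wait up to timeout_ms on a single fd. Returns a mask of DEMO_* bits,
 * 0 on timeout, or -1 on error. */
static int demoWait(int fd, int interest, long timeout_ms) {
    fd_set rfds, wfds, efds;
    struct timeval tv;
    int retval, mask = 0;

    if (fd < 0 || fd >= FD_SETSIZE) return -1;

    FD_ZERO(&rfds);
    FD_ZERO(&wfds);
    FD_ZERO(&efds);
    if (interest & DEMO_READABLE) FD_SET(fd, &rfds);
    if (interest & DEMO_WRITABLE) FD_SET(fd, &wfds);
    FD_SET(fd, &efds);            /* always watch exceptional conditions */

    tv.tv_sec = timeout_ms / 1000;
    tv.tv_usec = (timeout_ms % 1000) * 1000;

    retval = select(fd + 1, &rfds, &wfds, &efds, &tv);
    if (retval < 0) return -1;
    if (retval == 0) return 0;

    if (FD_ISSET(fd, &rfds)) mask |= DEMO_READABLE;
    if (FD_ISSET(fd, &wfds)) mask |= DEMO_WRITABLE;
    if (FD_ISSET(fd, &efds)) mask |= DEMO_EXCEPT;
    return mask;
}

/* Usage: write one byte into a pipe and wait on its read end. */
static int demoPipe(void) {
    int fds[2], mask;

    if (pipe(fds) == -1) return -1;
    if (write(fds[1], "x", 1) != 1) {
        close(fds[0]);
        close(fds[1]);
        return -1;
    }
    mask = demoWait(fds[0], DEMO_READABLE, 100);
    close(fds[0]);
    close(fds[1]);
    return mask;
}
```

Whether a real backend should map the except set onto readable, writable, or a separate flag is a design decision this sketch does not try to settle.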
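Main Text - Pointers on the Details
Taken as a whole, the ae event model design is still somewhat crude. My guess is that redis is heavy on IO and memory operations and has fairly fixed needs for file descriptors, most of which live from start to finish. The workload it faces is not one of creating, exchanging on, and closing descriptors in large numbers, so the overall design is acceptable.
1. What are setsize, maxfd, events, and fired really trying to express?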
#include <time.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <poll.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/types.h>

#include "ae.h"
#include "config.h"
#include "zmalloc.h"

/* Include the best multiplexing layer supported by this system.
 * The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

aeEventLoop * aeCreateEventLoop(int setsize) {
    aeEventLoop * eventLoop;
    int i;

    if (!(eventLoop = zmalloc(sizeof(*eventLoop)))) goto err;
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (!eventLoop->events || !eventLoop->fired) goto err;
    eventLoop->setsize = setsize;
    eventLoop->lastTime = time(NULL);
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    eventLoop->aftersleep = NULL;
    eventLoop->flags = 0;
    if (aeApiCreate(eventLoop) == -1) goto err;
    /* Events with mask == AE_NONE are not set. So let's initialize the
     * vector with it. */
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;

err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}
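Attentive readers should note the zmalloc of eventLoop->events and eventLoop->fired. This essentially ties together the earlier ae_select.c, ae_epoll.c, ae_kqueue.c ... files: the two arrays store the events to monitor and the events that have changed, respectively.
setsize is also worth a look. Let's examine excerpts from server.c, server.h, and config.c in turn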
[server.c]

    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);

[server.h]

#define CONFIG_MIN_RESERVED_FDS 32

/* When configuring the server eventloop, we setup it so that the total number
 * of file descriptors we can handle are server.maxclients + RESERVED_FDS +
 * a few more to stay safe. Since RESERVED_FDS defaults to 32, we add 96
 * in order to make sure of not over provisioning more than 128 fds. */
#define CONFIG_FDSET_INCR (CONFIG_MIN_RESERVED_FDS+96)

[config.c]

    /* Unsigned int configs */
    createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients),
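As you can see, setsize has two parts: one is configured (maxclients), 10000 by default; the other is a reserve of 128. (The 128 itself splits into CONFIG_MIN_RESERVED_FDS = 32 plus 96; the former is the minimum number of fds redis reserves for itself.)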
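The arithmetic can be checked with a tiny sketch. The `SKETCH_*` macros mirror the values quoted from server.h above, and `sketchSetsize` is a hypothetical helper, not a redis function.

```c
/* Values copied from the server.h excerpt above. */
#define SKETCH_MIN_RESERVED_FDS 32
#define SKETCH_FDSET_INCR (SKETCH_MIN_RESERVED_FDS + 96)

/* setsize handed to aeCreateEventLoop for a given maxclients setting. */
static int sketchSetsize(unsigned int maxclients) {
    return (int)maxclients + SKETCH_FDSET_INCR;
}
```

With the default maxclients of 10000 this gives 10000 + 32 + 96 = 10128 slots in both the events and fired arrays.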
aeResizeSetSize serves a purpose similar to aeCreateEventLoop above
/* Resize the maximum set size of the event loop.
 * If the requested set size is smaller than the current set size, but
 * there is already a file descriptor in use that is >= the requested
 * set size minus one, AE_ERR is returned and the operation is not
 * performed at all.
 *
 * Otherwise AE_OK is returned and the operation is successful. */
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) {
    int i;

    if (setsize == eventLoop->setsize) return AE_OK;
    if (eventLoop->maxfd >= setsize) return AE_ERR;
    if (aeApiResize(eventLoop,setsize) == -1) return AE_ERR;

    eventLoop->events = zrealloc(eventLoop->events,sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zrealloc(eventLoop->fired,sizeof(aeFiredEvent)*setsize);
    eventLoop->setsize = setsize;

    /* Make sure that if we created new slots, they are initialized with
     * an AE_NONE mask. */
    for (i = eventLoop->maxfd+1; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return AE_OK;
}
I hope these two functions give you a thorough understanding of the setsize, maxfd, events, and fired fields in aeEventLoop.
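The interplay between setsize and maxfd can be sketched in isolation. The following is a simplified model, not the ae implementation: the `mf*` names are hypothetical, the per-fd state is reduced to a bare mask, and plain malloc/realloc stand in for zmalloc/zrealloc. It assumes that adding an fd raises maxfd and that removing the highest fd makes maxfd fall back to the next registered one.

```c
#include <stdlib.h>

#define MF_NONE     0
#define MF_READABLE 1
#define MF_WRITABLE 2

typedef struct mfLoop {
    int maxfd;     /* highest registered fd, -1 when nothing is registered */
    int setsize;   /* number of slots in masks[] */
    int *masks;    /* masks[fd] is the interest mask for fd */
} mfLoop;

static mfLoop *mfCreate(int setsize) {
    if (setsize <= 0) return NULL;
    mfLoop *l = malloc(sizeof(*l));
    if (!l) return NULL;
    l->masks = malloc(sizeof(int) * (size_t)setsize);
    if (!l->masks) { free(l); return NULL; }
    for (int i = 0; i < setsize; i++) l->masks[i] = MF_NONE;
    l->setsize = setsize;
    l->maxfd = -1;
    return l;
}

/* An fd is usable only if it fits in the array: 0 <= fd < setsize. */
static int mfAdd(mfLoop *l, int fd, int mask) {
    if (fd < 0 || fd >= l->setsize) return -1;
    l->masks[fd] |= mask;
    if (fd > l->maxfd) l->maxfd = fd;
    return 0;
}

static void mfDel(mfLoop *l, int fd, int mask) {
    if (fd < 0 || fd >= l->setsize) return;
    l->masks[fd] &= ~mask;
    if (fd == l->maxfd && l->masks[fd] == MF_NONE) {
        int j;   /* scan downwards for the new highest registered fd */
        for (j = l->maxfd - 1; j >= 0; j--)
            if (l->masks[j] != MF_NONE) break;
        l->maxfd = j;
    }
}

/* Shrinking is refused while a registered fd would no longer fit. */
static int mfResize(mfLoop *l, int setsize) {
    if (setsize <= 0) return -1;
    if (setsize == l->setsize) return 0;
    if (l->maxfd >= setsize) return -1;
    int *m = realloc(l->masks, sizeof(int) * (size_t)setsize);
    if (!m) return -1;
    l->masks = m;
    for (int i = l->maxfd + 1; i < setsize; i++) l->masks[i] = MF_NONE;
    l->setsize = setsize;
    return 0;
}

/* Returns 1 if the walk-through behaves as described in the comments. */
static int mfDemo(void) {
    mfLoop *l = mfCreate(8);
    if (!l) return -1;
    mfAdd(l, 3, MF_READABLE);
    mfAdd(l, 6, MF_READABLE | MF_WRITABLE);
    int refused = mfResize(l, 6);                /* fd 6 would not fit */
    mfDel(l, 6, MF_READABLE | MF_WRITABLE);      /* maxfd falls back to 3 */
    int shrunk = mfResize(l, 4);                 /* now fine */
    int ok = (refused == -1 && shrunk == 0 && l->maxfd == 3 &&
              mfAdd(l, 4, MF_READABLE) == -1);   /* 4 is past the new end */
    free(l->masks);
    free(l);
    return ok;
}
```

The point of the sketch is that setsize bounds which fd values are legal, while maxfd bounds how much of the array a select-style scan has to visit.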
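2. How is aeTimeEvent used, and how is it designed?
The timer event design in redis is fairly simple: a plain unsorted linked list of times. The code below expresses the author's intent very clearly.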
/* Search the first timer to fire.
 * This operation is useful to know how many time the select can be
 * put in sleep without to delay any event.
 * If there are no timers NULL is returned.
 *
 * Note that's O(N) since time events are unsorted.
 * Possible optimizations (not needed by Redis so far, but...):
 * 1) Insert the event in order, so that the nearest is just the head.
 *    Much better but still insertion or deletion of timers is O(N).
 * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)).
 */
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
{
    aeTimeEvent *te = eventLoop->timeEventHead;
    aeTimeEvent *nearest = NULL;

    while(te) {
        if (!nearest || te->when_sec < nearest->when_sec ||
                (te->when_sec == nearest->when_sec &&
                 te->when_ms < nearest->when_ms))
            nearest = te;
        te = te->next;
    }
    return nearest;
}
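The first optimization mentioned in that comment, inserting in order so that the nearest timer is the head, could look roughly like the sketch below. This is an illustration only, not something redis does: `stEvent` is a stripped-down stand-in for aeTimeEvent and the `st*` functions are hypothetical.

```c
#include <stddef.h>

typedef struct stEvent {
    long when_sec;
    long when_ms;
    struct stEvent *prev;
    struct stEvent *next;
} stEvent;

/* True if a's deadline is strictly earlier than b's. */
static int stBefore(const stEvent *a, const stEvent *b) {
    return a->when_sec < b->when_sec ||
           (a->when_sec == b->when_sec && a->when_ms < b->when_ms);
}

/* Insert te so the list stays sorted by deadline; returns the new head.
 * Insertion is O(N), but the nearest timer is then always the head. */
static stEvent *stInsertSorted(stEvent *head, stEvent *te) {
    stEvent *prev = NULL, *cur = head;

    while (cur && !stBefore(te, cur)) {
        prev = cur;
        cur = cur->next;
    }
    te->prev = prev;
    te->next = cur;
    if (cur) cur->prev = te;
    if (prev) {
        prev->next = te;
        return head;
    }
    return te;   /* te became the new head */
}

/* Usage: insert three timers out of order, return the head's deadline in ms. */
static long stDemo(void) {
    stEvent a = {5, 0, NULL, NULL};
    stEvent b = {2, 500, NULL, NULL};
    stEvent c = {2, 100, NULL, NULL};
    stEvent *head = NULL;

    head = stInsertSorted(head, &a);
    head = stInsertSorted(head, &b);
    head = stInsertSorted(head, &c);
    return head->when_sec * 1000 + head->when_ms;
}
```

The trade-off is the one the comment already names: the search becomes O(1), but insertion, and any rescheduling of a recurring timer, still costs O(N).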
So how does it actually run? Here is an excerpt from processTimeEvents to make it clear
/* Process time events */
static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te;
    long long maxId;
    time_t now = time(NULL);

    ...
    {
        ...
        aeGetTime(&now_sec, &now_ms);
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
            int retval;

            id = te->id;
            retval = te->timeProc(eventLoop, id, te->clientData);
            processed++;
            if (retval != AE_NOMORE) {
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            } else {
                te->id = AE_DELETED_EVENT_ID;
            }
        }
        ...
    }
    ...
    return processed;
}
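Look at the section from retval = te->timeProc through the if. Setting id to AE_DELETED_EVENT_ID marks the event for deletion when the loop next reaches it. Whenever retval != AE_NOMORE, the timer event's deadline is updated again so that it runs again next time. Let's pull out an example, whose core is likewise in server.c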
[server.c]

    /* Create the timer callback, this is our way to process many background
     * operations incrementally, like clients timeout, eviction of unaccessed
     * expired keys and so forth. */
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }

[server.c]

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    ...
    return 1000/server.hz;
}
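On the whole this timer event is quite slick: after the callback returns a time in milliseconds, that value is fed back in, and the event keeps serving as a recurring timer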
static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) {
    long cur_sec, cur_ms, when_sec, when_ms;

    aeGetTime(&cur_sec, &cur_ms);
    when_sec = cur_sec + milliseconds/1000;
    when_ms = cur_ms + milliseconds%1000;
    if (when_ms >= 1000) {
        when_sec ++;
        when_ms -= 1000;
    }
    *sec = when_sec;
    *ms = when_ms;
}
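The design idea is quite clever. The more common approaches handle this with a special event type, or actively re-register at a specific place.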
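The "return value decides the next run" idea can be shown with a simulated clock. This is a sketch under simplifying assumptions, not the ae code: there is a single timer, time is a plain millisecond counter instead of the sec/ms pair, and every `tm*` name is hypothetical.

```c
#define TM_NOMORE -1

/* Callback contract: return milliseconds until the next run, or TM_NOMORE. */
typedef int tmProc(void *clientData);

typedef struct tmEvent {
    long long when_ms;   /* absolute deadline on the simulated clock */
    tmProc *proc;
    void *clientData;
    int deleted;         /* set once the callback asked to stop */
} tmEvent;

/* Run the timer if it is due. Returns 1 if the callback ran, else 0. */
static int tmProcess(tmEvent *te, long long now_ms) {
    if (te->deleted || now_ms < te->when_ms) return 0;
    int retval = te->proc(te->clientData);
    if (retval != TM_NOMORE)
        te->when_ms = now_ms + retval;   /* reschedule relative to now */
    else
        te->deleted = 1;                 /* one-shot or finished: retire it */
    return 1;
}

/* Demo callback: asks to run again after 100 ms, but only three times. */
static int tmThreeShots(void *clientData) {
    int *runs = clientData;
    (*runs)++;
    return *runs < 3 ? 100 : TM_NOMORE;
}

/* Advance the simulated clock in 10 ms steps and count the runs. */
static int tmDemo(void) {
    int runs = 0;
    tmEvent te = { 0, tmThreeShots, &runs, 0 };
    for (long long now = 0; now <= 1000; now += 10)
        tmProcess(&te, now);
    return runs;
}
```

In the demo the callback fires at 0, 100, and 200 ms and then retires itself, so a periodic timer and a one-shot timer differ only in what the callback returns. For serverCron the returned interval is 1000/server.hz, which would be 100 ms if hz is 10 (the value of hz is an assumption here, it is not shown in the excerpts above).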
3. How does the EventLoop run?
The way the EventLoop runs is simple: it loops in one place, and internally it runs aeFileEvent first, then aeTimeEvent
[ae.c]

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

[server.c]

int main(int argc, char **argv) {
    ...
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);
    aeMain(server.el);
    aeDeleteEventLoop(server.el);
    return 0;
}

/* The End */
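The shape of that loop, including the order in which things are called, can be traced with a toy version. Everything here is hypothetical (`el*` names, the trace buffer, stopping after two iterations); the two process functions are placeholders standing in for the file event and time event phases, and no real polling happens.

```c
#include <string.h>

typedef struct elLoop elLoop;
typedef void elHook(elLoop *loop);

struct elLoop {
    int stop;
    int iterations;
    elHook *beforesleep;
    char trace[64];    /* records the call order: B, F, T */
    int tracelen;
};

static void elTrace(elLoop *l, char c) {
    if (l->tracelen < (int)sizeof(l->trace) - 1) {
        l->trace[l->tracelen++] = c;
        l->trace[l->tracelen] = '\0';
    }
}

static void elBeforeSleep(elLoop *l)       { elTrace(l, 'B'); }
static void elProcessFileEvents(elLoop *l) { elTrace(l, 'F'); }

/* Placeholder time phase: also requests a stop after two iterations,
 * the way a handler could call aeStop in the real loop. */
static void elProcessTimeEvents(elLoop *l) {
    elTrace(l, 'T');
    if (++l->iterations == 2) l->stop = 1;
}

static void elMain(elLoop *l) {
    l->stop = 0;
    while (!l->stop) {
        if (l->beforesleep != NULL) l->beforesleep(l);
        elProcessFileEvents(l);   /* file events first */
        elProcessTimeEvents(l);   /* then time events */
    }
}

/* Returns 1 if two iterations produced the order B F T, B F T. */
static int elDemo(void) {
    elLoop l;
    memset(&l, 0, sizeof(l));
    l.beforesleep = elBeforeSleep;
    elMain(&l);
    return strcmp(l.trace, "BFTBFT") == 0;
}
```

The stop flag is only checked at the top of the loop, so a stop requested inside a handler takes effect after the current iteration finishes.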
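Overall the redis ae model is still very simple; what it handles is entirely tailored to redis IO. It is enough.
When I get the chance, I will analyze with you how specific socket IO in redis is handled.
Afterword - Looking Ahead with Love
❤ Mistakes are inevitable; attentive readers are welcome to offer corrections and add diagrams, since text alone is dry ~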
Here We Are Again - https://music.163.com/#/song?id=27876900