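hiredis is the C client library for Redis. It offers both a synchronous and an asynchronous API. The asynchronous API can be driven by several event libraries through adapters (libev, libevent, libuv, ae, and so on); ae is the asynchronous event-handling module implemented inside Redis itself. I slightly modified hiredis's example-ae.c so that a thread sends the ping command 10 times in a loop to check the Redis server, as shown below. After the 10 pings the thread calls disconnect, but aeMain never returns and the program stays blocked.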
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <signal.h>
    #include <chrono>
    #include <functional>
    #include <thread>

    #ifdef __cplusplus
    extern "C" {
    #endif
    #include <hiredis.h>
    #include <async.h>
    #include <adapters/ae.h>
    #ifdef __cplusplus
    }
    #endif

    /* Put event loop in the global scope, so it can be explicitly stopped */
    static aeEventLoop *loop;

    void getCallback(redisAsyncContext *c, void *r, void *privdata) {
        /* Explicit cast: this file is compiled as C++ */
        redisReply *reply = (redisReply *)r;
        if (reply == NULL) return;
        printf("argv[%s]: %s\n", (char*)privdata, reply->str);
    }

    void connectCallback(const redisAsyncContext *c, int status) {
        if (status != REDIS_OK) {
            printf("Error: %s\n", c->errstr);
            aeStop(loop);
            return;
        }
        printf("Connected...\n");
    }

    void disconnectCallback(const redisAsyncContext *c, int status) {
        if (status != REDIS_OK) {
            printf("Error: %s\n", c->errstr);
            aeStop(loop);
            return;
        }
        printf("Disconnected...\n");
        aeStop(loop);
    }

    void quitConnCallBack(redisAsyncContext *c, void *r, void *privdata) {
        printf("quit");
        redisAsyncDisconnect(c);
    }

    void testThreadLoop(void *p) {
        static int num = 10;
        char c11[64];
        strcpy(c11, "test");
        while (1) {
            std::this_thread::sleep_for(std::chrono::milliseconds(1500));
            num--;
            if (num < 0) {
                // Calling disconnect here does not make aeMain exit
                redisAsyncDisconnect((redisAsyncContext *)p);
                // The correct approach is to send quit instead:
                //redisAsyncCommand((redisAsyncContext *)p, quitConnCallBack, c11, "quit");
                printf("exit\n");
                return;
            }
            redisAsyncCommand((redisAsyncContext *)p, getCallback, c11, "ping");
        }
    }

    int main (int argc, char **argv) {
        signal(SIGPIPE, SIG_IGN);

        redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379);
        if (c->err) {
            /* Let *c leak for now... */
            printf("Error: %s\n", c->errstr);
            return 1;
        }

        loop = aeCreateEventLoop(64);
        redisAeAttach(loop, c);
        redisAsyncSetConnectCallback(c, connectCallback);
        redisAsyncSetDisconnectCallback(c, disconnectCallback);

        std::thread t(testThreadLoop, c);
        t.detach();

        aeMain(loop);
        return 0;
    }

First, look at the logic of the two key functions, aeStop and aeMain. Their code is as follows:
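    void aeMain(aeEventLoop *eventLoop) {
        eventLoop->stop = 0;
        while (!eventLoop->stop) {
            if (eventLoop->beforesleep != NULL)
                eventLoop->beforesleep(eventLoop);
            aeProcessEvents(eventLoop, AE_ALL_EVENTS);
        }
    }

    void aeStop(aeEventLoop *eventLoop) {
        eventLoop->stop = 1;
    }

1. Analysis of stop

aeStop only sets the stop flag to true, while aeMain keeps looping over events. My first impression was that once stop is set, aeMain would finish handling the current events, return from aeProcessEvents, see that stop is true, and leave the while loop. In practice aeMain never left the loop. Could it be that, because the flag is touched from different threads, stop needs to be declared volatile? I changed stop to volatile int and tested again: aeMain still did not exit, and the program stayed blocked.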
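To make the "checked only between iterations" behaviour concrete, here is a minimal sketch of a loop with the same shape as aeMain. The names MiniLoop, miniMain and stopOnThirdCall are hypothetical stand-ins invented for this illustration; they are not part of ae or hiredis. The callback plays the role of aeProcessEvents: the flag it sets is only noticed after it returns, so if it never returned, no qualifier on stop (volatile or otherwise) could help.

```cpp
#include <cassert>

// Hypothetical stand-in for aeEventLoop: just the stop flag and a counter.
struct MiniLoop {
    int stop = 0;
    int iterations = 0;
};

// Stand-in for aeProcessEvents: called once per loop iteration.
using ProcessFn = void (*)(MiniLoop *);

// Same shape as aeMain: the flag is read only between iterations,
// never while process() is still running.
int miniMain(MiniLoop *loop, ProcessFn process) {
    assert(loop != nullptr && process != nullptr);
    loop->stop = 0;
    while (!loop->stop) {
        process(loop);
        loop->iterations++;
    }
    return loop->iterations;
}

// A "callback" that requests a stop during its third invocation,
// in the same way aeStop(loop) would.
void stopOnThirdCall(MiniLoop *loop) {
    if (loop->iterations == 2) loop->stop = 1;
}
```

Calling miniMain with stopOnThirdCall returns once the third call has finished, because the stop request was made from inside the loop's own iteration.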
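2. Analysis of aeProcessEvents

At this point the only remaining explanation is that aeProcessEvents itself never returns, so aeMain never gets a chance to check stop. Reading the function suggests that it blocks inside aeApiPoll, and that tvp is NULL there (this example registers no time events, so there is no timer to bound the wait). The relevant code, including aeApiPoll from ae_epoll.c, is shown below: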
    int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
        //...
            // tvp will be NULL here; the guess is that we block in aeApiPoll,
            // so check the aeApiPoll code to confirm
            numevents = aeApiPoll(eventLoop, tvp);
            for (j = 0; j < numevents; j++) {
                aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
                int mask = eventLoop->fired[j].mask;
                int fd = eventLoop->fired[j].fd;
                int rfired = 0;

                /* note the fe->mask & mask & ... code: maybe an already processed
                 * event removed an element that fired and we still didn't
                 * processed, so we check if the event is still valid. */
                if (fe->mask & mask & AE_READABLE) {
                    rfired = 1;
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                }
                if (fe->mask & mask & AE_WRITABLE) {
                    if (!rfired || fe->wfileProc != fe->rfileProc)
                        fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                }
                processed++;
            }
        } /* closes a block opened in the elided part above */
        /* Check time events */
        if (flags & AE_TIME_EVENTS)
            processed += processTimeEvents(eventLoop);

        return processed; /* return the number of processed file/time events */
    }

    static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
        aeApiState *state = eventLoop->apidata;
        int retval, numevents = 0;

        // Here is the culprit: the fourth argument of epoll_wait (the timeout)
        // is -1 when tvp is NULL, so epoll_wait waits indefinitely!
        retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
                tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
        if (retval > 0) {
            int j;

            numevents = retval;
            for (j = 0; j < numevents; j++) {
                int mask = 0;
                struct epoll_event *e = state->events+j;

                if (e->events & EPOLLIN) mask |= AE_READABLE;
                if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
                if (e->events & EPOLLERR) mask |= AE_WRITABLE;
                if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
                eventLoop->fired[j].fd = e->data.fd;
                eventLoop->fired[j].mask = mask;
            }
        }
        return numevents;
    }
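The effect of a NULL tvp can be tried in isolation. The following is a minimal sketch, not ae code: toTimeoutMs and waitReadable are hypothetical helper names, and it uses POSIX poll() on a single descriptor instead of epoll_wait to stay short and portable. The timeout conversion is the same expression as in aeApiPoll, and poll() treats -1 the same way epoll_wait does: wait until an event arrives, with no time limit.

```cpp
#include <cassert>
#include <poll.h>
#include <sys/time.h>
#include <unistd.h>

// Same conversion as in aeApiPoll: a NULL timeval means "no timeout" (-1),
// otherwise the timeval is converted to milliseconds.
int toTimeoutMs(const struct timeval *tvp) {
    return tvp ? static_cast<int>(tvp->tv_sec * 1000 + tvp->tv_usec / 1000) : -1;
}

// Wait for fd to become readable.
// Returns 1 if an event is pending on fd, 0 on timeout, -1 on error.
// With tvp == NULL this call does not return until an event arrives.
int waitReadable(int fd, const struct timeval *tvp) {
    assert(fd >= 0);
    struct pollfd pfd;
    pfd.fd = fd;
    pfd.events = POLLIN;
    pfd.revents = 0;
    return poll(&pfd, 1, toTimeoutMs(tvp));
}
```

With a pipe whose write end stays silent, waitReadable(fd, &tv) comes back with 0 once tv expires, whereas waitReadable(fd, NULL) only returns after something is written. That is the situation of the loop thread after the worker thread has called disconnect: nothing will ever arrive on the descriptor it is waiting on.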
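To summarize the flow of aeMain (shown as a figure in the original post): aeMain clears stop, then repeats the following cycle: check stop, call beforesleep if it is set, and call aeProcessEvents, which blocks in aeApiPoll (epoll_wait) until a file event fires, dispatches the file-event callbacks, processes time events, and returns to the stop check.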
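Our disconnect callback calls aeStop internally. If that call happens to land after aeProcessEvents has returned and before aeMain checks stop, everything works, but the odds of that are tiny (if you hit that window, go buy a lottery ticket). In the usual case the loop thread is already parked in epoll_wait, and setting the flag from another thread does nothing to wake it. So aeStop only takes effect promptly within a limited window, and the better place to decide that aeMain should exit is inside aeProcessEvents, calling aeStop from there. aeProcessEvents is what invokes the command callbacks we define, and Redis happens to have a quit command (it asks the Redis server to close the connection), so we send quit with a callback that leads to aeStop:

    redisAsyncCommand((redisAsyncContext *)p, quitConnCallBack, c11, "quit");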
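    void quitConnCallBack(redisAsyncContext *c, void *r, void *privdata) {
        printf("quit");
        /* Runs on the event-loop thread. Disconnecting here invokes
         * disconnectCallback, which in turn calls aeStop(loop). */
        redisAsyncDisconnect(c);
    }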
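The reason this works can be reproduced without Redis. Below is a minimal sketch under stated assumptions: a pipe stands in for the Redis socket, one byte written to it stands in for the reply to quit, and WakeLoop, onReadable, wakeLoopMain and requestQuit are hypothetical names made up for this illustration. It uses poll() rather than epoll to keep the example short. The point is that the stop flag is set by a callback running on the loop thread, right after the blocking wait has been woken by real I/O.

```cpp
#include <cassert>
#include <poll.h>
#include <unistd.h>

// Hypothetical stand-in for the event loop state.
struct WakeLoop {
    int readFd;   // read end of a pipe; plays the role of the Redis socket
    int stop;     // same role as eventLoop->stop
    int handled;  // number of bytes ("replies") dispatched
};

// Plays the role of the quit reply callback. It runs on the loop thread and
// is the only place that sets the stop flag. It stops on a successful read,
// and also on EOF or a read error so that the loop cannot spin.
void onReadable(WakeLoop *loop) {
    char byte;
    if (read(loop->readFd, &byte, 1) == 1) loop->handled++;
    loop->stop = 1;
}

// Same shape as aeMain with a NULL timeout: block until the fd has an event.
int wakeLoopMain(WakeLoop *loop) {
    assert(loop != nullptr && loop->readFd >= 0);
    loop->stop = 0;
    while (!loop->stop) {
        struct pollfd pfd;
        pfd.fd = loop->readFd;
        pfd.events = POLLIN;
        pfd.revents = 0;
        int n = poll(&pfd, 1, -1);   // -1: wait with no time limit
        if (n <= 0) break;           // error (EINTR included, to keep the sketch short)
        if (pfd.revents & POLLIN) onReadable(loop);
        else break;                  // hang-up or error condition without data
    }
    return loop->handled;
}

// What another thread would call instead of touching the stop flag:
// produce I/O that the loop is actually waiting for.
bool requestQuit(int writeFd) {
    char byte = 'q';
    return write(writeFd, &byte, 1) == 1;
}
```

In a real program requestQuit would be called from the worker thread while wakeLoopMain is blocked on the loop thread. The write makes poll() return, the callback runs, and the flag is checked immediately afterwards, which mirrors what happens when the quit reply arrives and quitConnCallBack runs.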