1、serverCron簡介 在 Redis 中, 常規操作由 redis.c/serverCron 實現, 它主要執行以下操作 /* This is our timer interrupt, called server.hz times per second. * Here is where we
1、serverCron簡介
在 Redis 中, 常規操作由 redis.c/serverCron
實現, 它主要執行以下操作
/* This is our timer interrupt, called server.hz times per second. * Here is where we do a number of things that need to be done asynchronously. * For instance: * * - Active expired keys collection (it is also performed in a lazy way on * lookup). * - Software watchdog. * - Update some statistic. * - Incremental rehashing of the DBs hash tables. * - Triggering BGSAVE / AOF rewrite, and handling of terminated children. * - Clients timeout of different kinds. * - Replication reconnection. * - Many more... * * Everything directly called here will be called server.hz times per second, * so in order to throttle execution of things we want to do less frequently * a macro is used: run_with_period(milliseconds) { .... } */
- 更新伺服器的各類統計信息,比如時間、記憶體占用、資料庫占用情況等
- 清理資料庫中的過期鍵值對
- 對不合理的資料庫進行大小調整
- 關閉和清理連接失效的客戶端
- 嘗試進行 AOF 或 RDB 持久化操作
- 如果伺服器是主節點的話,對附屬節點進行定期同步
- 如果處於集群模式的話,對集群進行定期同步和連接測試
Redis 將 serverCron
作為時間事件來運行, 從而確保它每隔一段時間就會自動運行一次, 又因為 serverCron
需要在 Redis 伺服器運行期間一直定期運行, 所以它是一個迴圈時間事件: serverCron
會一直定期執行,直到伺服器關閉為止。
在 Redis 2.6 版本中, 程式規定 serverCron
每秒運行 10
次, 平均每 100
毫秒運行一次。 從 Redis 2.8 開始, 用戶可以通過修改 hz
選項來調整 serverCron
的每秒執行次數, 具體信息請參考 redis.conf
文件中關於 hz
選項的說明,也叫定時刪除,這裡的“定期”指的是Redis定期觸發的清理策略,由位於src/redis.c的activeExpireCycle(void)函數來完成。
serverCron是由redis的事件框架驅動的定位任務,這個定時任務中會調用activeExpireCycle函數,針對每個db在限制的時間REDIS_EXPIRELOOKUPS_TIME_LIMIT內遲可能多的刪除過期key,之所以要限制時間是為了防止過長時間 的阻塞影響redis的正常運行。這種主動刪除策略彌補了被動刪除策略在記憶體上的不友好。
因此,Redis會周期性的隨機測試一批設置了過期時間的key併進行處理。測試到的已過期的key將被刪除。典型的方式為,Redis每秒做10次如下的步驟:
- 隨機測試100個設置了過期時間的key
- 刪除所有發現的已過期的key
- 若刪除的key超過25個則重覆步驟1
這是一個基於概率的簡單演算法,基本的假設是抽出的樣本能夠代表整個key空間,redis持續清理過期的數據直至將要過期的key的百分比降到了25%以下。這也意味著在任何給定的時刻已經過期但仍占據著記憶體空間的key的量最多為每秒的寫操作量除以4.
Redis-3.0.0中的預設值是10,代表每秒鐘調用10次後臺任務。
除了主動淘汰的頻率外,Redis對每次淘汰任務執行的最大時長也有一個限定,這樣保證了每次主動淘汰不會過多阻塞應用請求,以下是這個限定計算公式:
#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* CPU max % for keys collection */ ... timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100;
hz調大將會提高Redis主動淘汰的頻率,如果你的Redis存儲中包含很多冷數據占用記憶體過大的話,可以考慮將這個值調大,但Redis作者建議這個值不要超過100。我們實際線上將這個值調大到100,觀察到CPU會增加2%左右,但對冷數據的記憶體釋放速度確實有明顯的提高(通過觀察keyspace個數和used_memory大小)。
可以看出timelimit和server.hz是一個倒數的關係,也就是說hz配置越大,timelimit就越小。換句話說是每秒鐘期望的主動淘汰頻率越高,則每次淘汰最長占用時間就越短。這裡每秒鐘的最長淘汰占用時間是固定的250ms(1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/100),而淘汰頻率和每次淘汰的最長時間是通過hz參數控制的。
從以上的分析看,當redis中的過期key比率沒有超過25%之前,提高hz可以明顯提高掃描key的最小個數。假設hz為10,則一秒內最少掃描200個key(一秒調用10次*每次最少隨機取出20個key),如果hz改為100,則一秒內最少掃描2000個key;另一方面,如果過期key比率超過25%,則掃描key的個數無上限,但是cpu時間每秒鐘最多占用250ms。
當REDIS運行在主從模式時,只有主結點才會執行上述這兩種過期刪除策略,然後把刪除操作”del key”同步到從結點。
二、serverCron函數
serverCron函數的三個參數,在函數內部都沒有被使用,會有警告出來,所以使用REDIS_NOTUSED去除,不使用,為什麼還傳遞這三個參數呢?
一個特殊的巨集
#define run_with_period(_ms_) if ((_ms_ <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz))))
這個巨集類似於條件判斷,每ms時間執行一次後續的操作。如:
run_with_period(100) trackOperationsPerSecond();
每百微秒,執行一次跟蹤操作函數,記錄這段時間的命令執行情況
1、如果設置了watchdog_period,那麼每過watchdog_period,都會發送sigalrm信號,該信號又會得到處理,來記錄此時執行的命令。這個過程主要是為了瞭解一些過長命令的執行影響伺服器的整體運行,是一個debug過程
/* Software watchdog: deliver the SIGALRM that will reach the signal * handler if we don't return here fast enough. */ if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);
2、每百微秒記錄過去每秒的命令執行情況
/* Update the time cache. */ updateCachedTime(); run_with_period(100) { trackInstantaneousMetric(REDIS_METRIC_COMMAND,server.stat_numcommands); trackInstantaneousMetric(REDIS_METRIC_NET_INPUT, server.stat_net_input_bytes); trackInstantaneousMetric(REDIS_METRIC_NET_OUTPUT, server.stat_net_output_bytes); }
3、更新統計變數,如記憶體使用總數,更新server.lruclock,getLRUClock()函數如下
return (mstime()/REDIS_LRU_CLOCK_RESOLUTION) & REDIS_LRU_CLOCK_MAX;
大概是380天的樣子,lruclock每380天一個輪迴
4、是否得到關閉程式的信號,如果是,就進入關閉程式的節奏,如aof,rdb文件的處理,文件描述符的關閉等;如果之前收到了 SIGTERM 信號,並不會立即做什麼事情,只是將server.shutdown_asap 置位,這裡判斷shutdown_asap , 調用prepareForShutdown ,關閉伺服器,退出執行。但是如果沒有退出成功,就不退出了,列印Log,然後移除標誌位。
5、每5秒輸出一次redis資料庫的使用情況,連接數,總鍵值數
6、每次都嘗試resize每個db,resize是讓每個db的dict結構進入rehash狀態,rehash是為了擴容dict或者縮小dict。然後每次都嘗試執行每個db的rehash過程一微秒
7、每次調用clientCron,這是一個對server.clients列表進行處理的過程。在每次執行clientCron時,會對server.clients進行迭代,並且保證 1/(REDIS_HZ*10) 的客戶端每次調用。也就是每次執行clientCron,如果clients過多,clientCron不會遍歷所有clients,而是遍歷一部分clients,但是保證每個clients都會在一定時間內得到處理。處理過程主要是檢測client連接是否idle超時,或者block超時,然後會調整每個client的緩衝區大小
8、對aof,rdb等過程進行開啟或終結
9、如果是master節點的話,就開始對過期的鍵值進行處理,與處理clients類似,不是所有有時間限制的鍵值進行迭代,而是在一個限定的數量內迭代一部分,保證一定時間內能檢測所有鍵值
10、對非同步io過程中可能需要關閉的clients進行處理
11、每秒調用複製線程和集群線程,每0.1秒調用哨兵線程
aeSetBeforeSleepProc(server.el,beforeSleep); aeMain(server.el); aeDeleteEventLoop(server.el);
在每次ae迴圈進入阻塞時,都會先執行beforeSleep(),在該函數中,會對unblock的clients(指使用blpop等阻塞命令的clients)進行處理,並且執行fsync函數,同步記憶體到磁碟上
附上源代碼
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { int j; REDIS_NOTUSED(eventLoop); REDIS_NOTUSED(id); REDIS_NOTUSED(clientData); /* Software watchdog: deliver the SIGALRM that will reach the signal * handler if we don't return here fast enough. */ if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period); /* Update the time cache. */ updateCachedTime(); run_with_period(100) { trackInstantaneousMetric(REDIS_METRIC_COMMAND,server.stat_numcommands); trackInstantaneousMetric(REDIS_METRIC_NET_INPUT, server.stat_net_input_bytes); trackInstantaneousMetric(REDIS_METRIC_NET_OUTPUT, server.stat_net_output_bytes); } /* We have just REDIS_LRU_BITS bits per object for LRU information. * So we use an (eventually wrapping) LRU clock. * * Note that even if the counter wraps it's not a big problem, * everything will still work but some object will appear younger * to Redis. However for this to happen a given object should never be * touched for all the time needed to the counter to wrap, which is * not likely. * * Note that you can change the resolution altering the * REDIS_LRU_CLOCK_RESOLUTION define. */ server.lruclock = getLRUClock(); /* Record the max memory used since the server was started. */ if (zmalloc_used_memory() > server.stat_peak_memory) server.stat_peak_memory = zmalloc_used_memory(); /* Sample the RSS here since this is a relatively slow call. */ server.resident_set_size = zmalloc_get_rss(); /* We received a SIGTERM, shutting down here in a safe way, as it is * not ok doing so inside the signal handler. */ if (server.shutdown_asap) { if (prepareForShutdown(0) == REDIS_OK) exit(0); redisLog(REDIS_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information"); server.shutdown_asap = 0; } /* Show some info about non-empty databases */ run_with_period(5000) { for (j = 0; j < server.dbnum; j++) { long long size, used, vkeys; size = dictSlots(server.db[j].dict); used = dictSize(server.db[j].dict); vkeys = dictSize(server.db[j].expires); if (used || vkeys) { redisLog(REDIS_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size); /* dictPrintStats(server.dict); */ } } } /* Show information about connected clients */ if (!server.sentinel_mode) { run_with_period(5000) { redisLog(REDIS_VERBOSE, "%lu clients connected (%lu slaves), %zu bytes in use", listLength(server.clients)-listLength(server.slaves), listLength(server.slaves), zmalloc_used_memory()); } } /* We need to do a few operations on clients asynchronously. */ clientsCron(); /* Handle background operations on Redis databases. */ databasesCron(); /* Start a scheduled AOF rewrite if this was requested by the user while * a BGSAVE was in progress. */ if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 && server.aof_rewrite_scheduled) { rewriteAppendOnlyFileBackground(); } /* Check if a background saving or AOF rewrite in progress terminated. */ if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) { int statloc; pid_t pid; if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) { int exitcode = WEXITSTATUS(statloc); int bysignal = 0; if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc); if (pid == -1) { redisLog(LOG_WARNING,"wait3() returned an error: %s. " "rdb_child_pid = %d, aof_child_pid = %d", strerror(errno), (int) server.rdb_child_pid, (int) server.aof_child_pid); } else if (pid == server.rdb_child_pid) { backgroundSaveDoneHandler(exitcode,bysignal); } else if (pid == server.aof_child_pid) { backgroundRewriteDoneHandler(exitcode,bysignal); } else { redisLog(REDIS_WARNING, "Warning, detected child with unmatched pid: %ld", (long)pid); } updateDictResizePolicy(); } } else { /* If there is not a background saving/rewrite in progress check if * we have to save/rewrite now */ for (j = 0; j < server.saveparamslen; j++) { struct saveparam *sp = server.saveparams+j; /* Save if we reached the given amount of changes, * the given amount of seconds, and if the latest bgsave was * successful or if, in case of an error, at least * REDIS_BGSAVE_RETRY_DELAY seconds already elapsed. */ if (server.dirty >= sp->changes && server.unixtime-server.lastsave > sp->seconds && (server.unixtime-server.lastbgsave_try > REDIS_BGSAVE_RETRY_DELAY || server.lastbgsave_status == REDIS_OK)) { redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...", sp->changes, (int)sp->seconds); rdbSaveBackground(server.rdb_filename); break; } } /* Trigger an AOF rewrite if needed */ if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 && server.aof_rewrite_perc && server.aof_current_size > server.aof_rewrite_min_size) { long long base = server.aof_rewrite_base_size ? server.aof_rewrite_base_size : 1; long long growth = (server.aof_current_size*100/base) - 100; if (growth >= server.aof_rewrite_perc) { redisLog(REDIS_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth); rewriteAppendOnlyFileBackground(); } } } /* AOF postponed flush: Try at every cron cycle if the slow fsync * completed. */ if (server.aof_flush_postponed_start) flushAppendOnlyFile(0); /* AOF write errors: in this case we have a buffer to flush as well and * clear the AOF error in case of success to make the DB writable again, * however to try every second is enough in case of 'hz' is set to * an higher frequency. */ run_with_period(1000) { if (server.aof_last_write_status == REDIS_ERR) flushAppendOnlyFile(0); } /* Close clients that need to be closed asynchronous */ freeClientsInAsyncFreeQueue(); /* Clear the paused clients flag if needed. */ clientsArePaused(); /* Don't check return value, just use the side effect. */ /* Replication cron function -- used to reconnect to master and * to detect transfer failures. */ run_with_period(1000) replicationCron(); /* Run the Redis Cluster cron. */ run_with_period(100) { if (server.cluster_enabled) clusterCron(); } /* Run the Sentinel timer if we are in sentinel mode. */ run_with_period(100) { if (server.sentinel_mode) sentinelTimer(); } /* Cleanup expired MIGRATE cached sockets. */ run_with_period(1000) { migrateCloseTimedoutSockets(); } server.cronloops++; return 1000/server.hz; }
參考鏈接
http://www.tuicool.com/articles/vQNZJb3
http://olylakers.iteye.com/blog/1288040
http://unasm.com/2015/07/394/
http://www.cnblogs.com/liuhao/archive/2012/06/06/2538751.html
http://ifeve.com/redis-eventlib/
http://www.wzxue.com/%E8%A7%A3%E8%AF%BBredis%E8%BF%90%E8%A1%8C%E6%A0%B8%E5%BF%83%E5%BE%AA%E7%8E%AF%E8%BF%87%E7%A8%8B/