A Brief Analysis of Redis Persistence

Source: https://www.cnblogs.com/freeweb/archive/2022/07/22/16506392.html

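Redis is an in-memory database. All data lives in memory, so Redis can operate directly on in-memory data structures with high performance. The downside is that if the server goes down, everything in memory is lost and cannot be recovered. This is why Redis has its own persistence mechanisms. Redis persistence differs from that of a conventional database in one important way. A persistence file must be loaded into memory in its entirety before it can be used, rather than being loaded on demand. Later modifications are then written back to disk.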

Redis has two persistence mechanisms: AOF (Append Only File) and RDB (Redis Database).

1. Global entry point for persistence

The analysis below is based on the Redis 5.0 source. The entry point is in server.c, where main calls the server initialization:

// https://github.com/redis/redis/blob/5.0/src/server.c

void initServer(void) {
    // ...
    server.hz = server.config_hz;
    // ...
    /* Create the timer callback, this is our way to process many background
     * operations incrementally, like clients timeout, eviction of unaccessed
     * expired keys and so forth. */
    // register the timer event callback
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }
    
    // ...
}

int main(int argc, char **argv) {
    // ...
    server.supervised = redisIsSupervised(server.supervised_mode);
    int background = server.daemonize && !server.supervised;
    if (background) daemonize();

    // server initialization
    initServer();
    if (background || server.pidfile) createPidFile();
    redisSetProcTitle(argv[0]);
    redisAsciiArt();
    checkTcpBacklogSettings();
    
    // ...
    
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);
    aeMain(server.el);
    aeDeleteEventLoop(server.el);
    return 0;
}

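main first calls initServer() to initialize the server. This covers signal handlers, database initialization, registration of various callbacks, and so on. One of these callbacks is serverCron, which handles the persistence-related work. How often it runs depends on server.config_hz, which is set in redis.conf: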

# Redis calls an internal function to perform many background tasks, like
# closing connections of clients in timeout, purging expired keys that are
# never requested, and so forth.
#
# Not all tasks are performed with the same frequency, but Redis checks for
# tasks to perform according to the specified "hz" value.
#
# By default "hz" is set to 10. Raising the value will use more CPU when
# Redis is idle, but at the same time will make Redis more responsive when
# there are many keys expiring at the same time, and timeouts may be
# handled with more precision.
#
# The range is between 1 and 500, however a value over 100 is usually not
# a good idea. Most users should use the default of 10 and raise this up to
# 100 only in environments where very low latency is required.
hz 10

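The default value is 10. Background tasks therefore run 10 times per second, i.e. once every 100 ms. Raising the value increases CPU usage while Redis is idle. If you need lower latency you can raise it moderately, but generally not above 100. hz is clamped to the range [1, 500].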
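The arithmetic can be illustrated with a small sketch. It clamps an hz value to [1, 500] and computes the delay between two serverCron runs. This delay is the 1000/server.hz value that serverCron returns to reschedule itself. clamp_hz, cron_interval_ms and the SKETCH_ constants are hypothetical names, not Redis functions.

```c
#include <assert.h>

/* Hypothetical stand-ins for CONFIG_MIN_HZ / CONFIG_MAX_HZ (Redis 5.0 server.h). */
#define SKETCH_MIN_HZ 1
#define SKETCH_MAX_HZ 500

/* Clamp a configured hz value into the allowed [1, 500] range. */
static int clamp_hz(int hz) {
    if (hz < SKETCH_MIN_HZ) return SKETCH_MIN_HZ;
    if (hz > SKETCH_MAX_HZ) return SKETCH_MAX_HZ;
    return hz;
}

/* Milliseconds between two serverCron runs: the "1000/server.hz" value
 * serverCron returns so the event loop calls it again after that delay. */
static int cron_interval_ms(int hz) {
    hz = clamp_hz(hz);
    assert(hz > 0);
    return 1000 / hz;
}
```

For example, the default hz 10 gives 100 ms, and hz 100 gives 10 ms.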

The event loop itself is wrapped in a dedicated asynchronous event library. aeCreateTimeEvent and aeMain, both called above, are implemented in ae.c:

// https://github.com/redis/redis/blob/5.0/src/ae.c

/* Include the best multiplexing layer supported by this system.
 * The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

// ...
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            // callback defined in server.c; runs before the loop blocks waiting for events
            eventLoop->beforesleep(eventLoop);
        // process file and time events
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}
// ...

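Redis loops in aeMain, processing events. The underlying multiplexing layer is one of evport, epoll, kqueue or select:

- evport is for Solaris 10.
- epoll is for Linux.
- kqueue is for BSD and OS X.
- select is the fallback.

evport, epoll and kqueue are O(1) with respect to the number of watched descriptors. select scans the descriptor set and is O(n).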

Back in server.c, here is the logic of serverCron:

// server.h: static definitions
#define CONFIG_DEFAULT_DYNAMIC_HZ 1             /* Adapt hz to # of clients.*/
#define CONFIG_DEFAULT_HZ        10             /* Time interrupt calls/sec. */
#define CONFIG_MIN_HZ            1
#define CONFIG_MAX_HZ            500
#define MAX_CLIENTS_PER_CLOCK_TICK 200          /* HZ is adapted based on that. */


#define run_with_period(_ms_) if ((_ms_ <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz))))
// ...

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    // ...
    server.hz = server.config_hz;
    /* Adapt the server.hz value to the number of configured clients. If we have
     * many clients, we want to call serverCron() with an higher frequency. */
    // dynamically adjust the task frequency
    if (server.dynamic_hz) {
        while (listLength(server.clients) / server.hz >
               MAX_CLIENTS_PER_CLOCK_TICK)
        {
            server.hz *= 2;
            if (server.hz > CONFIG_MAX_HZ) {
                server.hz = CONFIG_MAX_HZ;
                break;
            }
        }
    }
    
    // ...
    /* We need to do a few operations on clients asynchronously. */
    clientsCron();

    /* Handle background operations on Redis databases. */
    databasesCron();

    /* Start a scheduled AOF rewrite if this was requested by the user while
     * a BGSAVE was in progress. */
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
        server.aof_rewrite_scheduled)
    {
        rewriteAppendOnlyFileBackground();
    }

    /* Check if a background saving or AOF rewrite in progress terminated. */
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
        ldbPendingChildren())
    {
        int statloc;
        pid_t pid;

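        // WNOHANG: return 0 immediately instead of blocking if no child has exited yet.
        // (WUNTRACED, not used here, would also report children stopped by a signal such as SIGTSTP or SIGSTOP.)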
        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            int exitcode = WEXITSTATUS(statloc);
            int bysignal = 0;

            if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

            if (pid == -1) {
                serverLog(LL_WARNING,"wait3() returned an error: %s. "
                    "rdb_child_pid = %d, aof_child_pid = %d",
                    strerror(errno),
                    (int) server.rdb_child_pid,
                    (int) server.aof_child_pid);
            } else if (pid == server.rdb_child_pid) {
                backgroundSaveDoneHandler(exitcode,bysignal);
                if (!bysignal && exitcode == 0) receiveChildInfo();
            } else if (pid == server.aof_child_pid) {
                backgroundRewriteDoneHandler(exitcode,bysignal);
                if (!bysignal && exitcode == 0) receiveChildInfo();
            } else {
                if (!ldbRemoveChild(pid)) {
                    serverLog(LL_WARNING,
                        "Warning, detected child with unmatched pid: %ld",
                        (long)pid);
                }
            }
            updateDictResizePolicy();
            closeChildInfoPipe();
        }
    } else {
        /* If there is not a background saving/rewrite in progress check if
         * we have to save/rewrite now. */
        for (j = 0; j < server.saveparamslen; j++) {
            struct saveparam *sp = server.saveparams+j;

            /* Save if we reached the given amount of changes,
             * the given amount of seconds, and if the latest bgsave was
             * successful or if, in case of an error, at least
             * CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */
            if (server.dirty >= sp->changes &&
                server.unixtime-server.lastsave > sp->seconds &&
                (server.unixtime-server.lastbgsave_try >
                 CONFIG_BGSAVE_RETRY_DELAY ||
                 server.lastbgsave_status == C_OK))
            {
                serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
                    sp->changes, (int)sp->seconds);
                rdbSaveInfo rsi, *rsiptr;
                rsiptr = rdbPopulateSaveInfo(&rsi);
                rdbSaveBackground(server.rdb_filename,rsiptr);
                break;
            }
        }

        /* Trigger an AOF rewrite if needed. */
        if (server.aof_state == AOF_ON &&
            server.rdb_child_pid == -1 &&
            server.aof_child_pid == -1 &&
            server.aof_rewrite_perc &&
            server.aof_current_size > server.aof_rewrite_min_size)
        {
            long long base = server.aof_rewrite_base_size ?
                server.aof_rewrite_base_size : 1;
            long long growth = (server.aof_current_size*100/base) - 100;
            if (growth >= server.aof_rewrite_perc) {
                serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                rewriteAppendOnlyFileBackground();
            }
        }
    }


    /* AOF postponed flush: Try at every cron cycle if the slow fsync
     * completed. */
    if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);

    /* AOF write errors: in this case we have a buffer to flush as well and
     * clear the AOF error in case of success to make the DB writable again,
     * however to try every second is enough in case of 'hz' is set to
     * an higher frequency. */
    run_with_period(1000) {
        if (server.aof_last_write_status == C_ERR)
            flushAppendOnlyFile(0);
    }
    
    // ...
    server.cronloops++;
    return 1000/server.hz;
}

A few points need explanation:

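  1. The frequency adjustment at the top is controlled by the dynamic-hz option in the config file, which is on by default. When enabled, Redis adapts the task frequency to the number of clients. As long as the number of clients divided by the current hz exceeds MAX_CLIENTS_PER_CLOCK_TICK, the effective frequency is doubled. The loop stops once the condition no longer holds, or when hz exceeds CONFIG_MAX_HZ, in which case hz is capped at that value. MAX_CLIENTS_PER_CLOCK_TICK is 200 and CONFIG_MAX_HZ is 500. The ratio of clients to hz is the number of clients that clientsCron must visit on each tick so that every client is processed about once per second. If that exceeds 200, each tick becomes too heavy and responses slow down, so the frequency is doubled. Going above 500 would push CPU usage too high, so 500 is the ceiling. Together these rules keep client handling responsive.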
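  2. A user may request an AOF rewrite (BGREWRITEAOF) while a BGSAVE is in progress. In that case the rewrite is scheduled and started here, once no AOF or RDB child process is running.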
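  3. Next comes the core persistence logic. If a persistence child process is running, or a Lua debugger (LDB) child process is pending, Redis collects the child's status so its resources can be reclaimed. Otherwise it checks whether the persistence conditions are met and, if so, starts persistence in the background.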
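The dynamic-hz loop from item 1 can be restated as a standalone function. This is a minimal sketch. adapt_hz and the SKETCH_ constants are hypothetical names, and the constants are set to the Redis 5.0 values quoted above.

```c
#include <assert.h>

#define SKETCH_MAX_HZ 500                       /* CONFIG_MAX_HZ */
#define SKETCH_MAX_CLIENTS_PER_CLOCK_TICK 200   /* MAX_CLIENTS_PER_CLOCK_TICK */

/* Double hz until each cron tick has at most 200 clients to visit,
 * capping the result at 500, as the dynamic-hz loop in serverCron does. */
static int adapt_hz(int config_hz, unsigned long nclients) {
    int hz = config_hz;
    assert(hz > 0);
    while (nclients / (unsigned long)hz > SKETCH_MAX_CLIENTS_PER_CLOCK_TICK) {
        hz *= 2;
        if (hz > SKETCH_MAX_HZ) {
            hz = SKETCH_MAX_HZ;
            break;
        }
    }
    return hz;
}
```

With hz 10, 5000 clients push hz to 40, because 5000/40 = 125 is no longer above 200. A million clients hit the 500 cap.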

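First, the child-handling part. Besides the child's exit status, wait3 can return the child's resource usage through a struct rusage pointer (NULL here). The WNOHANG flag ("wait, no hang") tells the parent not to block waiting for the child but to return immediately. If all children are still running, wait3 returns 0 and nothing more is done on this tick. A nonzero return means a child has finished, or, if the value is -1, that an error occurred.

A child that has exited stays in the defunct (zombie) state until its parent calls one of the wait functions, so Redis picks up its status on a later cron tick. If the returned pid matches the RDB or AOF child pid, the matching cleanup runs: backgroundSaveDoneHandler or backgroundRewriteDoneHandler. These mainly update state. Finally updateDictResizePolicy is called, which re-enables hash table resizing (rehash) now that no child is running.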
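The non-blocking reaping pattern can be tried in isolation. The sketch below is a stand-in, not Redis code, and makes three assumptions:

- It uses POSIX waitpid() with WNOHANG instead of wait3(). The behaviour is the same for this purpose, minus the rusage output.
- The child simply exits with a chosen status instead of writing a snapshot.
- fork_and_poll is a hypothetical name.

```c
#define _XOPEN_SOURCE 700
#include <assert.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>

/* Fork a child that exits with `code`, then poll for it without blocking,
 * similar to how serverCron checks once per tick. Returns the child's exit
 * status, or -1 if it was killed by a signal or an error occurred. */
static int fork_and_poll(int code) {
    pid_t pid = fork();
    if (pid == -1) return -1;
    if (pid == 0) _exit(code);             /* child: pretend the save finished */

    struct timespec tick = {0, 10 * 1000 * 1000};  /* 10 ms between polls */
    int status = 0;
    for (;;) {
        pid_t r = waitpid(pid, &status, WNOHANG);
        if (r == 0) {                      /* child still running: try later */
            nanosleep(&tick, NULL);
            continue;
        }
        if (r == -1) return -1;            /* error, e.g. no such child */
        break;                             /* child reaped, no zombie left */
    }
    if (WIFSIGNALED(status)) return -1;
    return WEXITSTATUS(status);
}
```

Redis does not sleep in a loop like this. It just returns from serverCron and checks again on the next tick; the nanosleep here only stands in for that delay.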

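Conversely, if no persistence task is running, execution enters the else branch. Redis iterates over the configured save rules. If both the change-count and the time threshold of any rule are met, the RDB save starts first. Because rdb_child_pid is then set, no AOF rewrite can start in the same cycle.

Next, Redis considers an AOF rewrite if three conditions hold: AOF is enabled, no other child is running, and the current file size exceeds the minimum rewrite size. The minimum is set by auto-aof-rewrite-min-size in the config file and defaults to 64 MB. Meeting it does not guarantee a rewrite. The current size is also compared with a base size. With the default auto-aof-rewrite-percentage of 100, the rewrite is triggered only when the file has grown to at least twice the base. The base size is set to the initial AOF size when Redis starts, and updated to the post-rewrite size after every rewrite. For details, see backgroundRewriteDoneHandler and loadAppendOnlyFile in aof.c.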

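Finally, run_with_period means "run once every several cycles"; it is the macro defined in server.h. The cron interval is 1000/hz ms. If the requested period is less than or equal to that interval, the code runs on every cycle. Otherwise the period is divided by the interval to get a number of cycles N, and the code runs whenever cronloops modulo N is 0, i.e. once every N cycles. The cronloops variable counts completed cron cycles.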
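Written as a function instead of a macro, the rule looks like this. run_with_period here is a hypothetical re-implementation for illustration, not the Redis macro itself.

```c
#include <assert.h>
#include <stdbool.h>

/* Function form of the run_with_period(_ms_) rule: given the cron frequency
 * and the loop counter, decide whether a task with period `ms` runs now. */
static bool run_with_period(int ms, int hz, long long cronloops) {
    assert(hz > 0 && hz <= 500);            /* keeps tick_ms >= 2 */
    int tick_ms = 1000 / hz;                /* length of one cron cycle */
    if (ms <= tick_ms) return true;         /* shorter than a cycle: every cycle */
    return cronloops % (ms / tick_ms) == 0; /* otherwise every ms/tick_ms cycles */
}
```

With hz 10 (100 ms per cycle), a 1000 ms task runs when cronloops is 0, 10, 20 and so on. A 50 ms task runs on every cycle.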

Now for the actual persistence logic. The following sections outline the RDB and AOF persistence processes.

2. RDB persistence

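RDB is Redis's default persistence mode and is often called a memory snapshot: it records the state of the in-memory data at a particular moment. RDB persistence writes the current in-memory data to disk, and when Redis restarts it restores the data from the snapshot. RDB is a fairly compact format, so both writing and restoring are fast. However, every save writes the full dataset, so the larger the dataset, the larger the RDB file and the higher the disk write cost. Configure the parameters so that snapshots happen at suitable moments, and avoid saving too often.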

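Redis executes commands on a single thread, so persisting on the main thread would inevitably block it. Automatic persistence therefore forks a child process to do the work, and the main process keeps running. Redis also provides two commands for manual persistence, save and bgsave:

- save runs synchronously and blocks all other operations, so it is rarely used. It can help in some extreme situations, for example when the system cannot fork any more processes but the data still needs saving.
- bgsave is used far more often. It does the same thing as the automatic background RDB save, just triggered manually.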

The RDB-related configuration is as follows:

save 900 1
save 300 10
save 60 10000

stop-writes-on-bgsave-error yes
rdbcompression yes
rdbchecksum yes
dbfilename dump.rdb
dir ./

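The save directives determine when a snapshot is taken. With the defaults, a save happens when any of these holds:

- at least 1 change has been made and more than 900 s have passed since the last save;
- at least 10 changes have been made and more than 300 s have passed;
- at least 10000 changes have been made and more than 60 s have passed.

The rules are OR-ed: any one of them triggers a save. To disable RDB persistence, comment out all save directives or set an empty value: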
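The check serverCron performs for each save rule can be sketched as follows. The struct mirrors struct saveparam, but should_bgsave and SKETCH_BGSAVE_RETRY_DELAY are hypothetical names. The 5-second retry delay is assumed to match CONFIG_BGSAVE_RETRY_DELAY in Redis 5.0.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <time.h>

/* Mirrors struct saveparam: one "save <seconds> <changes>" rule. */
struct saveparam { time_t seconds; int changes; };

/* Assumed to equal CONFIG_BGSAVE_RETRY_DELAY (5 s) in Redis 5.0. */
#define SKETCH_BGSAVE_RETRY_DELAY 5

/* True if any rule fires: enough changes since the last save, enough
 * seconds elapsed, and the previous bgsave either succeeded or failed
 * long enough ago to be retried. The rules are OR-ed together. */
static bool should_bgsave(const struct saveparam *params, size_t n,
                          long long dirty, time_t now, time_t lastsave,
                          time_t lastbgsave_try, bool lastbgsave_ok) {
    for (size_t j = 0; j < n; j++) {
        if (dirty >= params[j].changes &&
            now - lastsave > params[j].seconds &&
            (now - lastbgsave_try > SKETCH_BGSAVE_RETRY_DELAY || lastbgsave_ok))
            return true;
    }
    return false;
}
```

For example, with the default rules, 15 changes and 301 seconds since the last save satisfy the second rule (save 300 10). 5 changes in the same time satisfy none.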

save ""

This disables RDB persistence.

RDB persistence is performed in rdbSaveBackground. The source is roughly as follows:

int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
    pid_t childpid;
    long long start;

    if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;

    server.dirty_before_bgsave = server.dirty;
    server.lastbgsave_try = time(NULL);
    openChildInfoPipe();

    start = ustime();
    if ((childpid = fork()) == 0) {
        int retval;

        /* Child */
        closeClildUnusedResourceAfterFork();
        redisSetProcTitle("redis-rdb-bgsave");
        retval = rdbSave(filename,rsi);
        if (retval == C_OK) {
            size_t private_dirty = zmalloc_get_private_dirty(-1);

            if (private_dirty) {
                serverLog(LL_NOTICE,
                    "RDB: %zu MB of memory used by copy-on-write",
                    private_dirty/(1024*1024));
            }

            server.child_info_data.cow_size = private_dirty;
            sendChildInfo(CHILD_INFO_TYPE_RDB);
        }
        exitFromChild((retval == C_OK) ? 0 : 1);
    } else {
        /* Parent */
        server.stat_fork_time = ustime()-start;
        server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
        latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
        if (childpid == -1) {
            closeChildInfoPipe();
            server.lastbgsave_status = C_ERR;
            serverLog(LL_WARNING,"Can't save in background: fork: %s",
                strerror(errno));
            return C_ERR;
        }
        serverLog(LL_NOTICE,"Background saving started by pid %d",childpid);
        server.rdb_save_time_start = time(NULL);
        server.rdb_child_pid = childpid;
        server.rdb_child_type = RDB_CHILD_TYPE_DISK;
        updateDictResizePolicy();
        return C_OK;
    }
    return C_OK; /* unreached */
}


void updateDictResizePolicy(void) {
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
        dictEnableResize();
    else
        dictDisableResize();
}

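After a few checks, rdbSaveBackground calls fork to create a child process that does the persistence work. The parent mainly calls updateDictResizePolicy and returns. That call disables resizing (rehash) of the global hash tables while the child exists, which avoids touching large amounts of memory and triggering copy-on-write. The child sets its process title to redis-rdb-bgsave and enters rdbSave: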

// server.h
#define REDIS_AUTOSYNC_BYTES (1024*1024*32) /* fdatasync every 32MB */

/* Save the DB on disk. Return C_ERR on error, C_OK on success. */
int rdbSave(char *filename, rdbSaveInfo *rsi) {
    char tmpfile[256];
    char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
    FILE *fp;
    rio rdb;
    int error = 0;

    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        char *cwdp = getcwd(cwd,MAXPATHLEN);
        serverLog(LL_WARNING,
            "Failed opening the RDB file %s (in server root dir %s) "
            "for saving: %s",
            filename,
            cwdp ? cwdp : "unknown",
            strerror(errno));
        return C_ERR;
    }

    rioInitWithFile(&rdb,fp);

    if (server.rdb_save_incremental_fsync)
        rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);

    if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
        errno = error;
        goto werr;
    }

    /* Make sure data will not remain on the OS's output buffers */
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) goto werr;

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generate DB file is ok. */
    if (rename(tmpfile,filename) == -1) {
        char *cwdp = getcwd(cwd,MAXPATHLEN);
        serverLog(LL_WARNING,
            "Error moving temp DB file %s on the final "
            "destination %s (in server root dir %s): %s",
            tmpfile,
            filename,
            cwdp ? cwdp : "unknown",
            strerror(errno));
        unlink(tmpfile);
        return C_ERR;
    }

    serverLog(LL_NOTICE,"DB saved on disk");
    server.dirty = 0;
    server.lastsave = time(NULL);
    server.lastbgsave_status = C_OK;
    return C_OK;

werr:
    serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));
    fclose(fp);
    unlink(tmpfile);
    return C_ERR;
}

// https://github.com/redis/redis/blob/5.0/src/rio.c
void rioInitWithFile(rio *r, FILE *fp) {
    *r = rioFileIO;
    r->io.file.fp = fp;
    r->io.file.buffered = 0;
    r->io.file.autosync = 0;
}

// enable automatic periodic fsync
void rioSetAutoSync(rio *r, off_t bytes) {
    serverAssert(r->read == rioFileIO.read);
    r->io.file.autosync = bytes;
}

/* Returns 1 or 0 for success/failure. */
static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
    size_t retval;

    retval = fwrite(buf,len,1,r->io.file.fp);
    r->io.file.buffered += len;

    if (r->io.file.autosync &&
        r->io.file.buffered >= r->io.file.autosync)
    {
        fflush(r->io.file.fp);
        redis_fsync(fileno(r->io.file.fp));
        r->io.file.buffered = 0;
    }
    return retval;
}

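The child first creates a temporary file named temp-<pid>.rdb. It then calls rioInitWithFile to initialize a rio object; rio is Redis's own I/O abstraction. If rdb-save-incremental-fsync is enabled in redis.conf (it is by default), automatic syncing is turned on with a threshold defined by the REDIS_AUTOSYNC_BYTES macro, 32 MB. As rioFileWrite shows, whenever the bytes written since the last sync reach the autosync value, Redis flushes the stdio buffer and fsyncs the file to disk. Without this, written data sits in the operating system's page cache, and the OS decides when it reaches disk. Incremental syncing improves reliability. It also avoids a single large, expensive flush at the end of the save.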
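The autosync behaviour of rioFileWrite comes down to a byte counter. The sketch below is a simplified stand-in; struct sync_writer and sync_writer_write are hypothetical names. It also counts how many forced syncs happened, so the effect of the threshold is easy to observe.

```c
#define _XOPEN_SOURCE 700
#include <assert.h>
#include <stdio.h>
#include <unistd.h>

/* Cut-down version of rio's file target. */
struct sync_writer {
    FILE *fp;
    size_t buffered;   /* bytes written since the last fsync */
    size_t autosync;   /* threshold in bytes; 0 disables incremental fsync */
    int syncs;         /* how many times data was forced to disk */
};

/* Returns 1 on success, 0 on failure, like rioFileWrite. */
static size_t sync_writer_write(struct sync_writer *w, const void *buf, size_t len) {
    size_t ok = fwrite(buf, len, 1, w->fp);
    w->buffered += len;
    if (w->autosync && w->buffered >= w->autosync) {
        fflush(w->fp);                 /* stdio buffer -> kernel page cache */
        fsync(fileno(w->fp));          /* page cache -> disk */
        w->buffered = 0;
        w->syncs++;
    }
    return ok;
}
```

Writing 40 bytes in 4-byte chunks with a 16-byte threshold forces two syncs and leaves 8 bytes pending. With Redis's 32 MB threshold, the same logic syncs once per 32 MB written.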

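Redis then calls rdbSaveRio to perform the actual serialization. When that completes, the temporary file is renamed to the final file name (dump.rdb by default); rename is atomic. The error handling follows a common C idiom: resource cleanup sits under a label (werr), and every error path jumps there with goto.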
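The temp-file-plus-rename idiom and the goto-based cleanup can be shown together in a small sketch. atomic_save is a hypothetical helper, not part of Redis. Like rdbSave, it assumes the temporary file lives in the current directory, so that rename() stays within one file system, which is where its atomicity holds.

```c
#define _XOPEN_SOURCE 700
#include <assert.h>
#include <stdio.h>
#include <unistd.h>

/* Write `len` bytes to `filename` the way rdbSave does: write a temp file,
 * flush and fsync it, then rename() it over the target so readers see
 * either the old file or the complete new one. Returns 0 or -1. */
static int atomic_save(const char *filename, const void *data, size_t len) {
    char tmpname[256];
    FILE *fp;
    int rc;

    snprintf(tmpname, sizeof(tmpname), "temp-%d.rdb", (int)getpid());
    fp = fopen(tmpname, "w");
    if (!fp) return -1;

    if (len > 0 && fwrite(data, len, 1, fp) != 1) goto werr;
    if (fflush(fp) == EOF) goto werr;           /* stdio buffer -> kernel */
    if (fsync(fileno(fp)) == -1) goto werr;     /* kernel page cache -> disk */
    rc = fclose(fp);
    fp = NULL;                                  /* never fclose twice */
    if (rc == EOF) goto werr;

    if (rename(tmpname, filename) == -1) goto werr;  /* atomic replace */
    return 0;

werr:                                           /* single cleanup path */
    if (fp) fclose(fp);
    unlink(tmpname);
    return -1;
}
```

Because the target is replaced only after the data has been fsynced, a crash mid-save leaves the previous dump file intact.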

Here is roughly what rdbSaveRio does:

// rdb.h
#define RDB_VERSION 9
#define RDB_SAVE_NONE 0

int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
    dictIterator *di = NULL;
    dictEntry *de;
    char magic[10];
    int j;
    uint64_t cksum;
    size_t processed = 0;

    if (server.rdb_checksum)
        rdb->update_cksum = rioGenericUpdateChecksum;
    snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
    if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
    if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
    if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;

    for (j = 0; j < server.dbnum; j++) {
        redisDb *db = server.db+j;
        dict *d = db->dict;
        if (dictSize(d) == 0) continue;
        di = dictGetSafeIterator(d);

        /* Write the SELECT DB opcode */
        if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
        if (rdbSaveLen(rdb,j) == -1) goto werr;

        /* Write the RESIZE DB opcode. We trim the size to UINT32_MAX, which
         * is currently the largest type we are able to represent in RDB sizes.
         * However this does not limit the actual size of the DB to load since
         * these sizes are just hints to resize the hash tables. */
        uint64_t db_size, expires_size;
        db_size = dictSize(db->dict);
        expires_size = dictSize(db->expires);
        if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
        if (rdbSaveLen(rdb,db_size) == -1) goto werr;
        if (rdbSaveLen(rdb,expires_size) == -1) goto werr;

        /* Iterate this DB writing every entry */
        while((de = dictNext(di)) != NULL) {
            sds keystr = dictGetKey(de);
            robj key, *o = dictGetVal(de);
            long long expire;

            initStaticStringObject(key,keystr);
            expire = getExpire(db,&key);
            if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;

            /* When this RDB is produced as part of an AOF rewrite, move
             * accumulated diff from parent to child while rewriting in
             * order to have a smaller final write. */
            if (flags & RDB_SAVE_AOF_PREAMBLE &&
                rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
            {
                processed = rdb->processed_bytes;
                aofReadDiffFromParent();
            }
        }
        dictReleaseIterator(di);
        di = NULL; /* So that we don't release it again on error. */
    }

    /* If we are storing the replication information on disk, persist
     * the script cache as well: on successful PSYNC after a restart, we need
     * to be able to process any EVALSHA inside the replication backlog the
     * master will send us. */
    if (rsi && dictSize(server.lua_scripts)) {
        di = dictGetIterator(server.lua_scripts);
        while((de = dictNext(di)) != NULL) {
            robj *body = dictGetVal(de);
            if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
                goto werr;
        }
        dictReleaseIterator(di);
        di = NULL; /* So that we don't release it again on error. */
    }

    if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr;

    /* EOF opcode */
    if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;

    /* CRC64 checksum. It will be zero if checksum computation is disabled, the
     * loading code skips the check in this case. */
    cksum = rdb->cksum;
    memrev64ifbe(&cksum);
    if (rioWrite(rdb,&cksum,8) == 0) goto werr;
    return C_OK;

werr:
    if (error) *error = errno;
    if (di) dictReleaseIterator(di);
    return C_ERR;
}

/* Save a few default AUX fields with information about the RDB generated. */
int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) {
    int redis_bits = (sizeof(void*) == 8) ? 64 : 32;
    int aof_preamble = (flags & RDB_SAVE_AOF_PREAMBLE) != 0;

    /* Add a few fields about the state when the RDB was created. */
    if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1;
    if (rdbSaveAuxFieldStrInt(rdb,"redis-bits",redis_bits) == -1) return -1;
    if (rdbSaveAuxFieldStrInt(rdb,"ctime",time(NULL)) == -1) return -1;
    if (rdbSaveAuxFieldStrInt(rdb,"used-mem",zmalloc_used_memory()) == -1) return -1;

    /* Handle saving options that generate aux fields. */
    if (rsi) {
        if (rdbSaveAuxFieldStrInt(rdb,"repl-stream-db",rsi->repl_stream_db)
            == -1) return -1;
        if (rdbSaveAuxFieldStrStr(rdb,"repl-id",server.replid)
            == -1) return -1;
        if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",server.master_repl_offset)
            == -1) return -1;
    }
    if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1;
    return 1;
}

static int rdbWriteRaw(rio *rdb, void *p, size_t len) {
    if (rdb && rioWrite(rdb,p,len) == 0)
        return -1;
    return len;
}


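The file starts with the string REDIS followed by RDB_VERSION. The current RDB_VERSION is 9, so REDIS0009 is written. rdbSaveInfoAuxFields then writes auxiliary information such as the Redis version, pointer size (32/64-bit), current time and memory usage. This information is visible at the start of an RDB file: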

head -c 128 dump.rdb

[Figure: output of head -c 128 dump.rdb]
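The header bytes are easy to reproduce. This sketch builds the string with the same format as rdbSaveRio and returns the 9 bytes that rdbWriteRaw would write. rdb_magic and SKETCH_RDB_VERSION are hypothetical names, and the version is assumed to fit in four digits.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define SKETCH_RDB_VERSION 9   /* value of RDB_VERSION in Redis 5.0 rdb.h */

/* Build the RDB magic string like rdbSaveRio and return how many bytes
 * would go into the file (the trailing NUL is not written). */
static size_t rdb_magic(char *out, size_t cap, int version) {
    assert(cap >= 10);                        /* "REDIS" + 4 digits + NUL */
    snprintf(out, cap, "REDIS%04d", version);
    return strlen(out);
}
```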

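Next, Redis iterates over each database, obtains an iterator over its global hash table, and loops over the entries. For each entry it calls rdbSaveKeyValuePair to save the key and value, plus the expiry if there is one. The actual serialization happens in rdbSaveObject, which handles every value type and converts it to a byte sequence.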

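Finally, an EOF opcode and the checksum are written at the end of the file. The write is then complete, and the child exits and waits for the parent to reap it.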

That is the rough RDB flow. To summarize:

[Figure: redis-rdb]

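The main challenge in RDB persistence is what happens to data modified while the snapshot is being produced. A snapshot is the state at one instant, so its contents should not change while it is being written. Yet the Redis main thread must keep accepting requests and updating data normally, and those updates must stay invisible to the snapshot child. The operating system solves this problem.

The bgsave process is forked from the main thread. fork gives the child a copy of the parent's page tables pointing at the same physical pages instead of copying all of memory, an optimization that makes fork fast. Reads on the main thread simply read the shared pages and do not affect the child. When the main thread writes to a page, the kernel first copies that page (typically 4 KB) and remaps the parent's page table entry to the copy. The write then goes to the copy, while the bgsave child keeps reading the original page, so the snapshot is unaffected. This is copy-on-write (COW): the snapshot is taken while writes continue to be served. When the child exits, pages that are no longer referenced are freed.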
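The visible effect can be demonstrated with a small experiment. Strictly speaking, the guarantee that the child keeps seeing the fork-time value comes from fork semantics (separate address spaces). Copy-on-write is how the kernel provides that guarantee without copying memory up front. In this sketch, counter stands in for the dataset and cow_child_view is a hypothetical name. A pipe makes the child read only after the parent has modified the value.

```c
#define _XOPEN_SOURCE 700
#include <assert.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

static int counter = 1;   /* stands in for the in-memory dataset */

/* Fork a "bgsave" child, then modify `counter` in the parent. The child
 * waits until that write has happened and reports the value it sees. */
static int cow_child_view(void) {
    int fds[2];
    if (pipe(fds) == -1) return -1;

    pid_t pid = fork();
    if (pid == -1) { close(fds[0]); close(fds[1]); return -1; }
    if (pid == 0) {                       /* child: the snapshot process */
        char go;
        close(fds[1]);
        if (read(fds[0], &go, 1) != 1) _exit(255);  /* wait for the parent */
        _exit(counter);                   /* report the value we see */
    }

    close(fds[0]);
    counter = 2;                          /* parent keeps serving writes */
    ssize_t w = write(fds[1], "g", 1);    /* signal: the write has happened */
    close(fds[1]);                        /* EOF also unblocks the child */

    int status;
    if (waitpid(pid, &status, 0) == -1 || !WIFEXITED(status)) return -1;
    if (w != 1) return -1;
    return WEXITSTATUS(status);
}
```

The child reports 1 even though the parent had already set counter to 2 before the child read it.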

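Data keeps changing between snapshots. If it changes after one snapshot and the server crashes before the next, those changes are lost. Taking full snapshots too often, however, puts heavy pressure on the disk. In the extreme case where writes are very frequent during a save, the memory of the main process and the child may diverge completely, so memory usage can reach twice the original. In production, tune the automatic snapshot frequency to balance performance against reliability. Redis also guards against back-to-back full snapshots: while a bgsave child is running, a second bgsave cannot be started.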

3. AOF

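AOF persistence is essentially a log. Every executed write command is appended to it, and on recovery the commands are read back and re-executed. This resembles the familiar WAL (write-ahead log), where data is recorded in the log before it is actually written, so that it can be recovered automatically after a failure and writes stay transactional. AOF reverses that order and can be called a write-after log: Redis first executes the command and modifies the in-memory data structures, and only then writes the command to the log.

The reason is validation cost. Redis does not validate commands when writing them to the AOF, to avoid parsing them a second time. If it logged first, an invalid command could end up in the file. By executing first, any command that fails during execution is known to be invalid and is never logged, so bad commands stay out of the log without a separate validation step. Logging after execution also keeps the log write off the critical path of the current command: the client waits only for the in-memory operation, not for the log write, which further improves performance.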

The written format looks roughly like this:

[Figure: sample AOF file contents]

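*3 means the command consists of 3 elements: the command name plus its arguments. $3 gives the length in bytes of the next element, whether a command name or an argument. The rest follows the same pattern.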
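The encoding can be reproduced with a short function modeled on the output of catAppendOnlyGenericCommand. resp_encode is a hypothetical name. The sketch assumes NUL-terminated text arguments, whereas real Redis arguments are binary-safe.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Encode argv as a RESP array: "*<argc>\r\n", then "$<len>\r\n<arg>\r\n"
 * for each argument. Returns bytes written, or -1 if `out` is too small. */
static int resp_encode(char *out, size_t cap, int argc, const char **argv) {
    size_t used = 0;
    int n = snprintf(out, cap, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= cap) return -1;
    used += (size_t)n;
    for (int i = 0; i < argc; i++) {
        n = snprintf(out + used, cap - used, "$%zu\r\n%s\r\n",
                     strlen(argv[i]), argv[i]);
        if (n < 0 || (size_t)n >= cap - used) return -1;
        used += (size_t)n;
    }
    return (int)used;
}
```

Encoding SET hello 1 yields the 31-byte string *3\r\n$3\r\nSET\r\n$5\r\nhello\r\n$1\r\n1\r\n.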

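AOF also has drawbacks. The most obvious is data loss. If the server crashes right after a command has executed but before it has been logged, the modification already applied is lost, which risks inconsistency. That is acceptable for a cache but falls short when Redis is used as a database. Also, logging after execution does not block the client that issued the command, as noted above. But when clients send commands very frequently or with high concurrency, the following can happen: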

Client command -> in-memory operation done -> write AOF (blocks other clients' operations) -> next client command -> ...

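Redis is single-threaded. The first client is unaffected, but if the next command arrives shortly afterwards, or other clients are executing, they must wait while the AOF is written. Log writes are normally fast and this is not a problem. When the disk is under heavy load and writes are slow, however, client operations become sluggish as well.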

The default AOF configuration is as follows:

appendonly no
appendfilename "appendonly.aof"

# appendfsync always
appendfsync everysec
# appendfsync no

# whether to skip fsync while a rewrite is in progress
no-appendfsync-on-rewrite no

# rewrite trigger conditions
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb

aof-load-truncated yes

# whether to enable hybrid (RDB + AOF) persistence
aof-use-rdb-preamble yes

appendonly is off by default; AOF persistence runs only when it is turned on. The file name is set by appendfilename.

The write-back policy is set by appendfsync. The options mean:

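  1. always: synchronous write-back. After every write command, the log is immediately written and synced to the AOF file.

  2. everysec: write back every second. After a command executes, the log is first written to the AOF file's in-memory buffer, and the buffer is synced to disk once per second.

  3. no: Redis does not sync actively and leaves write-back to the operating system. The log is only written to the AOF file's in-memory buffer, and the OS decides when to write the buffer to disk.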

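Each of the three policies has pros and cons:

- always loses essentially no data, but every command requires a disk sync, which hurts the performance of subsequent commands on the main thread. It is the slowest and the safest.
- no gives the best performance, since each write only touches the buffer. However, when data reaches disk is out of Redis's control and entirely up to the OS, so a crash may lose a fair amount of data.
- everysec, the default, is a compromise between speed and safety. Syncing once per second keeps performance reasonably high while limiting the data lost on a crash to about 1 second.

The three modes compare as follows: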

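appendfsync | Meaning                 | Pros                                      | Cons
always      | synchronous write-back  | highest reliability, almost no data loss  | low performance, high overhead
everysec    | write back every second | fairly high performance, moderate reliability | up to about 1 s of data may be lost
no          | no active fsync         | highest performance                       | low reliability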

Choose the write-back policy for your scenario. If unsure, keep the default.
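The decision each policy makes when the AOF buffer is flushed can be sketched as follows. This is a simplification: should_fsync and the enum are hypothetical names. In Redis 5.0 the everysec fsync actually runs in a background thread, and flushAppendOnlyFile may postpone the write while that fsync is still in progress (the aof_flush_postponed_start handling in serverCron). The everysec test mirrors the unixtime > aof_last_fsync comparison.

```c
#include <assert.h>
#include <stdbool.h>
#include <time.h>

enum fsync_policy { FSYNC_ALWAYS, FSYNC_EVERYSEC, FSYNC_NO };

/* Decide whether flushing the AOF buffer should also fsync now. */
static bool should_fsync(enum fsync_policy p, time_t now, time_t last_fsync) {
    switch (p) {
    case FSYNC_ALWAYS:   return true;               /* every flush */
    case FSYNC_EVERYSEC: return now > last_fsync;   /* at most once per second */
    case FSYNC_NO:       return false;              /* leave it to the OS */
    }
    return false;
}
```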

auto-aof-rewrite-percentage and auto-aof-rewrite-min-size define when the AOF is rewritten. An AOF rewrite is needed mainly for these reasons:

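  1. The file grows too large. The AOF is a single file, and as Redis keeps executing commands it can easily accumulate billions of them. The file becomes very large and increasingly inefficient.
  2. When Redis restarts, every command has to be replayed in order. If the file is very large, there are a huge number of commands to execute and recovery becomes very slow.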

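Because of these problems, especially the second, the AOF file cannot be allowed to grow without bound, hence the AOF rewrite mechanism. The idea is simple: Redis generates a new file from the data that currently exists and uses it to replace the old one. For example, suppose the original operations were: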

set hello 1
set hello 2
incr hello

hset program java spring
hset program python flask
hset program golang gin
hset program python tornado
hdel program java

Some keys were modified several times. The data now in memory should look like this:

{
    "hello": "3",
    "program": {
        "python": "tornado",
        "golang": "gin"
    }
}

It is enough to generate write commands from the final state:

set hello 3
hmset program python tornado golang gin

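So the 8 commands above are compressed into these 2. Rewriting from the final state drops the redundant intermediate steps, which greatly reduces the file size.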
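The compaction idea can be illustrated with a toy model of the program hash. This is not how Redis implements rewriting. Redis walks its real in-memory dictionaries, and with aof-use-rdb-preamble enabled it writes RDB instead. toy_hash, toy_hset, toy_hdel, toy_rewrite and the fixed capacity are all hypothetical.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define MAX_FIELDS 16   /* hypothetical fixed capacity for the sketch */

/* A toy hash: field/value pairs kept in insertion order. */
struct toy_hash {
    int n;
    const char *field[MAX_FIELDS];
    const char *value[MAX_FIELDS];
};

static void toy_hset(struct toy_hash *h, const char *f, const char *v) {
    for (int i = 0; i < h->n; i++)
        if (strcmp(h->field[i], f) == 0) { h->value[i] = v; return; }  /* overwrite */
    assert(h->n < MAX_FIELDS);
    h->field[h->n] = f;
    h->value[h->n] = v;
    h->n++;
}

static void toy_hdel(struct toy_hash *h, const char *f) {
    for (int i = 0; i < h->n; i++) {
        if (strcmp(h->field[i], f) != 0) continue;
        for (int j = i + 1; j < h->n; j++) {   /* shift left, keep order */
            h->field[j - 1] = h->field[j];
            h->value[j - 1] = h->value[j];
        }
        h->n--;
        return;
    }
}

/* Emit one command that recreates the final state, like a rewrite would. */
static void toy_rewrite(const struct toy_hash *h, const char *key, char *out, size_t cap) {
    size_t used = (size_t)snprintf(out, cap, "HMSET %s", key);
    for (int i = 0; i < h->n && used < cap; i++)
        used += (size_t)snprintf(out + used, cap - used, " %s %s",
                                 h->field[i], h->value[i]);
}
```

Replaying the five hash commands from the example and then calling toy_rewrite gives HMSET program python tornado golang gin.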

The two rewrite parameters mean the following:

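auto-aof-rewrite-percentage: defaults to 100. A rewrite is triggered once the AOF has grown by this percentage. As the code above shows, the base size is the AOF size after the last rewrite, or the size at startup if no rewrite has happened yet. Redis compares the current size with this base, and once the current size exceeds the base by the given percentage, a rewrite is triggered. For example, with the setting at 100 and a base size of 300 MB, a rewrite is triggered when the file reaches 300 + 300 × 100% = 600 MB. Setting it to 0 disables automatic rewriting.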

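auto-aof-rewrite-min-size: the minimum size the AOF must reach before it is rewritten, which prevents rewriting too early. For example, if the file starts at 10 KB, the percentage rule alone would trigger a rewrite at 20 KB, which is far too frequent. With a minimum size set, reaching the percentage alone is not enough: the file must also exceed this size. The default minimum is 64 MB. With the defaults, a rewrite is therefore triggered when the AOF is larger than 64 MB and has grown by at least 100% over the base size.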
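The minimum-size and percentage conditions can be expressed in one function modeled on the serverCron code shown earlier. should_rewrite_aof is a hypothetical name. It leaves out the other preconditions Redis checks: AOF enabled, and no RDB or AOF child running.

```c
#include <assert.h>
#include <stdbool.h>

/* Automatic AOF rewrite check. Sizes are in bytes; `base` is the size
 * after the last rewrite, or at startup if none has happened yet. */
static bool should_rewrite_aof(long long current, long long base,
                               long long min_size, int perc) {
    if (perc == 0) return false;            /* auto rewrite disabled */
    if (current <= min_size) return false;  /* too small to bother */
    if (base == 0) base = 1;                /* avoid dividing by zero */
    long long growth = current * 100 / base - 100;
    return growth >= perc;
}
```

With a 300 MB base:

- 600 MB triggers a rewrite (growth 100%).
- 599 MB does not, since integer growth is 99%.
- A 40 MB file never triggers one, because it is below the 64 MB minimum.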

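aof-load-truncated controls what happens when the AOF turns out to be truncated, i.e. the file ends before the last command is complete. This can happen because of file system issues or a crash. By default (yes), Redis accepts this, loads as much as it can, starts normally and only logs a warning. To make Redis stop in this case, set it to no; you can then try to repair the file with the redis-check-aof tool. Allowing truncation does not mean allowing corruption: if the file is corrupted elsewhere, Redis reports an error and exits regardless of this setting.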

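aof-use-rdb-preamble controls hybrid persistence and is enabled by default. Redis has supported mixing AOF and RDB persistence since 4.0, where it was off by default; from 5.0 on it is on by default. So configuring AOF persistence actually gives the hybrid mode, which is built on top of AOF. The cycle works like this:

- Commands are first appended in AOF format.
- When the AOF meets the rewrite conditions, a rewrite is triggered, and this is where the hybrid mode comes in. The contents are saved in RDB format, but still written into the AOF file.
- After the rewrite completes, later writes are again appended as command text.
- At the next rewrite, everything is converted into RDB format at the head of the file again, and so on.

Between rewrites, data is saved in real time through lightweight AOF appends, and each rewrite compacts it into a snapshot, saving a lot of space. Writes are fast, and recovery is faster too: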
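When Redis later loads such a file, it has to know which format comes first. In Redis 5.0, loadAppendOnlyFile reads the first five bytes and checks for "REDIS". The sketch below shows only that check, under the hypothetical name has_rdb_preamble.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* A file rewritten with aof-use-rdb-preamble starts with an RDB image
 * ("REDIS" + version); a plain AOF starts with a RESP command ('*'). */
static bool has_rdb_preamble(const char *buf, size_t len) {
    return len >= 5 && memcmp(buf, "REDIS", 5) == 0;
}
```

If the check fails, the loader simply treats the whole file as AOF commands from offset 0.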

[Figure: hybrid persistence file layout]

AOF writing involves two parts: real-time writes and AOF rewriting.

3.1. AOF real-time writes

When AOF is enabled, Redis writes to a buffer first while handling requests in real time. The buffer is defined in the redisServer struct in server.h:

struct redisServer {
    // ...
    /* AOF persistence */
    int aof_state;                  /* AOF_(ON|OFF|WAIT_REWRITE) */
    int aof_fsync;                  /* Kind of fsync() policy */
    char *aof_filename;             /* Name of the AOF file */
    int aof_no_fsync_on_rewrite;    /* Don't fsync if a rewrite is in prog. */
    int aof_rewrite_perc;           /* Rewrite AOF if % growth is > M and... */
    off_t aof_rewrite_min_size;     /* the AOF file is at least N bytes. */
    off_t aof_rewrite_base_size;    /* AOF size on latest startup or rewrite. */
    off_t aof_current_size;         /* AOF current size. */
    off_t aof_fsync_offset;         /* AOF offset which is already synced to disk. */
    int aof_rewrite_scheduled;      /* Rewrite once BGSAVE terminates. */
    pid_t aof_child_pid;            /* PID if rewriting process */
    list *aof_rewrite_buf_blocks;   /* Hold changes during an AOF rewrite. */
    sds aof_buf;      /* AOF buffer, written before entering the event loop */
    int aof_fd;       /* File descriptor of currently selected AOF file */
    int aof_selected_db; /* Currently selected DB in AOF */
    time_t aof_flush_postponed_start; /* UNIX time of postponed AOF flush */
    time_t aof_last_fsync;            /* UNIX time of last fsync() */
    time_t aof_rewrite_time_last;   /* Time used by last AOF rewrite run. */
    time_t aof_rewrite_time_start;  /* Current AOF rewrite start time. */
    int aof_lastbgrewrite_status;   /* C_OK or C_ERR */
    unsigned long aof_delayed_fsync;  /* delayed AOF fsync() counter */
    int aof_rewrite_incremental_fsync;/* fsync incrementally while aof rewriting? */
    int rdb_save_incremental_fsync;   /* fsync incrementally while rdb saving? */
    int aof_last_write_status;      /* C_OK or C_ERR */
    int aof_last_write_errno;       /* Valid if aof_last_write_status is ERR */
    int aof_load_truncated;         /* Don't stop on unexpected AOF EOF. */
    int aof_use_rdb_preamble;       /* Use RDB preamble on AOF rewrites. */
    /* AOF pipes used to communicate between parent and child during rewrite. */
    int aof_pipe_write_data_to_child;
    int aof_pipe_read_data_from_parent;
    int aof_pipe_write_ack_to_parent;
    int aof_pipe_read_ack_from_child;
    int aof_pipe_write_ack_to_child;
    int aof_pipe_read_ack_from_parent;
    int aof_stop_sending_diff;     /* If true stop sending accumulated diffs
                                      to child process. */
    sds aof_child_diff;             /* AOF diff accumulator child side. */
    // ...
};

These fields hold all AOF-related data and state. aof_buf is the write buffer; its type is sds (simple dynamic string). Every client command is executed through the function void call(client *c, int flags):

void call(client *c, int flags) {
    // ...
    /* Propagate the command into the AOF and replication link */
    if (flags & CMD_CALL_PROPAGATE &&
        (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
    {
        int propagate_flags = PROPAGATE_NONE;

        /* Check if the command operated changes in the data set. If so
         * set for replication / AOF propagation. */
        if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);

        /* If the client forced AOF / replication of the command, set
         * the flags regardless of the command effects on the data set. */
        if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
        if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;

        /* However prevent AOF / replication propagation if the command
         * implementations called preventCommandPropagation() or similar,
         * or if we don't have the call() flags to do so. */
        if (c->flags & CLIENT_PREVENT_REPL_PROP ||
            !(flags & CMD_CALL_PROPAGATE_REPL))
                propagate_flags &= ~PROPAGATE_REPL;
        if (c->flags & CLIENT_PREVENT_AOF_PROP ||
            !(flags & CMD_CALL_PROPAGATE_AOF))
                propagate_flags &= ~PROPAGATE_AOF;

        /* Call propagate() only if at least one of AOF / replication
         * propagation is needed. Note that modules commands handle replication
         * in an explicit way, so we never replicate them automatically. */
        if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
            propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
    }
    // ...
}

void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    if (flags & PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}

call invokes propagate, which calls feedAppendOnlyFile to write to the AOF buffer:

// aof.c
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    sds buf = sdsempty();
    robj *tmpargv[3];

    /* The DB this command was targeting is not the same as the last command
     * we appended. To issue a SELECT command is needed. */
    if (dictid != server.aof_selected_db) {
        char seldb[64];

        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.aof_selected_db = dictid;
    }

    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == expireatCommand) {
        /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
        /* Translate SETEX/PSETEX to SET and PEXPIREAT */
        tmpargv[0] = createStringObject("SET",3);
        tmpargv[1] = argv[1];
        tmpargv[2] = argv[3];
        buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
        decrRefCount(tmpargv[0]);
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == setCommand && argc > 3) {
        int i;
        robj *exarg = NULL, *pxarg = NULL;
        /* Translate SET [EX seconds][PX milliseconds] to SET and PEXPIREAT */
        buf = catAppendOnlyGenericCommand(buf,3,argv);
        for (i = 3; i < argc; i ++) {
            if (!strcasecmp(argv[i]->ptr, "ex")) exarg = argv[i+1];
            if (!strcasecmp(argv[i]->ptr, "px")) pxarg = argv[i+1];
        }
        serverAssert(!(exarg && pxarg));
        if (exarg)
            buf = catAppendOnlyExpireAtCommand(buf,server.expireCommand,argv[1],
                                               exarg);
        if (pxarg)
            buf = catAppendOnlyExpireAtCommand(buf,server.pexpireCommand,argv[1],
                                               pxarg);
    } else {
        /* All the other commands don't need translation or need the
         * same translation already operated in the command vector
         * for the replication itself. */
        buf = catAppendOnlyGenericCommand(buf,argc,argv);
    }

    /* Append to the AOF buffer. This will be flushed on disk just before
     * of re-entering the event loop, so before the client will get a
     * positive reply about the operation performed. */
    if (server.aof_state == AOF_ON)
        // Append to the AOF buffer
        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

    /* If a background append only file rewriting is in progress we want to
     * accumulate the differences between the child DB and the current one
     * in a buffer, so that when the child process will do its work we
     * can append the differences to the new append only file. */
    if (server.aof_child_pid != -1)
        // Also append to the AOF rewrite buffer used by the rewrite child
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));

    sdsfree(buf);
}

sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
    char buf[32];
    int len, j;
    robj *o;

    buf[0] = '*';
    len = 1+ll2string(buf+1,sizeof(buf)-1,argc);
    buf[len++] = '\r';
    buf[len++] = '\n';
    dst = sdscatlen(dst,buf,len);

    for (j = 0; j < argc; j++) {
        o = getDecodedObject(argv[j]);
        buf[0] = '$';
        len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr));
        buf[len++] = '\r';
        buf[len++] = '\n';
        dst = sdscatlen(dst,buf,len);
        dst = sdscatlen(dst,o->ptr,sdslen(o->ptr));
        dst = sdscatlen(dst,"\r\n",2);
        decrRefCount(o);
    }
    return dst;
}

sds sdscatlen(sds s, const void *t, size_t len) {
    size_t curlen = sdslen(s);

    // Grow the sds allocation if needed
    s = sdsMakeRoomFor(s,len);
    if (s == NULL) return NULL;
    memcpy(s+curlen, t, len);
    sdssetlen(s, curlen+len);
    s[curlen+len] = '\0';
    return s;
}
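To make the text that catAppendOnlyGenericCommand appends more concrete, here is a minimal standalone sketch. resp_encode and expire_to_pexpireat_ms are hypothetical helpers, not Redis functions. The first reproduces the RESP multi-bulk layout: `*argc`, then `$len` and the bytes of each argument, with every line ending in `\r\n`. The second shows why feedAppendOnlyFile rewrites a relative TTL as an absolute PEXPIREAT timestamp: if the AOF is replayed later, the key does not get a fresh TTL. Unlike Redis, which uses sdslen, the sketch uses strlen, so it is not binary-safe.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: encode argv as a RESP multi-bulk array, the same text
 * layout catAppendOnlyGenericCommand appends to the AOF buffer.
 * Returns the number of bytes written, or -1 if `cap` is too small.
 * Uses strlen, so arguments must not contain NUL bytes (Redis uses sdslen). */
int resp_encode(char *out, size_t cap, int argc, const char **argv) {
    assert(out != NULL && cap > 0);
    int n = snprintf(out, cap, "*%d\r\n", argc);   /* array header */
    if (n < 0 || (size_t)n >= cap) return -1;
    size_t used = (size_t)n;
    for (int i = 0; i < argc; i++) {
        size_t len = strlen(argv[i]);
        /* bulk string: $<len>\r\n<bytes>\r\n */
        n = snprintf(out + used, cap - used, "$%zu\r\n%s\r\n", len, argv[i]);
        if (n < 0 || (size_t)n >= cap - used) return -1;
        used += (size_t)n;
    }
    return (int)used;
}

/* Hypothetical helper: turn a relative TTL (EXPIRE/SETEX in seconds,
 * PEXPIRE/PSETEX in milliseconds) into the absolute Unix time in
 * milliseconds that a PEXPIREAT entry would carry. */
long long expire_to_pexpireat_ms(long long now_ms, long long ttl, int ttl_in_ms) {
    return now_ms + (ttl_in_ms ? ttl : ttl * 1000);
}
```

For example, `SET k hello` is encoded as `*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$5\r\nhello\r\n`, and `EXPIRE k 10` issued at `now_ms = 1000` becomes a PEXPIREAT for time 11000.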


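In feedAppendOnlyFile, catAppendOnlyGenericCommand generates the RESP text for the command, and sdscatlen then appends it to the existing aof_buf. That completes the write into aof_buf. If an AOF rewrite child process is running at that moment, the change is also appended through aofRewriteBufferAppend to the rewrite buffer, so the rewritten file includes it as well. We will come back to this later. Next, let's look at how the write-back policy is carried out. The buffer is flushed by flushAppendOnlyFile. In Redis 5.0 beforeSleep calls it every time before the event loop goes back to waiting for events, and serverCron additionally retries it on every cycle while a flush is postponed: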

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    // ...
    /* AOF postponed flush: Try at every cron cycle if the slow fsync
     * completed. */
    if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
    // ...
}

void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;
    mstime_t latency;

    if (sdslen(server.aof_buf) == 0) {
        /* Check if we need to do fsync even the aof buffer is empty,
         * because previously in AOF_FSYNC_EVERYSEC mode, fsync is
         * called only when aof buffer is not empty, so if users
         * stop write commands before fsync called in one second,
         * the data in page cache cannot be flushed in time. */
        if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
            server.aof_fsync_offset != server.aof_current_size &&
            server.unixtime > server.aof_last_fsync &&
            !(sync_in_progress = aofFsyncInProgress())) {
            goto try_fsync;
        } else {
            return;
        }
    }

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        sync_in_progress = aofFsyncInProgress();

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
        /* With this append fsync policy we do background fsyncing.
         * If the fsync is still in progress we can try to delay
         * the write for a couple of seconds. */
        if (sync_in_progress) {
            if (server.aof_flush_postponed_start == 0) {
                /* No previous write postponing, remember that we are
                 * postponing the flush and return. */
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                /* We were already waiting for fsync to finish, but for less
                 * than two seconds this is still ok. Postpone again. */
                return;
            }
            /* Otherwise fall through, and go write since we can't wait
             * over two seconds. */
            server.aof_delayed_fsync++;
            serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
    }
    /* We want to perform a single write. This should be guaranteed atomic
     * at least if the filesystem we are writing is a real physical one.
     * While this will save us against the server being killed I don't think
     * there is much to do about the whole server stopping for power problems
     * or alike */

    latencyStartMonitor(latency);
    // Write the AOF buffer into the kernel page cache
    nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
    latencyEndMonitor(latency);
    
    // ...

    /* We performed the write so reset the postponed flush sentinel to zero. */
    server.aof_flush_postponed_start = 0;

    
    
    // ...
    server.aof_current_size += nwritten;

    /* Re-use AOF buffer when it is small enough. The maximum comes from the
     * arena size of 4k minus some overhead (but is otherwise arbitrary). */
    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
        sdsclear(server.aof_buf);
    } else {
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }

try_fsync:
    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
     * children doing I/O in the background. */
    if (server.aof_no_fsync_on_rewrite &&
        (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
            return;

    /* Perform the fsync if needed. */
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        /* redis_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
        latencyStartMonitor(latency);
        redis_fsync(server.aof_fd); /* Let's try to get this data on the disk */
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-fsync-always",latency);
        server.aof_fsync_offset = server.aof_current_size;
        server.aof_last_fsync = server.unixtime;
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) {
        if (!sync_in_progress) {
            aof_background_fsync(server.aof_fd);
            server.aof_fsync_offset = server.aof_current_size;
        }
        server.aof_last_fsync = server.unixtime;
    }
}

ssize_t aofWrite(int fd, const char *buf, size_t len) {
    ssize_t nwritten = 0, totwritten = 0;

    while(len) {
        nwritten = write(fd, buf, len);

        if (nwritten < 0) {
            if (errno == EINTR) {
                continue;
            }
            return totwritten ? totwritten : -1;
        }

        len -= nwritten;
        buf += nwritten;
        totwritten += nwritten;
    }

    return totwritten;
}

void aof_background_fsync(int fd) {
    bioCreateBackgroundJob(BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
}

int aofFsyncInProgress(void) {
    return bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;
}
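The everysec branch of flushAppendOnlyFile is easiest to follow as a pure decision function. The sketch below is a simplified model and makes several assumptions. decide_flush, struct flush_state and the enum values are hypothetical names. It assumes aof_buf is non-empty, ignores write errors and the no-appendfsync-on-rewrite check, and returns what the real function would do instead of doing it.

```c
#include <assert.h>
#include <time.h>

/* Hypothetical names modelling appendfsync no / everysec / always. */
enum fsync_policy { FSYNC_NO, FSYNC_EVERYSEC, FSYNC_ALWAYS };
enum flush_action {
    FLUSH_POSTPONE,        /* return without writing */
    FLUSH_WRITE_ONLY,      /* write(2) to the page cache, no fsync issued */
    FLUSH_WRITE_FSYNC_NOW, /* write, then fsync on the calling thread */
    FLUSH_WRITE_FSYNC_BG   /* write, then queue an fsync on a bio thread */
};

struct flush_state {
    time_t postponed_start; /* like aof_flush_postponed_start; 0 = not postponed */
    time_t last_fsync;      /* like aof_last_fsync */
};

/* Simplified model of flushAppendOnlyFile's decisions for a non-empty buffer. */
enum flush_action decide_flush(struct flush_state *st, enum fsync_policy policy,
                               int sync_in_progress, int force, time_t now) {
    assert(st != NULL);
    if (policy == FSYNC_EVERYSEC && !force && sync_in_progress) {
        if (st->postponed_start == 0) {
            st->postponed_start = now;      /* start postponing */
            return FLUSH_POSTPONE;
        } else if (now - st->postponed_start < 2) {
            return FLUSH_POSTPONE;          /* still within the 2-second grace */
        }
        /* Waited 2 seconds or more: write without waiting for the fsync. */
    }
    st->postponed_start = 0;                /* the write happens now */
    if (policy == FSYNC_ALWAYS) {
        st->last_fsync = now;
        return FLUSH_WRITE_FSYNC_NOW;
    }
    if (policy == FSYNC_EVERYSEC && now > st->last_fsync) {
        st->last_fsync = now;               /* updated even if no job is queued */
        return sync_in_progress ? FLUSH_WRITE_ONLY : FLUSH_WRITE_FSYNC_BG;
    }
    return FLUSH_WRITE_ONLY;
}
```

Suppose a background fsync is still pending. Calls at t=100 and t=101 return FLUSH_POSTPONE. At t=102 the two-second limit is reached and the call returns FLUSH_WRITE_ONLY. In the real code this corresponds to `aof_delayed_fsync++` and the "Asynchronous AOF fsync is taking too long" log line.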

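flushAppendOnlyFile(0) runs before every return to the event loop (from beforeSleep), and serverCron retries it while a flush is postponed. Under the everysec policy, if a background fsync is still running, the function does not write right away. The first time, it records the current Unix timestamp in server.aof_flush_postponed_start and returns. It keeps postponing for up to two seconds, after which it increments aof_delayed_fsync, logs a warning and writes anyway. Otherwise it calls aofWrite to write server.aof_buf into the kernel buffer, resets aof_flush_postponed_start to 0, clears aof_buf, and falls straight through to the try_fsync label. The `goto try_fsync` near the top covers a different case: aof_buf is empty, but under everysec data written earlier has not been fsynced yet. At try_fsync the fsync is skipped when no-appendfsync-on-rewrite is enabled and a child process is doing I/O. Otherwise, if the write-back policy is always, redis_fsync is called directly. If it is everysec, aof_background_fsync hands the fsync to a background thread, which in practice means calling bioCreateBackgroundJob to add a job to a queue. This brings in bio, which is built on threads. All of Redis's event handling and in-memory data structure operations run on the main thread, while system calls such as closing file handles and flushing the AOF to disk are managed by bio. Redis starts 3 threads for this work, one per job type: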

// bio.h
/* Background job opcodes */
#define BIO_CLOSE_FILE    0 /* Deferred close(2) syscall. */
#define BIO_AOF_FSYNC     1 /* Deferred AOF fsync. */
#define BIO_LAZY_FREE     2 /* Deferred objects freeing. */
#define BIO_NUM_OPS       3

// server.c
void InitServerLast() {
    bioInit();
    server.initial_memory_usage = zmalloc_used_memory();
}
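// bio.c
struct bio_job {
    time_t time; /* Time at which the job was created. */
    void *arg1, *arg2, *arg3; /* Job-specific arguments. */
};

static pthread_t bio_threads[BIO_NUM_OPS];
static pthread_mutex_t bio_mutex[BIO_NUM_OPS];
static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS];
static pthread_cond_t bio_step_cond[BIO_NUM_OPS];
static list *bio_jobs[BIO_NUM_OPS];
static unsigned long long bio_pending[BIO_NUM_OPS];
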
/* Initialize the background system, spawning the thread. */
void bioInit(void) {
    pthread_attr_t attr;
    pthread_t thread;
    size_t stacksize;
    int j;

    /* Initialization of state vars and objects */
    for (j = 0; j < BIO_NUM_OPS; j++) {
        pthread_mutex_init(&bio_mutex[j],NULL);
        pthread_cond_init(&bio_newjob_cond[j],NULL);
        pthread_cond_init(&bio_step_cond[j],NULL);
        bio_jobs[j] = listCreate();
        bio_pending[j] = 0;
    }

    /* Set the stack size as by default it may be small in some system */
    pthread_attr_init(&attr);
    pthread_attr_getstacksize(&attr,&stacksize);
    if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
    while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
    pthread_attr_setstacksize(&attr, stacksize);

    /* Ready to spawn our threads. We use the single argument the thread
     * function accepts in order to pass the job ID the thread is
     * responsible of. */
    for (j = 0; j < BIO_NUM_OPS; j++) {
        void *arg = (void*)(unsigned long) j;
        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
            exit(1);
        }
        bio_threads[j] = thread;
    }
}

void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
    struct bio_job *job = zmalloc(sizeof(*job));

    job->time = time(NULL);
    job->arg1 = arg1;
    job->arg2 = arg2;
    job->arg3 = arg3;
    pthread_mutex_lock(&bio_mutex[type]);
    listAddNodeTail(bio_jobs[type],job);
    bio_pending[type]++;
    pthread_cond_signal(&bio_newjob_cond[type]);
    pthread_mutex_unlock(&bio_mutex[type]);
}

void *bioProcessBackgroundJobs(void *arg) {
    struct bio_job *job;
    unsigned long type = (unsigned long) arg;
    sigset_t sigset;

    /* Check that the type is within the right interval. */
    if (type >= BIO_NUM_OPS) {
        serverLog(LL_WARNING,
            "Warning: bio thread started with wrong type %lu",type);
        return NULL;
    }

    /* Make the thread killable at any time, so that bioKillThreads()
     * can work reliably. */
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

    pthread_mutex_lock(&bio_mutex[type]);
    /* Block SIGALRM so we are sure that only the main thread will
     * receive the watchdog signal. */
    sigemptyset(&sigset);
    sigaddset(&sigset, SIGALRM);
    if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
        serverLog(LL_WARNING,
            "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));

    while(1) {
        listNode *ln;

        /* The loop always starts with the lock hold. */
        if (listLength(bio_jobs[type]) == 0) {
            // Wait on the condition variable until a new job arrives
            pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
            continue;
        }
        /* Pop the job from the queue. */
        ln = listFirst(bio_jobs[type]);
        job = ln->value;
        /* It is now possible to unlock the background system as we know have
         * a stand alone job structure to process.*/
        pthread_mutex_unlock(&bio_mutex[type]);

        /* Process the job accordingly to its type. */
        if (type == BIO_CLOSE_FILE) {
            close((long)job->arg1);
        } else if (type == BIO_AOF_FSYNC) {
            redis_fsync((long)job->arg1);
        } else if (type == BIO_LAZY_FREE) {
            /* What we free changes depending on what arguments are set:
             * arg1 -> free the object at pointer.
             * arg2 & arg3 -> free two dictionaries (a Redis DB).
             * only arg3 -> free the skiplist. */
            if (job->arg1)
                lazyfreeFreeObjectFromBioThread(job->arg1);
            else if (job->arg2 && job->arg3)
                lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
            else if (job->arg3)
                lazyfreeFreeSlotsMapFromBioThread(job->arg3);
        } else {
            serverPanic("Wrong job type in bioProcessBackgroundJobs().");
        }
        zfree(job);

        /* Lock again before reiterating the loop, if there are no longer
         * jobs to process we'll block again in pthread_cond_wait(). */
        pthread_mutex_lock(&bio_mutex[type]);
        listDelNode(bio_jobs[type],ln);
        bio_pending[type]--;

        /* Unblock threads blocked on bioWaitStepOfType() if any. */
        pthread_cond_broadcast(&bio_step_cond[type]);
    }
}

/* Return the number of pending jobs of the specified type. */
unsigned long long bioPendingJobsOfType(int type) {
    unsigned long long val;
    pthread_mutex_lock(&bio_mutex[type]);
    val = bio_pending[type];
    pthread_mutex_unlock(&bio_mutex[type]);
    return val;
}

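When the Redis main process starts, InitServerLast calls bioInit, which initializes and starts all the threads. Each thread runs bioProcessBackgroundJobs in the background, entering an infinite loop and waiting on a condition variable. When aof_background_fsync creates a job, bioCreateBackgroundJob appends a node to that type's job list and wakes the waiting thread with pthread_cond_signal. The actual fsync then runs on the bioProcessBackgroundJobs thread. The bio code is a textbook example of using a mutex together with condition variables in concurrent code.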
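The same mutex-plus-condition-variable pattern can be reduced to a small standalone job queue. This is a sketch under assumptions, not bio.c itself. struct job_queue, queue_start, queue_push, queue_pending and queue_drain are hypothetical names. There is a single job type and a single worker, and the handler is any function taking a file descriptor; bio would call redis_fsync here. As in bioProcessBackgroundJobs, the worker peeks at the head job and releases the mutex while running the slow call. It then relocks to remove the job, decrement the pending count and broadcast the step condition.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

struct job { int fd; struct job *next; };

/* Hypothetical single-type job queue in the style of bio.c. */
struct job_queue {
    pthread_mutex_t mu;       /* protects everything below (bio_mutex) */
    pthread_cond_t newjob;    /* signalled on enqueue (bio_newjob_cond) */
    pthread_cond_t step;      /* broadcast after each job (bio_step_cond) */
    struct job *head, *tail;  /* FIFO list (bio_jobs) */
    unsigned long pending;    /* queued + running jobs (bio_pending) */
    unsigned long done;       /* jobs finished, for demonstration */
    void (*handler)(int fd);  /* the slow call, e.g. an fsync; may be NULL */
};

static void *worker(void *arg) {
    struct job_queue *q = arg;
    pthread_mutex_lock(&q->mu);               /* loop always runs with the lock held */
    for (;;) {
        while (q->head == NULL)               /* loop also absorbs spurious wakeups */
            pthread_cond_wait(&q->newjob, &q->mu);
        struct job *j = q->head;              /* peek; remove after processing */
        pthread_mutex_unlock(&q->mu);         /* do the slow work without the lock */
        if (q->handler) q->handler(j->fd);
        pthread_mutex_lock(&q->mu);
        q->head = j->next;
        if (q->head == NULL) q->tail = NULL;
        free(j);
        q->pending--;
        q->done++;
        pthread_cond_broadcast(&q->step);     /* wake anyone waiting for progress */
    }
    return NULL;
}

int queue_start(struct job_queue *q, void (*handler)(int)) {
    pthread_t t;
    pthread_mutex_init(&q->mu, NULL);
    pthread_cond_init(&q->newjob, NULL);
    pthread_cond_init(&q->step, NULL);
    q->head = q->tail = NULL;
    q->pending = q->done = 0;
    q->handler = handler;
    if (pthread_create(&t, NULL, worker, q) != 0) return -1;
    return pthread_detach(t);
}

void queue_push(struct job_queue *q, int fd) {  /* like bioCreateBackgroundJob */
    struct job *j = malloc(sizeof *j);
    assert(j != NULL);
    j->fd = fd;
    j->next = NULL;
    pthread_mutex_lock(&q->mu);
    if (q->tail) q->tail->next = j; else q->head = j;
    q->tail = j;
    q->pending++;
    pthread_cond_signal(&q->newjob);
    pthread_mutex_unlock(&q->mu);
}

unsigned long queue_pending(struct job_queue *q) {  /* like bioPendingJobsOfType */
    pthread_mutex_lock(&q->mu);
    unsigned long n = q->pending;
    pthread_mutex_unlock(&q->mu);
    return n;
}

void queue_drain(struct job_queue *q) {  /* wait on the step condition until idle */
    pthread_mutex_lock(&q->mu);
    while (q->pending != 0)
        pthread_cond_wait(&q->step, &q->mu);
    pthread_mutex_unlock(&q->mu);
}
```

queue_pending plays the role of bioPendingJobsOfType, which is what aofFsyncInProgress checks before deciding whether to postpone a flush.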

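That covers how commands are written to the AOF buffer and flushed to disk in real time. Next, a brief look at the rewrite process.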

2.2. AOF Rewrite

An AOF rewrite is triggered from serverCron once the rewrite conditions are met. The work is done by rewriteAppendOnlyFileBackground in aof.c:

// aof.c
int rewriteAppendOnlyFileBackground(void) {
    pid_t childpid;
    long long start;

    if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
    // Create the pipes used for parent/child communication
    if (aofCreatePipes() != C_OK) return C_ERR;
    openChildInfoPipe();
    start = ustime();
    if ((childpid = fork()) == 0) {
        char tmpfile[256];

        /* Child */
        closeClildUnusedResourceAfterFork();
        redisSetProcTitle("redis-aof-rewrite");
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
        if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
            size_t private_dirty = zmalloc_get_private_dirty(-1);

            if (private_dirty) {
                serverLog(LL_NOTICE,
                    "AOF rewrite: %zu MB of memory used by copy-on-write",
                    private_dirty/(1024*1024));
            }

            server.child_info_data.cow_size = private_dirty;
            sendChildInfo(CHILD_INFO_TYPE_AOF);
            exitFromChild(0);
        } else {
            exitFromChild(1);
        }
    } else {
        /* Parent */
        server.stat_fork_time = ustime()-start;
        server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
        // ...
    }
    // ...
}
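The fork-then-rename pattern behind rewriteAppendOnlyFileBackground can be illustrated with a short POSIX sketch. rewrite_in_background and the temp-rewrite-%d.tmp file name are hypothetical. The child writes the given lines to a temp file named after its pid, fsyncs it and exits; the lines stand in for the dataset the child sees through copy-on-write. The parent renames the file over the target only if the child exited with status 0, and rename() replaces the target atomically. For simplicity the parent blocks in waitpid. Redis instead keeps serving requests and reaps the child from serverCron. Before renaming, it also appends the rewrite buffer that collected the writes made during the rewrite.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical sketch: write `lines` to a temp file in a forked child, then
 * atomically replace `target` in the parent. Returns 0 on success, -1 on error. */
int rewrite_in_background(const char *target, const char *const *lines, int n) {
    assert(target != NULL && (n == 0 || lines != NULL));
    pid_t pid = fork();
    if (pid < 0) return -1;
    if (pid == 0) {
        /* Child: sees a copy-on-write snapshot of the parent's memory. */
        char tmp[256];
        snprintf(tmp, sizeof tmp, "temp-rewrite-%d.tmp", (int)getpid());
        FILE *f = fopen(tmp, "w");
        if (!f) _exit(1);
        for (int i = 0; i < n; i++) fprintf(f, "%s\n", lines[i]);
        /* Flush stdio, push the data to disk, then close. */
        if (fflush(f) != 0 || fsync(fileno(f)) != 0 || fclose(f) != 0) _exit(1);
        _exit(0);
    }
    /* Parent: wait for the child (Redis polls from serverCron instead). */
    int status;
    if (waitpid(pid, &status, 0) != pid) return -1;
    if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) return -1;
    char tmp[256];
    snprintf(tmp, sizeof tmp, "temp-rewrite-%d.tmp", (int)pid);  /* name uses child pid */
    return rename(tmp, target) == 0 ? 0 : -1;
}
```

Naming the temp file after the child's pid lets the parent find it without any extra communication. Redis uses the same trick with temp-rewriteaof-bg-%d.aof.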
