接著上一篇,這篇文章分析一下redis事務操作中multi,exec,discard三個核心命令。 原文地址:http://www.jianshu.com/p/e22615586595 看本篇文章前需要先對上面文章有所瞭解: "redis源碼分析之事務Transaction(上)" 一、redis事 ...
接著上一篇,這篇文章分析一下redis事務操作中multi,exec,discard三個核心命令。
原文地址:http://www.jianshu.com/p/e22615586595
看本篇文章前需要先對上面文章有所瞭解:
redis源碼分析之事務Transaction(上)
一、redis事務核心命令簡介
redis事務操作核心命令:
//用於開啟事務
{"multi",multiCommand,1,"sF",0,NULL,0,0,0,0,0},
//用來執行事務中的命令
{"exec",execCommand,1,"sM",0,NULL,0,0,0,0,0},
//用來取消事務
{"discard",discardCommand,1,"sF",0,NULL,0,0,0,0,0},
在redis中,事務並不具有ACID的概念,換句話說,redis中的事務僅僅是保證一系列的命令按順序一個一個執行,如果中間失敗了,並不會進行回滾操作。
使用redis事務舉例如下:
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set a a
QUEUED
127.0.0.1:6379> set b b
QUEUED
127.0.0.1:6379> set c c
QUEUED
127.0.0.1:6379> exec
1) OK
2) OK
3) OK
127.0.0.1:6379>
二、redis事務核心命令源碼分析
關於事務的幾個命令所對應的函數都放在multi.c文件中。
首先來看一下multi命令,該命令用於標記客戶端開啟事務狀態,因此它做的就是修改客戶端狀態,代碼很簡單,如下:
void multiCommand(client *c) {
//如果客戶端已經是事務模式,則返回錯誤提示信息
if (c->flags & CLIENT_MULTI) {
addReplyError(c,"MULTI calls can not be nested");
return;
}
//設置客戶端為事務模式
c->flags |= CLIENT_MULTI;
//返回結果
addReply(c,shared.ok);
}
接下來看下redis處理命令邏輯中的一段源碼:
這段代碼在server.c文件中的processCommand方法中:
//如果客戶端處於事務狀態且當前執行的命令不是exec,discard,multi跟watch命令中的一個
//則把當前命令加入一個隊列
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
//加入隊列
queueMultiCommand(c);
//返回結果
addReply(c,shared.queued);
} else {
//執行當前命令
call(c,CMD_CALL_FULL);
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
handleClientsBlockedOnLists();
}
看入隊操作源碼前,先來熟悉幾個數據結構,redis會把每個連接的客戶端封裝成一個client對象,該對象中含有大量欄位用來保存需要的信息,發佈訂閱功能也使用對應的欄位進行存儲,事務當然也不例外,如下:
//每個客戶端對象中有一個mstate欄位用來保存事務上下文
typedef struct client {
multiState mstate;
}
//事務包裝類型
typedef struct multiState {
//當前事務中需要執行的命令數組
multiCmd *commands;
//需要執行的命令數量
int count;
//需要同步複製的最小數量
int minreplicas;
//同步複製超時時間
time_t minreplicas_timeout;
} multiState;
//事務中執行命令的封裝類型
typedef struct multiCmd {
//參數
robj **argv;
//參數數量
int argc;
//命令本身
struct redisCommand *cmd;
} multiCmd;
瞭解了基本的數據結構以後,再來看下入隊操作:
void queueMultiCommand(client *c) {
//類型前面有說明
multiCmd *mc;
int j;
//擴容,每次擴容一個命令的大小
c->mstate.commands = zrealloc(c->mstate.commands,
sizeof(multiCmd)*(c->mstate.count+1));
//c++中給數組最後一個元素賦值語法實在是有點難懂...
mc = c->mstate.commands+c->mstate.count;
//初始化mc各個欄位
mc->cmd = c->cmd;
mc->argc = c->argc;
mc->argv = zmalloc(sizeof(robj*)*c->argc);
//把參數一個一個拷貝過來
memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
for (j = 0; j < c->argc; j++)
incrRefCount(mc->argv[j]);
c->mstate.count++;
}
上面是把命令加入事務命令數組的中的邏輯,由於在執行事務過程中也會執行刪除事務的操作,因此在看執行事務邏輯之前我們先看下刪除事務的實現原理。
當事務執行完成,執行錯誤或者客戶端想取消當前事務,都會跟discard命令有聯繫,一起看下源碼:
void discardCommand(client *c) {
//如果當前客戶端沒有處於事務狀態,則返回錯誤信息
if (!(c->flags & CLIENT_MULTI)) {
addReplyError(c,"DISCARD without MULTI");
return;
}
//刪除事務
discardTransaction(c);
//返回結果
addReply(c,shared.ok);
}
//具體的刪除邏輯
void discardTransaction(client *c) {
//釋放客戶端事務資源
freeClientMultiState(c);
//初始化客戶端事務資源
initClientMultiState(c);
//狀態位還原
c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC);
//取消已watch的key,該函數上面文章中已經進行過分析,不贅述
unwatchAllKeys(c);
}
//釋放事務隊列中的每個命令
void freeClientMultiState(client *c) {
int j;
for (j = 0; j < c->mstate.count; j++) {
int i;
multiCmd *mc = c->mstate.commands+j;
//挨個釋放命令的參數
for (i = 0; i < mc->argc; i++)
decrRefCount(mc->argv[i]);
zfree(mc->argv);
}
//最後釋放命令本身
zfree(c->mstate.commands);
}
//事務相關欄位設為初始值
void initClientMultiState(client *c) {
c->mstate.commands = NULL;
c->mstate.count = 0;
}
到這裡,我們已經瞭解了開啟事務模式,把各個命令加入到事務命令執行數組中以及取消事務三個模塊的執行原理,最後一起看下事務的執行過程,代碼較長,需要慢慢看。
把一系列命令加入到事務命令數組中以後,客戶端執行exec命令就可以把其中的所有命令挨個執行完成了,分析exec命令源碼之前,我們應該可以想到redis的邏輯應該就是從客戶端的事務命令數組中取出所有命令一個一個執行,源碼如下:
void execCommand(client *c) {
int j;
robj **orig_argv;
int orig_argc;
struct redisCommand *orig_cmd;
//標記是否需要把MULTI/EXEC傳遞到AOF或者slaves節點
int must_propagate = 0;
//標記當前redis節點是否為主節點
int was_master = server.masterhost == NULL;
//如果客戶端沒有處於事務狀態,則返回錯誤提示信息
if (!(c->flags & CLIENT_MULTI)) {
addReplyError(c,"EXEC without MULTI");
return;
}
//首先對兩個需要終止當前事務的條件進行判斷
//1.當有WATCH的key被修改時則終止,返回一個nullmultibulk對象
//2.當之前有命令加入事務命令數組出錯則終止,例如傳入的命令參數數量不對,會返回execaborterr
if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) {
addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr :
shared.nullmultibulk);
//刪除當前事務信息,前面已經分析過,不贅述
discardTransaction(c);
goto handle_monitor;
}
//把watch的key都刪除,上面文章已經分析過,不贅述
unwatchAllKeys(c);
//保存當前命令上下文
orig_argv = c->argv;
orig_argc = c->argc;
orig_cmd = c->cmd;
addReplyMultiBulkLen(c,c->mstate.count);
//遍歷事務命令數組
for (j = 0; j < c->mstate.count; j++) {
//把事務隊列中的命令參數取出賦值給client,因為命令是在client維度執行的
c->argc = c->mstate.commands[j].argc;
c->argv = c->mstate.commands[j].argv;
c->cmd = c->mstate.commands[j].cmd;
//同步事務操作到AOF或者集群中的從節點
if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) {
execCommandPropagateMulti(c);
must_propagate = 1;
}
//執行具體命令
call(c,CMD_CALL_FULL);
//由於命令可以修改參數的值或者數量,因此重新保存命令上下文
c->mstate.commands[j].argc = c->argc;
c->mstate.commands[j].argv = c->argv;
c->mstate.commands[j].cmd = c->cmd;
}
//恢複原始命令上下文
c->argv = orig_argv;
c->argc = orig_argc;
c->cmd = orig_cmd;
//事務執行完成,刪除該事務,前面已經分析過,不贅述
discardTransaction(c);
//確保EXEC會進行傳遞
if (must_propagate) {
int is_master = server.masterhost == NULL;
server.dirty++;
if (server.repl_backlog && was_master && !is_master) {
char *execcmd = "*1\r\n$4\r\nEXEC\r\n";
feedReplicationBacklog(execcmd,strlen(execcmd));
}
}
//monitor命令操作
handle_monitor:
if (listLength(server.monitors) && !server.loading)
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}
上面就是事務命令執行的整個邏輯,可以先排除集群跟AOF的同步邏輯,專註理解核心邏輯,代碼整體邏輯算是比較清晰的,搞明白了前面的幾個模塊以後,再看執行邏輯就不會太難。
三、redis事務命令總結
通過上、下兩篇文章對redis事務各個命令進行了分析,仔細閱讀應該可以瞭解整個事務執行框架,如果有任何問題或者疑惑,歡迎留言評論。