概述 lab2中實現了raft協議,本lab將在raft之上實現一個可容錯的k/v存儲服務,第一部分是實現一個不帶日誌壓縮的版本,第二部分是實現日誌壓縮。時間原因我只完成了第一部分。 設計思路 ![kvserver](https://blog 1253119293.cos.ap beijing.my ...
概述
lab2中實現了raft協議,本lab將在raft之上實現一個可容錯的k/v存儲服務,第一部分是實現一個不帶日誌壓縮的版本,第二部分是實現日誌壓縮。時間原因我只完成了第一部分。
設計思路
如上圖,lab2實現了raft協議,本lab將實現kvserver。每個raft都關聯一個kvserver,Clerks發送Put(), Append(), Get() RPC給leader伺服器中的kvserver,kvserver收到請求後將操作打包成Log Entry提交給raft,然後阻塞等待raft將這個Entry拷貝到其它server,當Log Entry被拷貝到大部分的server後,leader 的raft會通知kvserver(raft往管道中塞comitted Entry,kvserver通過讀這個管道獲取通知),kvserver執行命令,然後響應Clerk。
Clerk
客戶端通過Clerk發送請求,來看下Clerk代碼:
type Clerk struct {
servers []*labrpc.ClientEnd
// You will have to modify this struct.
lastLeader int
cid int64
seq int
}
func (ck *Clerk) Get(key string) string {
// You will have to modify this function.
// 參數: 要讀的key, 當前clerk的id, 請求序列號
getArgs := GetArgs{Key: key, Cid:ck.cid, Seq:ck.seq}
reply := GetReply{}
for {
doneCh := make(chan bool, 1)
go func() {
//發送Get() RPC
ok := ck.servers[ck.lastLeader].Call("KVServer.Get", &getArgs, &reply)
doneCh <- ok
}()
select {
case <-time.After(600 * time.Millisecond):
DPrintf("clerk(%d) retry PutAppend after timeout\n", ck.cid)
continue
case ok := <- doneCh:
//收到響應後,並且是leader返回的,那麼說明這個命令已經執行了
if ok && reply.WrongLeader != WrongLeader {
//請求序列號加1
ck.seq++
return reply.Value
}
}
//換一個server重試
ck.lastLeader++
ck.lastLeader %= len(ck.servers)
}
return ""
}
這裡只給出了Get()的代碼,Put()和Append()類似,發送KVServer.Get給一個server,如果這個server不是leader,換一個server重試。直到發給真正的leader,並且leader將這個命令拷貝到大部分其它server後,然後成功執行該命令,Clerk.Get()才會返回。
KVServer
再來看下服務端的代碼,KVServer處理Clerk的RPC請求:
type KVServer struct {
mu sync.Mutex
me int
rf *raft.Raft
applyCh chan raft.ApplyMsg
maxraftstate int // snapshot if log grows this big
// Your definitions here.
// 保存鍵值對
db map[string]string
latestReplies map[int64]*LatestReply
notify map[int]chan struct{}
}
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
// Your code here.
if _, isLeader := kv.rf.GetState(); !isLeader {
reply.WrongLeader = WrongLeader
reply.Err = ""
return
}
// 防止重覆請求
kv.mu.Lock()
if latestReply, ok := kv.latestReplies[args.Cid]; ok && args.Seq <= latestReply.Seq {
reply.WrongLeader = IsLeader
reply.Value = latestReply.Reply.Value
reply.Err = latestReply.Reply.Err
kv.mu.Unlock()
return
}
kv.mu.Unlock()
command := Op{Operation:"Get", Key:args.Key, Cid:args.Cid, Seq:args.Seq}
index, term, _ := kv.rf.Start(command)
// 阻塞等待結果
kv.mu.Lock()
ch := make(chan struct{})
kv.notify[index] = ch
kv.mu.Unlock()
select {
case <-ch:
curTerm, isLeader := kv.rf.GetState()
DPrintf("%v got notify at index %v, isLeader = %v\n", kv.me, index, isLeader)
if !isLeader || curTerm != term {
reply.WrongLeader = WrongLeader
reply.Err = ""
} else {
reply.WrongLeader = IsLeader
kv.mu.Lock()
if value, ok := kv.db[args.Key]; ok {
reply.Value = value
reply.Err = OK
} else {
reply.Err = ErrNoKey
}
kv.mu.Unlock()
}
}
}
KVServer.db用於保存鍵值對。
KVServer.Get()首先判斷自己是不是leader,如果不是leader,直接返回,這樣Clerk好重試其它server。如果是leader,先在緩存中找,看這個請求是否已經執行過了。
因為可能出現這麼一種情況:如果leader commit一個Entry後立即奔潰了,那麼Clerk就收不到響應,那麼Clerk會將這個請求發給新的leader,新的leader收到請求後如果不做任何措施,將會二次commit該Log Entry,對於Put()和Append()請求執行兩次是不正確的,所以需要一個辦法防止一個請求執行兩次。
可以這麼做:每個Clerk都分配一個唯一的cid,每個請求分配一個唯一的序列號seq,每成功一個請求,該序列號加一。服務端記錄每個客戶端cid最近一次apply的請求的序列號seq和對應的響應結果,根據這個信息可知,當再次收到這個客戶端的序列號小於seq的請求時,說明已經執行過了,直接返回結果。
如果之前沒有執行過,那麼調用
kv.rf.Start(command)
將Log Entry提交給raft,並且阻塞等待raft將這個Entry拷貝到其它大部分server,從阻塞返回後,說明這個Entry已經被拷貝到大部分server了,並且已經執行了命令,這時可以將結果返回給Clerk了。
那麼在哪裡接收raft的消息呢?KVServer在創建的時候會在一個線程中執行如下函數:
func (kv *KVServer) applyDaemon() {
for appliedEntry := range kv.applyCh {
command := appliedEntry.Command.(Op)
// 執行命令, 過濾已經執行過得命令
kv.mu.Lock()
if latestReply, ok := kv.latestReplies[command.Cid]; !ok || command.Seq > latestReply.Seq {
switch command.Operation {
case "Get":
latestReply := LatestReply{Seq:command.Seq,}
reply := GetReply{}
if value, ok := kv.db[command.Key]; ok {
reply.Value = value
} else {
reply.Err = ErrNoKey
}
latestReply.Reply = reply
kv.latestReplies[command.Cid] = &latestReply
case "Put":
kv.db[command.Key] = command.Value
latestReply := LatestReply{Seq:command.Seq}
kv.latestReplies[command.Cid] = &latestReply
case "Append":
kv.db[command.Key] += command.Value
latestReply := LatestReply{Seq:command.Seq}
kv.latestReplies[command.Cid] = &latestReply
default:
panic("invalid command operation")
}
}
DPrintf("%d applied index:%d, cmd:%v\n", kv.me, appliedEntry.CommandIndex, command)
// 通知
if ch, ok := kv.notify[appliedEntry.CommandIndex]; ok && ch != nil {
DPrintf("%d notify index %d\n",kv.me, appliedEntry.CommandIndex)
close(ch)
delete(kv.notify, appliedEntry.CommandIndex)
}
kv.mu.Unlock()
}
}
kv.applyCh這個chanel會在創建raft的時候傳給raft,當某個Log Entry可以被commit的時候,raft會往這個chanel中塞,只要for迴圈這個kv.applyCh,就能知道已經被commit的Entry,拿到Entry後,根據其中的命令執行相應的操作,然後通知KVServer.Get()繼續執行。
具體代碼在:https://github.com/gatsbyd/mit_6.824_2018
如有錯誤,歡迎指正:
15313676365