一、為什麼要確定付費客戶特征? 先講個案例,以 Shopify 網站為例進行分析。該網站提供了許多功能,圍繞著潛在客戶在全生命周期中所需的業務需求,包括從創建業務開始、賺取收益等整個閉環鏈上所需的任何工具,如: 開始做生意:Business name generator 線上工具、功能變數名稱選擇頁面、Bu ...
ETCD-raft筆記
0. 引言
該篇博客基於etcd v3.5.7版本,首先會簡單介紹etcd/raft對Raft選舉部分的演算法優化,然後通過源碼分析etcd/raft的選舉實現。
1. etcd對於raft選舉演算法優化措施
該優化措施均在raft博士論文中有講解
etcd/raft實現的與選舉有關的優化有Pre-Vote、Check Quorum、和Leader Lease。在這三種優化中,只有Pre-Vote和Leader Lease最初是對選舉過程的優化,Check Quorum是為了更高效地實現線性一致性讀(Linearizable Read)而做出的優化,但是由於Leader Lease需要依賴Check Quorum,因此也放在這講。
1.1 Pre-Vote
如下圖所示,當Raft集群的網路發生分區時,會出現節點數達不到quorum(達成共識至少需要的節點數)的分區,如圖中的Partition 1。
在節點數能夠達到quorum的分區中,選舉流程會正常進行,該分區中的所有節點的term最終會穩定為新選舉出的leader節點的term。不幸的是,在節點數無法達到quorum的分區中,如果該分區中沒有leader節點,因為節點總是無法收到數量達到quorum的投票而不會選舉出新的leader,所以該分區中的節點在election timeout超時後,會增大term併發起下一輪選舉,這導致該分區中的節點的term會不斷增大。
如果網路一直沒有恢復,這是沒有問題的。但是,如果網路分區恢復,此時,達不到quorum的分區中的節點的term值會遠大於能夠達到quorum的分區中的節點的term,這會導致能夠達到quorum的分區的leader退位(step down)並增大自己的term到更大的term,使集群產生一輪不必要的選舉。
Pre-Vote機制就是為瞭解決這一問題而設計的,其解決的思路在於不允許達不到quorum的分區正常進入投票流程,也就避免了其term號的增大。為此,Pre-Vote引入了“預投票”,也就是說,當節點election timeout超時時,它們不會立即增大自身的term並請求投票,而是先發起一輪預投票。收到預投票請求的節點不會退位。只有當節點收到了達到quorum的預投票響應時,節點才能增大自身term號併發起投票請求。這樣,達不到quorum的分區中的節點永遠無法增大term,也就不會在分區恢復後引起不必要的一輪投票。
1.2 Check Quorum
在Raft演算法中,保證線性一致性讀取的最簡單的方式,就是講讀請求同樣當做一條Raft提議,通過與其它日誌相同的方式執行,因此這種方式也叫作Log Read。顯然,Log Read的性能很差。而在很多系統中,讀多寫少的負載是很常見的場景。因此,為了提高讀取的性能,就要試圖繞過日誌機制。
但是,直接繞過日誌機制從leader讀取,可能會讀到陳舊的數據,也就是說存在stale read的問題。在下圖的場景中,假設網路分區前,Node 5是整個集群的leader。在網路發生分區後,Partition 0分區中選舉出了新leader,也就是圖中的Node 1。
但是,由於網路分區,Node 5無法收到Partition 0中節點的消息,Node 5不會意識到集群中出現了新的leader。此時,雖然它不能成功地完成日誌提交,但是如果讀取時繞過了日誌,它還是能夠提供讀取服務的。這會導致連接到Node 5的client讀取到陳舊的數據。
Check Quorum可以減輕這一問題帶來的影響,其機制也非常簡單:讓leader每隔一段時間主動地檢查follower是否活躍。如果活躍的follower數量達不到quorum,那麼說明該leader可能是分區前的舊leader,所以此時該leader會主動退位轉為follower。
需要註意的是,Check Quorum並不能完全避免stale read的發生,只能減小其發生時間,降低影響。如果需要嚴格的線性一致性,需要通過其它機制實現。
1.3 Leader Lease
分散式系統中的網路環境十分複雜,有時可能出現網路不完全分區的情況,即整個整個網路拓補圖是一個連通圖,但是可能並非任意的兩個節點都能互相訪問。
這種現象不止會出現在網路故障中,還會出現在成員變更中。在通過ConfChange
移除節點時,不同節點應用該ConfChange
的時間可能不同,這也可能導致這一現象發生——TODO (舉個例子)。
在上圖的場景下,Node 1與Node 2之間無法通信。如果它們之間的通信中斷前,Node 1是集群的leader,在通信中斷後,Node 2無法再收到來自Node 1的心跳。因此,Node 2會開始選舉。如果在Node 2發起選舉前,Node 1和Node 3中都沒有新的日誌,那麼Node 2仍可以收到能達到quorum的投票(來自Node 2本身的投票和來自Node 3的投票),併成為leader。
Leader Lease機制對投票引入了一條新的約束以解決這一問題:當節點在election timeout超時前,如果收到了leader的消息,那麼它不會為其它發起投票或預投票請求的節點投票。也就是說,Leader Lease機制會阻止了正常工作的集群中的節點給其它節點投票。
Leader Lease需要依賴Check Quorum機制才能正常工作。接下來筆者通過一個例子說明其原因。
假如在一個5個節點組成的Raft集群中,出現了下圖中的分區情況:Node 1與Node 2互通,Node 3、Node 4、Node 5之間兩兩互通、Node 5與任一節點不通。在網路分區前,Node 1是集群的leader。
在既沒有Leader Lease也沒有Check Quorum的情況下,Node 3、Node 4會因收不到leader的心跳而發起投票,因為Node 2、Node 3、Node 4互通,該分區節點數能達到quorum,因此它們可以選舉出新的leader。
而在使用了Leader Lease而不使用Check Quorum的情況下,由於Node 2仍能夠收到原leader Node 1的心跳,受Leader Lease機制的約束,它不會為其它節點投票。這會導致即使整個集群中存在可用節點數達到quorum的分區,但是集群仍無法正常工作。
而如果同時使用了Leader Lease和Check Quorum,那麼在上圖的情況下,Node 1會在election timeout超時後因檢測不到數量達到quorum的活躍節點而退位為follower。這樣,Node 2、Node 3、Node 4之間的選舉可以正常進行。
1.4 引入的新問題與解決方案
引入Pre-Vote和Check Quorum(etcd/raft的實現中,開啟Check Quorum會自動開啟Leader Lease)會為Raft演算法引入一些新的問題。
當一個節點收到了term比自己低的消息時,原本的邏輯是直接忽略該消息,因為term比自己低的消息僅可能是因網路延遲的遲到的舊消息。然而,開啟了這些機制後,在如下的場景中會出現問題:
場景1: 如上圖所示,在開啟了Check Quorum / Leader Lease後(假設沒有開啟Pre-Vote,Pre-Vote的問題在下一場景中討論),數量達不到quorum的分區中的leader會退位,且該分區中的節點永遠都無法選舉出leader,因此該分區的節點的term會不斷增大。當該分區與整個集群的網路恢復後,由於開啟了Check Quorum / Leader Lease,即使該分區中的節點有更大的term,由於原分區的節點工作正常,它們的選舉請求會被丟棄。同時,由於該節點的term比原分區的leader節點的term大,因此它會丟棄原分區的leader的請求。這樣,該節點永遠都無法重新加入集群,也無法當選新leader。(詳見issue #5451、issue #5468)。
場景2: Pre-Vote機制也有類似的問題。如上圖所示,假如發起預投票的節點,在預投票通過後正要發起正式投票的請求時出現網路分區。此時,該節點的term會高於原集群的term。而原集群因沒有收到真正的投票請求,不會更新term,繼續正常運行。在網路分區恢復後,原集群的term低於分區節點的term,但是日誌比分區節點更新。此時,該節點發起的預投票請求因沒有日誌落後會被丟棄,而原集群leader發給該節點的請求會因term比該節點小而被丟棄。同樣,該節點永遠都無法重新加入集群,也無法當選新leader。(詳見issue #8501、issue #8525)。
場景3: 在更複雜的情況中,比如,在變更配置時,開啟了原本沒有開啟的Pre-Vote機制。此時可能會出現與上一條類似的情況,即可能因term更高但是log更舊的節點的存在導致整個集群的死鎖,所有節點都無法預投票成功。這種情況比上一種情況更危險,上一種情況只有之前分區的節點無法加入集群,在這種情況下,整個集群都會不可用。(詳見issue #8501、issue #8525)。
為瞭解決以上問題,節點在收到term比自己低的請求時,需要做特殊的處理。處理邏輯也很簡單:
- 如果收到了term比當前節點term低的leader的消息,且集群開啟了Check Quorum / Leader Lease或Pre-Vote,那麼發送一條term為當前term的消息,令term低的節點成為follower。(針對場景1、場景2)
- 對於term比當前節點term低的預投票請求,無論是否開啟了Check Quorum / Leader Lease或Pre-Vote,都要通過一條term為當前term的消息,迫使其轉為follower並更新term。(針對場景3)
2. etcd中Raft選舉的實現
2.1 發起vote或pre-vote流程
2.1.1 Election timeout
在集群剛啟動時,所有節點的狀態都為 follower
,等待超時觸發 leader election
。超時時間由 Config
設置。etcd/raft
沒有用真實時間而是使用邏輯時鐘,當調用 tick
的次數超過指定次數時觸發超時事件。 對於 follower
和 candidate
而言,tick
中會判斷是否超時,若超時則會本地生成一個 MsgHup
類型的消息觸發 leader election
:
// tickElection is run by followers and candidates after r.electionTimeout.
func (r *raft) tickElection() {
r.electionElapsed++
if r.promotable() && r.pastElectionTimeout() {
r.electionElapsed = 0
r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
}
}
2.1.2 MsgHup消息處理與hup方法
etcd/raft通過raft
結構體的Step
方法實現Raft狀態機的狀態轉移。Step
方法是消息處理的入口,不同 state
處理的消息不同且處理方式不同,所以有多個 step
方法:
raft.Step()
: 消息處理的入口,做一些共性的檢查,如term
,或處理所有狀態都需要處理的消息。若需要更進一步處理,會根據狀態 調用下麵的方法:raft.stepLeader()
:leader
狀態的消息處理方法;raft.stepFollower()
:follower
狀態的消息處理方法;raft.stepCandidate()
:candidate
狀態的消息處理方法。
func (r *raft) Step(m pb.Message) error {
// ... ...
switch m.Type {
case pb.MsgHup:
if r.preVote {
r.hup(campaignPreElection)
} else {
r.hup(campaignElection)
}
// ... ...
}
// ... ...
}
Step
方法在處理MsgHup
消息時,會根據當前配置中是否開啟了Pre-Vote
機制,以不同的CampaignType
調用hup
方法。CampaignType
是一種枚舉類型(go語言的枚舉實現方式),其可能值如下表所示。
值 | 描述 |
---|---|
campaignPreElection |
表示Pre-Vote的預選舉階段。 |
campaignElection |
表示正常的選舉階段(僅超時選舉,不包括Leader Transfer)。 |
campaignTransfer |
表示Leader Transfer階段。 |
接下來對hup
的實現進行分析。
func (r *raft) hup(t CampaignType) {
if r.state == StateLeader {
r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
return
}
if !r.promotable() {
r.logger.Warningf("%x is unpromotable and can not campaign", r.id)
return
}
ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
if err != nil {
r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
}
if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
return
}
r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
r.campaign(t)
}
// promotable indicates whether state machine can be promoted to leader,
// which is true when its own id is in progress list.
func (r *raft) promotable() bool {
pr := r.prs.Progress[r.id]
return pr != nil && !pr.IsLearner && !r.raftLog.hasPendingSnapshot()
}
總結當節點出現以下情況時不能發起選舉:
- 節點被移出集群
- 節點是learner
- 節點還有未保存到穩定存儲的snapshot
- 節點有還未被應用的集群配置變更
ConfChange
消息
2.1.3 campaign
官方註釋很詳細了,因此不多廢筆墨解釋
// campaign transitions the raft instance to candidate state. This must only be
// called after verifying that this is a legitimate transition.
func (r *raft) campaign(t CampaignType) {
// 因為調用campaign的方法不止有hup,campaign方法首先還是會檢查promotable()是否為真。
if !r.promotable() {
// This path should not be hit (callers are supposed to check), but
// better safe than sorry.
r.logger.Warningf("%x is unpromotable; campaign() should have been called", r.id)
}
var term uint64
var voteMsg pb.MessageType
if t == campaignPreElection {
r.becomePreCandidate()
voteMsg = pb.MsgPreVote
// PreVote RPCs are sent for the next term before we've incremented r.Term.
term = r.Term + 1
} else {
r.becomeCandidate()
voteMsg = pb.MsgVote
term = r.Term
}
if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
// We won the election after voting for ourselves (which must mean that
// this is a single-node cluster). Advance to the next state.
if t == campaignPreElection {
r.campaign(campaignElection)
} else {
r.becomeLeader()
}
return
}
var ids []uint64
{
//won't send requestVote to learners, beacause learners[] are not in incoming[] and outgoing[]
idMap := r.prs.Voters.IDs()
ids = make([]uint64, 0, len(idMap))
for id := range idMap {
ids = append(ids, id)
}
sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
}
for _, id := range ids {
if id == r.id {
continue
}
r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)
var ctx []byte
if t == campaignTransfer {
ctx = []byte(t)
}
r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
}
}
至此,該節點已向其他節點發送MsgVote或MsgPreVote消息
2.2 節點收到vote或pre-vote消息處理流程
處理vote或pre-vote消息都在Step
方法內,不會進入各自的step方法,有效的MsgPreVote
必須滿足其中一個條件(m.Term > r.Term)
官方註釋很詳細,簡單易理解,因此不多廢筆墨解釋
func (r *raft) Step(m pb.Message) error {
// Handle the message term, which may result in our stepping down to a follower.
switch {
case m.Term == 0:
// local message
case m.Term > r.Term:
if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
force := bytes.Equal(m.Context, []byte(campaignTransfer))
inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
if !force && inLease {
// If a server receives a RequestVote request within the minimum election timeout
// of hearing from a current leader, it does not update its term or grant its vote
return nil
}
}
switch {
case m.Type == pb.MsgPreVote:
// Never change our term in response to a PreVote
case m.Type == pb.MsgPreVoteResp && !m.Reject:
// We send pre-vote requests with a term in our future. If the
// pre-vote is granted, we will increment our term when we get a
// quorum. If it is not, the term comes from the node that
// rejected our vote so we should become a follower at the new
// term.
default:
if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
r.becomeFollower(m.Term, m.From)
} else {
r.becomeFollower(m.Term, None)
}
}
case m.Term < r.Term:
// ........
}
switch m.Type {
case pb.MsgHup:
// ........
case pb.MsgVote, pb.MsgPreVote:
// We can vote if this is a repeat of a vote we've already cast...
canVote := r.Vote == m.From ||
// ...we haven't voted and we don't think there's a leader yet in this term...
(r.Vote == None && r.lead == None) ||
// ...or this is a PreVote for a future term...
(m.Type == pb.MsgPreVote && m.Term > r.Term)
// ...and we believe the candidate is up to date.
if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
// Note: it turns out that that learners must be allowed to cast votes.
// This seems counter- intuitive but is necessary in the situation in which
// a learner has been promoted (i.e. is now a voter) but has not learned
// about this yet.
// For example, consider a group in which id=1 is a learner and id=2 and
// id=3 are voters. A configuration change promoting 1 can be committed on
// the quorum `{2,3}` without the config change being appended to the
// learner's log. If the leader (say 2) fails, there are de facto two
// voters remaining. Only 3 can win an election (due to its log containing
// all committed entries), but to do so it will need 1 to vote. But 1
// considers itself a learner and will continue to do so until 3 has
// stepped up as leader, replicates the conf change to 1, and 1 applies it.
// Ultimately, by receiving a request to vote, the learner realizes that
// the candidate believes it to be a voter, and that it should act
// accordingly. The candidate's config may be stale, too; but in that case
// it won't win the election, at least in the absence of the bug discussed
// in:
// https://github.com/etcd-io/etcd/issues/7625#issuecomment-488798263.
// When responding to Msg{Pre,}Vote messages we include the term
// from the message, not the local term. To see why, consider the
// case where a single node was previously partitioned away and
// it's local term is now out of date. If we include the local term
// (recall that for pre-votes we don't update the local term), the
// (pre-)campaigning node on the other end will proceed to ignore
// the message (it ignores all out of date messages).
// The term in the original message and current local term are the
// same in the case of regular votes, but different for pre-votes.
r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
if m.Type == pb.MsgVote {
// Only record real votes.
r.electionElapsed = 0
r.Vote = m.From
}
} else {
r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
}
default:
// ...........
}
return nil
}
註意:節點同意投票消息帶的是m.Term
,拒絕投票消息是r.Term
,如果拒接MsgPreVote
消息,那麼發送pre-vote消息的節點就變為
在r.Term
的follower
,在2.3.1節內體現
2.3 節點收到處理MsgPreVoteResp或MsgVoteResp消息流程
2.3.1 Step內處理
根據2.2節可以看到Step
內有這樣一段代碼:在2.2節最後有解釋,官方也給了詳細註釋
switch {
case m.Type == pb.MsgPreVote:
// Never change our term in response to a PreVote
case m.Type == pb.MsgPreVoteResp && !m.Reject:
// We send pre-vote requests with a term in our future. If the
// pre-vote is granted, we will increment our term when we get a
// quorum. If it is not, the term comes from the node that
// rejected our vote so we should become a follower at the new
// term.
default:
if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
r.becomeFollower(m.Term, m.From)
} else {
r.becomeFollower(m.Term, None)
}
}
2.3.2 stepCandidate內處理
case myVoteRespType:
gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
switch res {
case quorum.VoteWon:
if r.state == StatePreCandidate {
r.campaign(campaignElection)
} else {
r.becomeLeader()
r.bcastAppend()
}
case quorum.VoteLost:
// pb.MsgPreVoteResp contains future term of pre-candidate
// m.Term > r.Term; reuse r.Term
r.becomeFollower(r.Term, None)
}
如果預投票成功,則發起新一輪正式投票。如果正式投票成功,則轉為leader,接著後續操作
2.4 轉變領導者身份
2.4.1 becomeLeader()
func (r *raft) becomeLeader() {
// TODO(xiangli) remove the panic when the raft implementation is stable
if r.state == StateFollower {
panic("invalid transition [follower -> leader]")
}
r.step = stepLeader
r.reset(r.Term)
r.tick = r.tickHeartbeat
r.lead = r.id
r.state = StateLeader
// Followers enter replicate mode when they've been successfully probed
// (perhaps after having received a snapshot as a result). The leader is
// trivially in this state. Note that r.reset() has initialized this
// progress with the last index already.
r.prs.Progress[r.id].BecomeReplicate()
// Conservatively set the pendingConfIndex to the last index in the
// log. There may or may not be a pending config change, but it's
// safe to delay any future proposals until we commit all our
// pending log entries, and scanning the entire tail of the log
// could be expensive.
r.pendingConfIndex = r.raftLog.lastIndex()
emptyEnt := pb.Entry{Data: nil}
if !r.appendEntry(emptyEnt) {
// This won't happen because we just called reset() above.
r.logger.Panic("empty entry was dropped")
}
// As a special case, don't count the initial empty entry towards the
// uncommitted log quota. This is because we want to preserve the
// behavior of allowing one entry larger than quota if the current
// usage is zero.
r.reduceUncommittedSize([]pb.Entry{emptyEnt})
r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}
從candidate
轉變為leader
,需要在自己的log中append一條當前term的日誌,並廣播給其他節點