func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) |
初次啟動raftNode時調用WAL.Create方法。創建WAL對象用於記錄追加 :判斷是否存在dirpath路徑,如果已存在則不是初次啟動raftNode,返回os.ErrExist。創建臨時目錄和初始上鎖的wal文件—walName(seq=0 & index=0),seek到文件末尾(why?),預分配該wal文件大小(SegmentSizeBytes=64MB,優化追加速度),創建WAL對象並設定路徑、 metadata(NodeID和ClusterID)、編碼器,將上鎖的WAL文件追加到鎖表內,然後依次寫入crc、metadata和空snapshot,重命名臨時目錄,同步臨時目錄的父目錄(fsync)使得重命名持久化。 |
func (w *WAL) renameWAL(tmpdirpath string) (*WAL, error) |
移除w.dir目錄及目錄下所有文件和文件夾,調用os.Rename(tmpdirpath, w.dir)將Create方法內創建的臨時目錄重命名,創建FilePipeline和dirFile *os.File,dirFile is a fd for the wal directory for syncing on Rename |
func (w *WAL) SaveSnapshot(e walpb.Snapshot) error |
檢驗snapshot是否合法(Since etcd>=3.5.0),pb序列化snapshot得到data欄位,加鎖,調用w.encoder.encode方法寫入record,更新w.enti如果snapshot index > 原w.enti,解鎖。 |
func (w *WAL) saveCrc(prevCrc uint32) error |
寫入crcType的記錄 |
func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error |
加鎖,如果hardstate和entries為空,返回。判斷是否需要sync(entries長度不為0||vote改變||term改變),寫入entries和hardstate,判斷文件當前位置是否小於SegmentSizeBytes(預設64M),如果小於判斷是否需要sync數據,如果不小於,返回cut操作結果,解鎖。 |
func (w *WAL) saveEntry(e *raftpb.Entry) error |
寫入一條entry記錄並更新WAL對象的enti值 |
func (w *WAL) saveState(s *raftpb.HardState) error |
判斷是否為空,不為空寫入HardState並更新WAL對象的state值 |
func (w *WAL) cut() error |
關閉當前文件並創建一個新的文件用於追加記錄:首先移動到鎖表最後一個wal文件的當前位置截斷文件然後執行sync,調用FilePipeline對象的Open方法創建一個新文件並加入鎖表,首先保存舊的encoder的crc,然後創建新的encoder對象替換舊的encoder對象,保存頭信息crc、metadata和hardstate,原子重命名文件之前先執行sync和保存當前位置偏移,重命名後對WAL對象的dirFile執行fsync持久化wal目錄的變化。關閉文件重新以LockFile方式打開文件並seek到文件末尾,替換鎖表尾文件,再次進行新舊encoder替換。 |
func (w *WAL) tail() *fileutil.LockedFile |
WAL對象的鎖表如果不為空,返回最後一個上鎖文件,否則返回空。 |
func (w *WAL) sync() error |
如果encoder存在,則將encoder pageWriter緩衝區的數據寫入,鎖表尾文件執行fdatasync,將fdatasync延時上報監控prometheus。 |
func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, error) |
尋找所有wal文件中snapshot條目,有效的snapshot條目index必須小於等於最新的hardstate。步驟:找到目錄下所有帶有合法名稱的wal文件名,以只讀模式打開這些wal文件,根據以讀模式打開的這些wal文件創建decoder,迴圈解碼每個文件的record:若為snapshotType,追加到snaps中;若為stateType,更新hardstate;若為crcType(wal文件開頭),驗證是否和decoder.crc相同(上一個文件末尾的crc)。返回所有index小於最新hardstate.Commit的walpb.snap條目。 |
func readWALNames(lg *zap.Logger, dirpath string) ([]string, error) |
從指定目錄讀取所有wal文件name,並檢查name合法性(.wal結尾) |
func openWALFiles(lg *zap.Logger, dirpath string, names []string, nameIndex int, write bool) ([]fileutil.FileReader, []*fileutil.LockedFile, func() error, error) |
根據write標誌選擇以讀模式還是寫模式打開文件,步驟:從nameIndex指定的索引開始打開文件。若寫模式:打開上鎖的wal文件,將該文件添加到鎖表、文件關閉表、讀文件表;若讀模式:以os.O_RDONLY打開文件,將該文件添加到文件關閉表、讀文件表,添加nil到鎖表(鎖表只在寫模式下用到)。 |
func Open(lg zap.Logger, dirpath string, snap walpb.Snapshot) (WAL, error) |
寫模式調用openAtIndex。Open opens the WAL at the given snap,The returned WAL is ready to read and the first record will be the one after the given snap. The WAL cannot be appended to before reading out all of its previous records. |
func OpenForRead(lg zap.Logger, dirpath string, snap walpb.Snapshot) (WAL, error) |
讀模式調用openAtIndex。 |
func openAtIndex(lg zap.Logger, dirpath string, snap walpb.Snapshot, write bool) (WAL, error) |
遍歷wal目錄,找到snap所在位置index,依次打開index後續所有的wal文件並加鎖(調用openWALFiles),創建WAL對象並設定解碼器、readClose(調用closeAll關閉已打開文件)和鎖表,若寫模式:readClose置空(寫模式下還要繼續對wal文件進行append操作,等到讀完後不用進行關閉操作),測試鎖表最後一個上鎖文件是否是合法wal文件(通過是否符合命名規範判斷),如果不合法,關閉所有文件返回錯誤,否則,設定FilePipeline對象(大小超過64M時用於截斷並切換到新文件),返回WAL對象 。 |
func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.Entry, err error) |
讀取WAL對象的所有記錄 ,對於不同類型記錄做不同處理: 判斷鎖表尾文件如為空(讀模式):如果不是讀到EOF或ErrUnexpectedEOF則重置state返回;對於寫模式,如果err不是EOF,重置狀態返回,然後鎖表尾文件Seek到lastOffset位置, 將後續內容清零(目的是處理遇到0記錄後接非0記錄時,非0記錄又沒有被全部重寫,再次打開的時候會出現 CRC錯誤,由於數據從不會一開始就完全同步到磁碟,因此進行清零操作是安全的 ?暫時沒懂),然後判斷snapshot是否匹配,關閉decoder實現禁讀,重置WAL對象的start為一個空snapshot對象,創建encoder並將decoder設空,返回metadata,state,ents和err。 |