已經有兩個月沒有寫博客了,也有好幾個月沒有看go相關的內容了,由於工作原因最近在做java以及大數據相關的內容,導致最近工作較忙,博客停止了更新,正好想撿起之前go的東西,所以找了一個源碼學習 這個也是之前用go寫日誌收集的時候用到的一個包 :github.com/hpcloud/tail, 這次就 ...
已經有兩個月沒有寫博客了,也有好幾個月沒有看go相關的內容了,由於工作原因最近在做java以及大數據相關的內容,導致最近工作較忙,博客停止了更新,正好想撿起之前go的東西,所以找了一個源碼學習
這個也是之前用go寫日誌收集的時候用到的一個包 :github.com/hpcloud/tail, 這次就學習一下人家的源碼,為了方便看這個代碼,我將這個包進行了簡化,也是用於方便理解,代碼放到了:https://github.com/pythonsite/tail, 這個代碼包可能無法正常用,只是為了方面理解tail這個包,以及學習人家的代碼
精簡後的代碼目錄
│ tail.go │ └─watch filechanges.go inotify.go inotify_tracker.go watch.go
tail.go: 這裡包含著tail包的核心代碼,主要的邏輯處理時在這個裡面
watch: 這個包主要用於對文件的監控,用於將文件的變化通知到tail.如:文件修改了,文件刪除了,文件內容追加了
tail.go 代碼分析
在tail.go中主要有幾下幾個結構體:
// Line 結構體用於存讀每行的時候的對象 type Line struct { Text string //當前行的內容 Time time.Time // 時間 Err error // Error from tail } type SeekInfo struct { Offset int64 Whence int } // 關於配置的結構體 type Config struct { Location *SeekInfo ReOpen bool MustExist bool // 要打開的文件是否必須存在 Poll bool Pipe bool Follow bool // 是否繼續讀取新的一行,可以理解為tail -f 命令 } // 核心的結構體Tail type Tail struct { Filename string // 要打開的文件名 Lines chan *Line // 用於存每行內容的Line結構體 Config watcher watch.FileWatcher changes *watch.FileChanges tomb.Tomb file *os.File reader *bufio.Reader lk sync.Mutex }Line 結構體用於存讀取文件的每行內容 Tail 是核心的結構體,我們使用tail這個包的時候其實就是會先調用初始化這個struct的方法TailFile,如我在寫日誌收集的時候的使用:
tail,err := tail.TailFile(conf.LogPath,tail.Config{ ReOpen:true, Follow:true, Location:&tail.SeekInfo{Offset:0,Whence:2}, MustExist:false, Poll:true, })
既然我們使用的時候就會在最開始的時候調用tail.TailFile方法,就直接看這個方法:
// 主要用於Tail結構體的初始化 func TailFile(filename string, config Config) (*Tail, error) { t := &Tail { Filename: filename, Lines: make(chan *Line), Config: config, } t.watcher = watch.NewInotifyFileWatcher(filename) if t.MustExist { var err error t.file, err = OpenFile(t.Filename) if err != nil { return nil, err } } go t.tailFileSync() return t, nil }
從這個代碼里我們就可以看到它首先初始化了Tail結構體並且對Tail中的watcher進行的複製,先暫時不看watch相關的內容
然後就是關於文件是否必須存在的判斷處理,最後開啟了一個一個線程執行tailFileSync()方法,我們接著看tailFileSync方法
func (tail *Tail) tailFileSync(){ defer tail.Done() defer tail.close() if !tail.MustExist { err := tail.reopen() if err != nil { if err != tomb.ErrDying { tail.Kill(err) } return } } tail.openReader() var offset int64 var err error // 一行行讀文件內容 for { if !tail.Pipe { offset,err = tail.Tell() if err != nil { tail.Kill(err) return } } line, err := tail.readLine() if err == nil { // 將讀取的一行內容放到chan中 tail.sendLine(line) } else if err == io.EOF { // 表示讀到文件的最後了 // 如果Follow 設置為false的話就不會繼續讀文件 if !tail.Follow { if line != "" { tail.sendLine(line) } return } // 如果Follow設置為True則會繼續讀 if tail.Follow && line != "" { err := tail.seekTo(SeekInfo{Offset: offset, Whence: 0}) if err != nil { tail.Kill(err) return } } // 如果讀到文件最後,文件並沒有新的內容增加 err := tail.waitForChanges() if err != nil { if err != ErrStop { tail.Kill(err) } return } } else { // 既不是文件結尾,也沒有error tail.Killf("error reading %s :%s", tail.Filename, err) return } select { case <- tail.Dying(): if tail.Err() == errStopAtEOF { continue } return default: } } }
這個方法里主要是先調用了openReader方法,這個方法其實並沒有做什麼,只是對tail.reqader進行了賦值:tail.reader = bufio.NewReader(tail.file)
接著就是迴圈一行行的讀文件
在迴圈里最開始判斷了tail.Pipe的值,這個值一般開始我也並不會設置,所以預設就是false,所以就會執行tail.Tell()方法,這個方法主要是用於獲取文件當前行的位置信息,下麵是Tell的代碼內容:
// 獲取文件當前行的位置信息 func (tail *Tail) Tell()(offset int64, err error) { if tail.file == nil { return } offset, err = tail.file.Seek(0, os.SEEK_CUR) if err != nil { return } tail.lk.Lock() defer tail.lk.Unlock() if tail.reader == nil { return } offset -= int64(tail.reader.Buffered()) return }
接著會調用tail.readLine()方法,這個方法就是用於獲取文件的一行內容,同時將一行內容實例化為Line對象,然後扔到管道tail.Lines中
//將讀取的文件的每行內容存入到Line結構體中,並最終存入到tail.Lines的chan中 func (tail *Tail) sendLine(line string) bool { now := time.Now() lines := []string{line} for _, line := range lines { tail.Lines <- &Line { line, now, nil, } } return true }
最後的大量if 判斷其實主要是針對讀到文件末尾後的一些操作,
Tail結構體在最後定義的時候有一個參數:Follow, 這個參數的目的就是當讀到文件最後的時候是否繼續讀文件, 如果最開始設置了false,那麼讀到最後之後就不會在讀文件了
如果設置為True,那麼讀到文件最後之後會保存文件的位置信息,並執行waitForChanges() 去等待文件的變化,waitForChanges()代碼內容如下:
// 等待文件的變化事件 func (tail *Tail) waitForChanges() error { if tail.changes == nil { // 這裡是獲取文件指針的當前位置 pos, err := tail.file.Seek(0,os.SEEK_CUR) if err != nil { return err } tail.changes, err = tail.watcher.ChangeEvents(&tail.Tomb, pos) if err != nil { return err } } // 和inotify中進行很巧妙的配合,這裡通過select 來進行查看那個chan變化了,來知道文件的變化 select { case <- tail.changes.Modified: // 文件被修改 return nil case <- tail.changes.Deleted: // 文件被刪除或者移動到其他目錄 tail.changes = nil // 如果文件被刪除或者被移動到其他目錄,則會嘗試重新打開文件 if tail.ReOpen { fmt.Printf("Re-opening moved/deleted file %s...",tail.Filename) if err := tail.reopen();err != nil { return err } fmt.Printf("Successfully reopened %s", tail.Filename) tail.openReader() return nil } else { fmt.Printf("Stoping tail as file not longer exists: %s", tail.Filename) return ErrStop } case <- tail.changes.Truncated: // 文件被追加新的內容 fmt.Printf("Re-opening truncated file %s....", tail.Filename) if err := tail.reopen();err != nil { return err } fmt.Printf("SuccessFuly reopend truncated %s", tail.Filename) tail.openReader() return nil case <- tail.Dying(): return nil } panic("unreachable") }
看到這裡的時候其實就能感覺到,別人寫的代碼其實也並不是非常複雜,也是很普通的代碼,但是你會覺得人家很多地方用的非常巧妙,
這段代碼中主要的是的內容就是select部分,這個部分通過select監控
tail.changes.Modified tail.changes.Deleted tail.changes.Truncated從而知道文件的變化,是修改了,還是刪除了,還是追加內容了,這幾個其實都是一個channel,這幾個channel中的內容是怎麼放進去的呢,接下來看watch包中的內容
watch包代碼分析
首先先看一下watch包中的watch.go,這個裡面其實就是定一個了一個FileWatcher的介面
type FileWatcher interface { BlockUntilExists(*tomb.Tomb) error ChangeEvents(*tomb.Tomb, int64) (*FileChanges, error) }
接著我們看一下inotify.go文件,這個裡面我們就可以看到定一個InotifyFileWatcher結構體,並且實現了FileWatcher 這個介面
type InotifyFileWatcher struct { Filename string Size int64 } func NewInotifyFileWatcher(filename string) *InotifyFileWatcher { fw := &InotifyFileWatcher { filepath.Clean(filename), 0, } return fw } // 關於文件改變事件的處理,當文件被修改了或者文件內容被追加了,進行通知 func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, pos int64) (*FileChanges, error) { err := Watch(fw.Filename) if err != nil { return nil, err } changes := NewFileChanges() fw.Size = pos go func() { events := Events(fw.Filename) for { prevSize := fw.Size var evt fsnotify.Event var ok bool select { case evt, ok = <- events: if !ok { RemoveWatch(fw.Filename) return } case <- t.Dying(): RemoveWatch(fw.Filename) return } switch { case evt.Op & fsnotify.Remove == fsnotify.Remove: fallthrough case evt.Op & fsnotify.Rename == fsnotify.Rename: RemoveWatch(fw.Filename) changes.NotifyDeleted() return case evt.Op & fsnotify.Chmod == fsnotify.Chmod: fallthrough case evt.Op & fsnotify.Write == fsnotify.Write: fi, err := os.Stat(fw.Filename) if err != nil { // 文件如果被刪除了通知文件刪除到chan if os.IsNotExist(err) { RemoveWatch(fw.Filename) changes.NotifyDeleted() return } } fw.Size = fi.Size() if prevSize > 0 && prevSize > fw.Size { // 表示文件內容增加了 changes.NotifyTruncated() } else { // 表示文件被修改了 changes.NotifyModified() } prevSize = fw.Size } } }() return changes, nil } func (fw *InotifyFileWatcher) BlockUntilExists(t *tomb.Tomb) error { err := WatchCreate(fw.Filename) if err != nil { return err } defer RemoveWatchCreate(fw.Filename) if _, err := os.Stat(fw.Filename);!os.IsNotExist(err) { return err } events := Events(fw.Filename) for { select { case evt, ok := <- events: if !ok { return fmt.Errorf("inotify watcher has been closed") } evtName, err := filepath.Abs(evt.Name) if err != nil { return err } fwFilename, err := filepath.Abs(fw.Filename) if err != nil { return err } if evtName == fwFilename { return nil } case <- t.Dying(): return tomb.ErrDying } } panic("unreachable") }
實現的介面就兩個方法:
ChangeEvents: 這個主要是監控文件的變化,是刪除了,還是被修改了,或者是文件,然後將狀態信息通過調用:changes.NotifyTruncated()或者 changes.NotifyDeleted() 或者changes.NotifyModified() 將狀態信息更新到channel中,這樣我們在分析tail.go 中最後的分析的那部分channel中的數據,就是在這裡 放進去的 BlockUntilExists:這個主要是關於文件不存在的時候,如果最開始的時候可以允許文件不存在,那麼就會 在這裡通過for迴圈一直等待,知道文件存在 再看看filechanges.go 文件,代碼內容如下:type FileChanges struct { Modified chan bool // 修改 Truncated chan bool // 增加 Deleted chan bool // 刪除 } func NewFileChanges() *FileChanges { return &FileChanges{ make(chan bool, 1), make(chan bool, 1), make(chan bool, 1), } } func (fc *FileChanges) NotifyModified() { sendOnlyIfEmpty(fc.Modified) } func (fc *FileChanges) NotifyTruncated() { sendOnlyIfEmpty(fc.Truncated) } func (fc *FileChanges) NotifyDeleted() { sendOnlyIfEmpty(fc.Deleted) } func sendOnlyIfEmpty(ch chan bool) { select { case ch <- true: default: } }
在這個裡面也是可以學習到人家寫的這個地方非常巧妙,雖然談不上代碼高達上,但是看著會讓你很舒服,通過這個結構體,當文件被刪除,修改和增加的時候就會讓對應的channel中插入一個true,並且這裡
的channel都是不帶緩衝區的,只有當tail中觸發一次之後,channel中的內容就會被獲取出來,從而觸發tail繼續讀文件的內容