go 源碼學習之---Tail 源碼分析

来源:https://www.cnblogs.com/zhaof/archive/2018/09/17/9663350.html
-Advertisement-
Play Games

已經有兩個月沒有寫博客了,也有好幾個月沒有看go相關的內容了,由於工作原因最近在做java以及大數據相關的內容,導致最近工作較忙,博客停止了更新,正好想撿起之前go的東西,所以找了一個源碼學習 這個也是之前用go寫日誌收集的時候用到的一個包 :github.com/hpcloud/tail, 這次就 ...


已經有兩個月沒有寫博客了,也有好幾個月沒有看go相關的內容了,由於工作原因最近在做java以及大數據相關的內容,導致最近工作較忙,博客停止了更新,正好想撿起之前go的東西,所以找了一個源碼學習

這個也是之前用go寫日誌收集的時候用到的一個包 :github.com/hpcloud/tail, 這次就學習一下人家的源碼,為了方便看這個代碼,我將這個包進行了簡化,也是用於方便理解,代碼放到了:https://github.com/pythonsite/tail, 這個代碼包可能無法正常用,只是為了方面理解tail這個包,以及學習人家的代碼

精簡後的代碼目錄

tail.go
│
└─watch
        filechanges.go
        inotify.go
        inotify_tracker.go
        watch.go

tail.go: 這裡包含著tail包的核心代碼,主要的邏輯處理時在這個裡面

watch: 這個包主要用於對文件的監控,用於將文件的變化通知到tail.如:文件修改了,文件刪除了,文件內容追加了

 

tail.go 代碼分析

在tail.go中主要有幾下幾個結構體:

// Line 結構體用於存讀每行的時候的對象
type Line struct {
    Text string            //當前行的內容
    Time time.Time        // 時間
    Err  error             // Error from tail
}

type SeekInfo struct {
    Offset         int64
    Whence        int    
}

// 關於配置的結構體
type Config struct {
    Location     *SeekInfo
    ReOpen        bool
    MustExist    bool        // 要打開的文件是否必須存在
    Poll        bool

    Pipe        bool

    Follow        bool        // 是否繼續讀取新的一行,可以理解為tail -f 命令
    
}

// 核心的結構體Tail
type Tail struct {
    Filename     string                    // 要打開的文件名
    Lines        chan *Line                // 用於存每行內容的Line結構體

    Config

    watcher     watch.FileWatcher
    changes     *watch.FileChanges

    tomb.Tomb    

    file         *os.File
    reader         *bufio.Reader
    lk sync.Mutex
}
Line 結構體用於存讀取文件的每行內容 Tail 是核心的結構體,我們使用tail這個包的時候其實就是會先調用初始化這個struct的方法TailFile,如我在寫日誌收集的時候的使用:
    tail,err := tail.TailFile(conf.LogPath,tail.Config{
        ReOpen:true,
        Follow:true,
        Location:&tail.SeekInfo{Offset:0,Whence:2},
        MustExist:false,
        Poll:true,
    })

既然我們使用的時候就會在最開始的時候調用tail.TailFile方法,就直接看這個方法:

// 主要用於Tail結構體的初始化
func TailFile(filename string, config Config) (*Tail, error) {
    t := &Tail {
        Filename:    filename,
        Lines:        make(chan *Line),
        Config:        config,
    }
    t.watcher = watch.NewInotifyFileWatcher(filename)
    if t.MustExist {
        var err error
        t.file, err = OpenFile(t.Filename)
        if err != nil {
            return nil, err
        }
    }
    go t.tailFileSync()

    return t, nil
}

從這個代碼里我們就可以看到它首先初始化了Tail結構體並且對Tail中的watcher進行的複製,先暫時不看watch相關的內容

然後就是關於文件是否必須存在的判斷處理,最後開啟了一個一個線程執行tailFileSync()方法,我們接著看tailFileSync方法

func (tail *Tail) tailFileSync(){
    defer tail.Done()
    defer tail.close()

    if !tail.MustExist {
        err := tail.reopen()
        if err != nil {
            if err != tomb.ErrDying {
                tail.Kill(err)
            }
            return
        }
    }

    tail.openReader()

    var offset int64
    var err error
    // 一行行讀文件內容
    for {

        if !tail.Pipe {
            offset,err = tail.Tell()
            if err != nil {
                tail.Kill(err)
                return
            }
        }

        line, err := tail.readLine()
        if err == nil {
            // 將讀取的一行內容放到chan中
            tail.sendLine(line)
        } else if err == io.EOF {
            // 表示讀到文件的最後了
            // 如果Follow 設置為false的話就不會繼續讀文件
            if !tail.Follow {
                if line != "" {
                    tail.sendLine(line)
                }
                return
            }
            // 如果Follow設置為True則會繼續讀
            if tail.Follow && line != "" {
                err := tail.seekTo(SeekInfo{Offset: offset, Whence: 0})
                if err != nil {
                    tail.Kill(err)
                    return
                }
            }
            // 如果讀到文件最後,文件並沒有新的內容增加
            err := tail.waitForChanges()
            if err != nil {
                if err != ErrStop {
                    tail.Kill(err)
                }
                return
            }


        } else {
            // 既不是文件結尾,也沒有error
            tail.Killf("error reading %s :%s", tail.Filename, err)
            return
        }


        select {
        case <- tail.Dying():
            if tail.Err() == errStopAtEOF {
                continue
            }
            return
        default:    
        }
    }
}

這個方法里主要是先調用了openReader方法,這個方法其實並沒有做什麼,只是對tail.reqader進行了賦值:tail.reader = bufio.NewReader(tail.file)

接著就是迴圈一行行的讀文件

在迴圈里最開始判斷了tail.Pipe的值,這個值一般開始我也並不會設置,所以預設就是false,所以就會執行tail.Tell()方法,這個方法主要是用於獲取文件當前行的位置信息,下麵是Tell的代碼內容:

// 獲取文件當前行的位置信息
func (tail *Tail) Tell()(offset int64, err error) {
    if tail.file == nil {
        return
    }
    offset, err = tail.file.Seek(0, os.SEEK_CUR)
    if err != nil {
        return
    }
    tail.lk.Lock()
    defer tail.lk.Unlock()
    if tail.reader == nil {
        return
    }
    offset -= int64(tail.reader.Buffered())
    return
}

接著會調用tail.readLine()方法,這個方法就是用於獲取文件的一行內容,同時將一行內容實例化為Line對象,然後扔到管道tail.Lines中

//將讀取的文件的每行內容存入到Line結構體中,並最終存入到tail.Lines的chan中
func (tail *Tail) sendLine(line string) bool {
    now := time.Now()
    lines := []string{line}

    for _, line := range lines {
        tail.Lines <- &Line {
            line,
            now,
            nil,
        }
    }
    return true
}

最後的大量if 判斷其實主要是針對讀到文件末尾後的一些操作,

Tail結構體在最後定義的時候有一個參數:Follow, 這個參數的目的就是當讀到文件最後的時候是否繼續讀文件, 如果最開始設置了false,那麼讀到最後之後就不會在讀文件了

如果設置為True,那麼讀到文件最後之後會保存文件的位置信息,並執行waitForChanges() 去等待文件的變化,waitForChanges()代碼內容如下:

// 等待文件的變化事件
func (tail *Tail) waitForChanges() error {
    if tail.changes == nil {
        // 這裡是獲取文件指針的當前位置
        pos, err := tail.file.Seek(0,os.SEEK_CUR)
        if err != nil {
            return err
        }
        tail.changes, err = tail.watcher.ChangeEvents(&tail.Tomb, pos)
        if err != nil {
            return err
        }
    }
    // 和inotify中進行很巧妙的配合,這裡通過select 來進行查看那個chan變化了,來知道文件的變化
    select {
    case <- tail.changes.Modified:            // 文件被修改
        return nil
    case <- tail.changes.Deleted:            // 文件被刪除或者移動到其他目錄
        tail.changes = nil
        // 如果文件被刪除或者被移動到其他目錄,則會嘗試重新打開文件
        if tail.ReOpen {
            fmt.Printf("Re-opening moved/deleted file %s...",tail.Filename)
            if err := tail.reopen();err != nil {
                return err
            }
            fmt.Printf("Successfully reopened %s", tail.Filename)
            tail.openReader()
            return nil
        } else {
            fmt.Printf("Stoping tail as file not longer exists: %s", tail.Filename)
            return ErrStop
        }
    case <- tail.changes.Truncated:            // 文件被追加新的內容
        fmt.Printf("Re-opening truncated file %s....", tail.Filename)
        if err := tail.reopen();err != nil {
            return err
        }
        fmt.Printf("SuccessFuly reopend truncated %s", tail.Filename)
        tail.openReader()
        return nil
    case <- tail.Dying():
        return nil
    }
    panic("unreachable")
}

看到這裡的時候其實就能感覺到,別人寫的代碼其實也並不是非常複雜,也是很普通的代碼,但是你會覺得人家很多地方用的非常巧妙,

這段代碼中主要的是的內容就是select部分,這個部分通過select監控

tail.changes.Modified tail.changes.Deleted tail.changes.Truncated

從而知道文件的變化,是修改了,還是刪除了,還是追加內容了,這幾個其實都是一個channel,這幾個channel中的內容是怎麼放進去的呢,接下來看watch包中的內容

watch包代碼分析

首先先看一下watch包中的watch.go,這個裡面其實就是定一個了一個FileWatcher的介面

type FileWatcher interface {
    BlockUntilExists(*tomb.Tomb) error

    ChangeEvents(*tomb.Tomb, int64) (*FileChanges, error)
}

接著我們看一下inotify.go文件,這個裡面我們就可以看到定一個InotifyFileWatcher結構體,並且實現了FileWatcher 這個介面

type InotifyFileWatcher struct {
    Filename     string
    Size        int64
}

func NewInotifyFileWatcher(filename string) *InotifyFileWatcher {
    fw := &InotifyFileWatcher {
        filepath.Clean(filename),
        0,
    }
    return fw
}


// 關於文件改變事件的處理,當文件被修改了或者文件內容被追加了,進行通知
func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, pos int64) (*FileChanges, error) {
    err := Watch(fw.Filename)
    if err != nil {
        return nil, err
    }

    changes := NewFileChanges()
    fw.Size = pos

    go func() {
        events := Events(fw.Filename)

        for {
            prevSize := fw.Size

            var evt fsnotify.Event
            var ok bool
            
            select {
            case evt, ok = <- events:
                if !ok {
                    RemoveWatch(fw.Filename)
                    return
                }
            case <- t.Dying():
                RemoveWatch(fw.Filename)
                return
            }
            switch {
            case evt.Op & fsnotify.Remove == fsnotify.Remove:
                fallthrough
            case evt.Op & fsnotify.Rename == fsnotify.Rename:
                RemoveWatch(fw.Filename)
                changes.NotifyDeleted()
                return

            case evt.Op & fsnotify.Chmod == fsnotify.Chmod:
                fallthrough
            case evt.Op & fsnotify.Write == fsnotify.Write:
                fi, err := os.Stat(fw.Filename)
                if err != nil {
                    // 文件如果被刪除了通知文件刪除到chan
                    if os.IsNotExist(err) {
                        RemoveWatch(fw.Filename)
                        changes.NotifyDeleted()
                        return
                    }

                }
                fw.Size = fi.Size()

                if prevSize > 0 && prevSize > fw.Size {
                    // 表示文件內容增加了
                    changes.NotifyTruncated()
                } else {
                    // 表示文件被修改了
                    changes.NotifyModified()
                }

                prevSize = fw.Size
            }

        }
    }()
    return changes, nil
}

func (fw *InotifyFileWatcher) BlockUntilExists(t *tomb.Tomb) error {
    err := WatchCreate(fw.Filename)
    if err != nil {
        return err
    }
    defer RemoveWatchCreate(fw.Filename)
    if _, err := os.Stat(fw.Filename);!os.IsNotExist(err) {
        return err
    }
    events := Events(fw.Filename)
    for {
        select {
        case evt, ok := <- events:
            if !ok {
                return fmt.Errorf("inotify watcher has been closed")
            }
            evtName, err := filepath.Abs(evt.Name)
            if err != nil {
                return err
            }
            fwFilename, err := filepath.Abs(fw.Filename)
            if err != nil {
                return err
            }
            if evtName == fwFilename {
                return nil
            }
        case <- t.Dying():
            return tomb.ErrDying
        }
    }
    panic("unreachable")
}

實現的介面就兩個方法:

ChangeEvents: 這個主要是監控文件的變化,是刪除了,還是被修改了,或者是文件,然後將狀態信息通過調用:changes.NotifyTruncated()或者 changes.NotifyDeleted() 或者changes.NotifyModified() 將狀態信息更新到channel中,這樣我們在分析tail.go 中最後的分析的那部分channel中的數據,就是在這裡 放進去的 BlockUntilExists:這個主要是關於文件不存在的時候,如果最開始的時候可以允許文件不存在,那麼就會 在這裡通過for迴圈一直等待,知道文件存在   再看看filechanges.go 文件,代碼內容如下:
type FileChanges struct {
    Modified  chan bool        // 修改
    Truncated chan bool        // 增加
    Deleted   chan bool        // 刪除
}

func NewFileChanges() *FileChanges {
    return &FileChanges{
        make(chan bool, 1),
        make(chan bool, 1),
        make(chan bool, 1),
    }
}

func (fc *FileChanges) NotifyModified() {
    sendOnlyIfEmpty(fc.Modified)
}

func (fc *FileChanges) NotifyTruncated() {
    sendOnlyIfEmpty(fc.Truncated)
}

func (fc *FileChanges) NotifyDeleted() {
    sendOnlyIfEmpty(fc.Deleted)
}

func sendOnlyIfEmpty(ch chan bool) {
    select {
    case ch <- true:
    default:
    }
}

在這個裡面也是可以學習到人家寫的這個地方非常巧妙,雖然談不上代碼高達上,但是看著會讓你很舒服,通過這個結構體,當文件被刪除,修改和增加的時候就會讓對應的channel中插入一個true,並且這裡

的channel都是不帶緩衝區的,只有當tail中觸發一次之後,channel中的內容就會被獲取出來,從而觸發tail繼續讀文件的內容

 

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 前言 到現在為止,所有的教程項目都沒有使用Effects11框架類來管理資源。因為在D3DCompile API ( 47)版本中,如果你嘗試編譯fx_5_0的效果文件,會收到這樣的警告: 在未來的版本中,D3DCompiler可能會停止對FX11的支持,所以我們需要自行去管理各種特效,並改用HLS ...
  • 一、字元串格式化 利用一段註釋記錄想要輸出的字元串格式,並用 %s 、 %d 或 %f 依次代替要輸出的數據(%s代表字元串,%d代表數字,%f代表浮點數),然後在這段註釋之後依次加上要輸出的數據。 最後輸出的結果如下: 在這裡附帶一個可以防止輸入錯誤而報錯的方法 isdigit() 可以判斷變數的 ...
  • 我們前面文章介紹了迭代器和可迭代對象,這次介紹python的上下文管理。在python中實現了__enter__和__exit__方法,即支持上下文管理器協議。上下文管理器就是支持上下文管理器協議的對象,它是為了with而生。當with語句在開始運行時,會在上下文管理器對象上調用 __enter__ ...
  • Game Time Limit: 2000/1000 MS (Java/Others) Memory Limit: 32768/32768 K (Java/Others)Total Submission(s): 1340 Accepted Submission(s): 891 Problem Des ...
  • 通用的(泛型)演算法 之 只讀演算法,寫演算法,排序演算法 只讀演算法: | 函數名 | 功能描述 | | | | | accumulate | 求容器里元素的和 | | equal | 比較2個容器里的元素 | 寫演算法 | 函數名 | 功能描述 | | | | | fill | 用給定值,覆蓋給定的範圍的元 ...
  • Spring Boot 日誌篇 1、日誌框架(故事引入) 小張;開發一個大型系統; ​ 1、System.out.println("");將關鍵數據列印在控制台;去掉?寫在一個文件? ​ 2、框架來記錄系統的一些運行時信息;日誌框架 ; zhanglogging.jar; ​ 3、高大上的幾個功能? ...
  • 此系列將記錄本人從開始到結束做物料管理系統的過程 登錄界面的設計 此博客將實現如下界面: 當用戶名或密碼沒輸入時將顯示相應的提示信息,採用java swing實現 代碼: ...
  • 數組中有一個數字出現的次數超過數組長度的一半,請找出這個數字。例如輸入一個長度為9的數組{1,2,3,2,2,2,5,4,2}。由於數字2在數組中出現了5次,超過數組長度的一半,因此輸出2。如果不存在則輸出0。 兩種方式: 1.定義一個新數組arr,遍曆數組給arr賦值,arr[元素]=出現的次數 ... ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...