Go實現海量日誌收集系統(二)

来源:https://www.cnblogs.com/zhaof/archive/2018/03/30/8673420.html
-Advertisement-
Play Games

一篇文章主要是關於整體架構以及用到的軟體的一些介紹,這一篇文章是對各個軟體的使用介紹,當然這裡主要是關於架構中我們agent的實現用到的內容 關於zookeeper+kafka 我們需要先把兩者啟動,先啟動zookeeper,再啟動kafka啟動ZooKeeper:./bin/zkServer.sh ...


一篇文章主要是關於整體架構以及用到的軟體的一些介紹,這一篇文章是對各個軟體的使用介紹,當然這裡主要是關於架構中我們agent的實現用到的內容

關於zookeeper+kafka

我們需要先把兩者啟動,先啟動zookeeper,再啟動kafka
啟動ZooKeeper:./bin/zkServer.sh start
啟動kafka:./bin/kafka-server-start.sh ./config/server.properties 

操作kafka需要安裝一個包:go get github.com/Shopify/sarama
寫一個簡單的代碼,通過go調用往kafka里扔數據:

package main

import (
    "github.com/Shopify/sarama"
    "fmt"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.Return.Successes = true
    msg := &sarama.ProducerMessage{}
    msg.Topic = "nginx_log"
    msg.Value = sarama.StringEncoder("this is a good test,my message is zhaofan")
    client,err := sarama.NewSyncProducer([]string{"192.168.0.118:9092"},config)
    if err != nil{
        fmt.Println("producer close err:",err)
        return
    }
    defer client.Close()

    pid,offset,err := client.SendMessage(msg)
    if err != nil{
        fmt.Println("send message failed,",err)
        return
    }
    fmt.Printf("pid:%v offset:%v\n",pid,offset)
}

config.Producer.RequiredAcks = sarama.WaitForAll 這裡表示是在給kafka扔數據的時候是否需要確認收到kafka的ack消息

msg.Topic = "nginx_log" 因為kafka是一個分散式系統,假如我們要讀的是nginx日誌,apache日誌,我們可以根據topic做區分,同時也是我們也可以有不同的分區

我們將上述代碼執行一下,就會往kafka中扔一條消息,可以通過kakfa中自帶的消費者命令查看:
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic nginx_log --from-beginning

 

我們可以將最後的代碼稍微更改一下,更改為迴圈發送:

for{
    pid,offset,err := client.SendMessage(msg)
    if err != nil{
        fmt.Println("send message failed,",err)
        return
    }
    fmt.Printf("pid:%v offset:%v\n",pid,offset)
    time.Sleep(2*time.Second)
}

這樣當我們再次執行的程式的時候,我們可以看到客戶端在不停的消費到數據:

這樣我們就實現一個kakfa的生產者的簡單的demo

接下來我們還需要知道一個工具的使用tailf

tailf

我們的agent需要讀日誌目錄下的日誌文件,而日誌文件是不停的增加並且切換文件的,所以我們就需要藉助於tailf這個包來讀文件,當然這裡的tailf和linux里的tail -f命令雖然不同,但是效果是差不多的,都是為了獲取日誌文件新增加的內容。

而我們的客戶端非常重要的一個地方就是要讀日誌文件並且將讀到的日誌文件推送到kafka

這裡需要我們下載一個包:go get github.com/hpcloud/tail

我們通過下麵一個例子對這個包進行一個基本的使用,更詳細的api說明看:https://godoc.org/github.com/hpcloud/tail

package main

import (
    "github.com/hpcloud/tail"
    "fmt"
    "time"
)

func main() {
    filename := "/Users/zhaofan/go_project/src/go_dev/13/tailf/my.log"
    tails,err := tail.TailFile(filename,tail.Config{
        ReOpen:true,
        Follow:true,
        Location:&tail.SeekInfo{Offset:0,Whence:2},
        MustExist:false,
        Poll:true,
    })

    if err !=nil{
        fmt.Println("tail file err:",err)
        return
    }

    var msg *tail.Line
    var ok bool
    for true{
        msg,ok = <-tails.Lines
        if !ok{
            fmt.Printf("tail file close reopen,filenam:%s\n",tails,filename)
            time.Sleep(100*time.Millisecond)
            continue
        }
        fmt.Println("msg:",msg.Text)
    }
}

最終實現的效果是當你文件裡面添加內容後,就可以不斷的讀取文件中的內容

日誌庫的使用

這裡是通過beego的日誌庫實現的,beego的日誌庫是可以單獨拿出來用的,還是非常方便的,使用例子如下:

 

package main

import (
    "github.com/astaxie/beego/logs"
    "encoding/json"
    "fmt"
)

func main() {
    config := make(map[string]interface{})
    config["filename"] = "/Users/zhaofan/go_project/src/go_dev/13/log/logcollect.log"
    config["level"] = logs.LevelTrace
    configStr,err := json.Marshal(config)
    if err != nil{
        fmt.Println("marshal failed,err:",err)
        return
    }
    logs.SetLogger(logs.AdapterFile,string(configStr))
    logs.Debug("this is a debug,my name is %s","stu01")
    logs.Info("this is a info,my name is %s","stu02")
    logs.Trace("this is trace my name is %s","stu03")
    logs.Warn("this is a warn my name is %s","stu04")
}

簡單版本logagent的實現

這裡主要是先實現核心的功能,後續再做優化和改進,主要實現能夠根據配置文件中配置的日誌路徑去讀取日誌並將讀取的實時推送到kafka消息隊列中

關於logagent的主要結構如下:

 

程式目錄結構為:

├── conf
│   └── app.conf
├── config.go
├── kafka.go
├── logs
│   └── logcollect.log
├── main.go
└── server.go

app.conf :配置文件
config.go:用於初始化讀取配置文件中的內容,這裡的配置文件載入是通過之前自己實現的配置文件熱載入包處理的,博客地址:http://www.cnblogs.com/zhaof/p/8593204.html
logcollect.log:日誌文件
kafka.go:對kafka的操作,包括初始化kafka連接,以及給kafka發送消息
server.go:主要是tail 的相關操作,用於去讀日誌文件並將內容放到channel中

所以這裡我們主要的代碼邏輯或者重要的代碼邏輯就是kafka.go 以及server.go

kafka.go代碼內容為:

// 這裡主要是kafak的相關操作,包括了kafka的初始化,以及發送消息的操作
package main

import (
    "github.com/Shopify/sarama"
    "github.com/astaxie/beego/logs"
)

var (
    client sarama.SyncProducer
    kafkaSender *KafkaSender
)

type KafkaSender struct {
    client sarama.SyncProducer
    lineChan chan string
}

// 初始化kafka
func NewKafkaSender(kafkaAddr string)(kafka *KafkaSender,err error){
    kafka = &KafkaSender{
        lineChan:make(chan string,100000),
    }
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.Return.Successes = true

    client,err := sarama.NewSyncProducer([]string{kafkaAddr},config)
    if err != nil{
        logs.Error("init kafka client failed,err:%v\n",err)
        return
    }
    kafka.client = client
    for i:=0;i<appConfig.KafkaThreadNum;i++{
        // 根據配置文件迴圈開啟線程去發消息到kafka
        go kafka.sendToKafka()
    }
    return
}

func initKafka()(err error){
    kafkaSender,err = NewKafkaSender(appConfig.kafkaAddr)
    return
}

func (k *KafkaSender) sendToKafka(){
    //從channel中讀取日誌內容放到kafka消息隊列中
    for v := range k.lineChan{
        msg := &sarama.ProducerMessage{}
        msg.Topic = "nginx_log"
        msg.Value = sarama.StringEncoder(v)
        _,_,err := k.client.SendMessage(msg)
        if err != nil{
            logs.Error("send message to kafka failed,err:%v",err)
        }
    }
}

func (k *KafkaSender) addMessage(line string)(err error){
    //我們通過tailf讀取的日誌文件內容先放到channel裡面
    k.lineChan <- line
    return
}

server.go的代碼為:

package main

import (
    "github.com/hpcloud/tail"
    "fmt"
    "sync"
    "github.com/astaxie/beego/logs"
    "strings"
)

type TailMgr struct {
    //因為我們的agent可能是讀取多個日誌文件,這裡通過存儲為一個map
    tailObjMap map[string]*TailObj
    lock sync.Mutex
}

type TailObj struct {
    //這裡是每個讀取日誌文件的對象
    tail *tail.Tail
    offset int64  //記錄當前位置
    filename string
}

var tailMgr *TailMgr
var waitGroup sync.WaitGroup

func NewTailMgr()(*TailMgr){
    tailMgr =  &TailMgr{
        tailObjMap:make(map[string]*TailObj,16),
    }
    return tailMgr
}

func (t *TailMgr) AddLogFile(filename string)(err error){
    t.lock.Lock()
    defer t.lock.Unlock()
    _,ok := t.tailObjMap[filename]
    if ok{
        err = fmt.Errorf("duplicate filename:%s\n",filename)
        return
    }
    tail,err := tail.TailFile(filename,tail.Config{
        ReOpen:true,
        Follow:true,
        Location:&tail.SeekInfo{Offset:0,Whence:2},
        MustExist:false,
        Poll:true,
    })

    tailobj := &TailObj{
        filename:filename,
        offset:0,
        tail:tail,
    }
    t.tailObjMap[filename] = tailobj
    return
}

func (t *TailMgr) Process(){
    //開啟線程去讀日誌文件
    for _, tailObj := range t.tailObjMap{
        waitGroup.Add(1)
        go tailObj.readLog()
    }
}

func (t *TailObj) readLog(){
    //讀取每行日誌內容
    for line := range t.tail.Lines{
        if line.Err != nil {
            logs.Error("read line failed,err:%v",line.Err)
            continue
        }
        str := strings.TrimSpace(line.Text)
        if len(str)==0 || str[0] == '\n'{
            continue
        }

        kafkaSender.addMessage(line.Text)
    }
    waitGroup.Done()
}


func RunServer(){
    tailMgr = NewTailMgr()
    // 這一部分是要調用tailf讀日誌文件推送到kafka中
    for _, filename := range appConfig.LogFiles{
        err := tailMgr.AddLogFile(filename)
        if err != nil{
            logs.Error("add log file failed,err:%v",err)
            continue
        }

    }
    tailMgr.Process()
    waitGroup.Wait()
}

可以整體演示一下代碼實現的效果,當我們運行程式之後我配置文件配置的目錄為:
log_files=/app/log/a.log,/Users/zhaofan/a.log
我通過一個簡單的代碼對對a.log迴圈追加內容,你可以從kafka的客戶端消費力看到內容了:

完成的代碼地址:https://github.com/pythonsite/logagent

小結

這次只是實現logagent的核心功能,實現了從日誌文件中通過多個線程獲取要讀的日誌內容,這裡藉助了tailf,並將獲取的內容放到channel中,kafka.go會從channel中取出日誌內容並放到kafka的消息隊列中
這裡並沒有做很多細緻的處理,下一篇文章會在這個代碼的基礎上進行改進。同時現在的配置文件的方式也不是最佳的,每次改動配置文件都需要重新啟動程式,後面將通過etcd的方式。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 先放在這裡,下班再寫 ...
  • 情景: 父組件中引入上傳附件的子組件:點擊組件可以分別上傳對應要求的圖片,子組件內部迴圈可創建多個模塊. 父組件傳入數組子組件迴圈來創建不同的組件模塊,所有事件都在子組件內部. 父組件頁面的上方同時有一個上傳圖片按鈕上傳圖片後會顯示在第一個模塊: 設想思路:點擊父組件中的按鈕觸發子組件中上傳方法: ...
  • 控制反轉( IoC)和依賴註入(DI) tags: 容器 依賴註入 IOC DI 控制反轉 引言:如果你看過一些框架的源碼或者手冊,像是laravel或者tp5之類的,應該會提到容器,依賴註入,控制反轉等辭彙。或者是某些面試官會問到這類問題。希望這篇文章能讓你有所收穫。 1.1、IoC(控制反轉 I ...
  • 組合模式為了描述分支包含關係,也就是我們說的樹形關係,其對象分為枝和葉,每一枝可包含枝和葉,直到全部為葉節點。我們對枝和葉進行行為抽象,可認為枝和葉都是Component,而葉是最小的操作單元,其下不存在枝和葉,而枝作為Composite裡面存有其下枝和葉的組件列表。 作用 將對象組合成樹形結構以表 ...
  • 簡單來說就是暫停的意思,一般在LINUX編程時會用到,等待接收信號,才會重新運行 。 在進行C/C++編程的時候,在運行程式查看輸出效果時,會出現視窗閃一下就關閉的情況。 在C語言中一般通過添加getchar(); 在C++中一般在main函數中的return之前添加system("pause"); ...
  • ssh實現遠程登陸一般有兩種方式,一種就是用戶密碼登陸,另一種是密鑰登陸(當然預設是要服務端打開ssh服務)。 我這裡使用這兩種方法操作一下遠程登陸,測試客戶端是本機的root與jeff用戶,遠程連接我的阿裡雲伺服器。 用戶及密碼登陸 root為服務端用戶,輸入帳號密碼後,即登陸阿裡雲伺服器。 密鑰 ...
  • 很早之前學過python2.7版本,後來準備考研好久沒用了,最近先瀏覽一遍Python基礎教程,總結一下前七章,總體作為一個回憶引子 python中的一切皆為對象,每個對象都有兩種屬性,第一種是用戶定義的屬性;第二種是和對象有關的python內部屬性通常以__attr__的形式出現 1.幾個常用方法 ...
  • 作者:匿名用戶鏈接:https://www.zhihu.com/question/52992079/answer/156294774來源:知乎著作權歸作者所有。商業轉載請聯繫作者獲得授權,非商業轉載請註明出處。 (sklearn官方指南:Choosing the right estimator) 0 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...