Go實現海量日誌收集系統(四)

来源:https://www.cnblogs.com/zhaof/archive/2018/04/26/8948516.html
-Advertisement-
Play Games

到這一步,我的收集系統就已經完成很大一部分工作,我們重新看一下我們之前畫的圖: 我們已經完成前面的部分,剩下是要完成後半部分,將kafka中的數據扔到ElasticSearch,並且最終通過kibana展現出來 ElasticSearch 官網地址這裡介紹了非常詳細的安裝方法:https://www ...


到這一步,我的收集系統就已經完成很大一部分工作,我們重新看一下我們之前畫的圖:

我們已經完成前面的部分,剩下是要完成後半部分,將kafka中的數據扔到ElasticSearch,並且最終通過kibana展現出來

ElasticSearch

官網地址這裡介紹了非常詳細的安裝方法:
https://www.elastic.co/downloads/elasticsearch

但是其實這裡是需要配置一些東西的,要不然直接啟動是會悲劇的,在網上找了一個地址,如果出現類似的錯誤直接處理就行,我自己已經驗證了:
https://blog.csdn.net/liangzhao_jay/article/details/56840941

如下圖所示就表示已經安裝完成:

 

 

 

 

 

 通過go寫一個簡單的調用ElasticSearch的例子:

package main

import (
    "fmt"
    elastic "gopkg.in/olivere/elastic.v2"
)

type Tweet struct{
    User string
    Message string
}

func main(){
    client,err := elastic.NewClient(elastic.SetSniff(false),elastic.SetURL("http://192.168.0.118:9200/"))
    if err != nil{
        fmt.Println("connect es error",err)
        return
    }
    fmt.Println("conn es succ")
    tweet := Tweet{User:"olivere name",Message:"Take Five"}
    _, err = client.Index().Index("twitter").Type("tweet").Id("1").BodyJson(tweet).Do()
    if err != nil {
        panic(err)
        return
    }
    fmt.Println("insert succ")
}

logtransfer

logtransfer主要負責從 kafka隊列中讀取日誌信息,並且添加到ElasticSearch中

看那一下logtransfer 目錄結構如下:

├── conf
│   └── app.conf
├── es.go
├── etcd.go
├── ip.go
├── kafka.go
├── logs
│   └── transfer.log
└── main.go

conf:存放配置文件
es.go:主要是連接ElasticSearch的部分以及用於將消息放到ElasticSearch中
etcd.go:主要用於做動態的配置更改,當我們需要將kafka中的哪些topic日誌內容扔到ElasticSearch中
ip.go: 用於獲取當前伺服器的ip地址
kafka.go: 主要是kafka的處理邏輯,包括連接kafka以及從kafka中讀日誌內容
main.go:代碼的入口函數

整體大代碼框架,通過如圖展示:

和之前的logagent中的代碼有很多啟示是可以復用的或者稍作更改,就可以了,其中es之心的,主要是連接ElasticSearch並將日誌內容放進去

es.go的代碼內容為:

package main

import (
    "gopkg.in/olivere/elastic.v2"
    "github.com/astaxie/beego/logs"
    "sync"
    "encoding/json"
)

var waitGroup sync.WaitGroup

var client *elastic.Client

func initEs(addr string,) (err error){
    client,err = elastic.NewClient(elastic.SetSniff(false),elastic.SetURL(addr))
    if err != nil{
        logs.Error("connect to es error:%v",err)
        return
    }
    logs.Debug("conn to es success")
    return
}

func reloadKafka(topicArray []string) {
    for _, topic := range topicArray{
        kafkaMgr.AddTopic(topic)
    }
}

func reload(){
    //GetLogConf() 從channel中獲topic信息,而這部分信息是從etcd放進去的
    for conf := range GetLogConf(){
        var topicArray []string
        err := json.Unmarshal([]byte(conf),&topicArray)
        if err != nil {
            logs.Error("unmarshal failed,err:%v conf:%v",err,conf)
            continue
        }
        reloadKafka(topicArray)
    }
}

func Run(esThreadNum int) (err error) {
    go reload()
    for i:=0;i<esThreadNum;i++{
        waitGroup.Add(1)
        go sendToEs()
    }
    waitGroup.Wait()
    return
}

type EsMessage struct {
    Message string
}

func sendToEs(){
    // 從msgChan中讀取日誌內容並扔到elasticsearch中
    for msg:= range GetMessage() {
        var esMsg EsMessage
        esMsg.Message = msg.line
        _,err := client.Index().Index(msg.topic).Type(msg.topic).BodyJson(esMsg).Do()
        if err != nil {
            logs.Error("send to es failed,err:%v",err)
            continue
        }
        logs.Debug("send to es success")
    }
    waitGroup.Done()
}

最終我將logagnet以及logtransfer部署到虛擬機上進行測試的效果是:

 

 

這樣當我再次查日誌的時候就可以不用登陸每台伺服器去查日誌,只需要通過頁面根據關鍵字迅速看到相關日誌,當然目前實現的功能還是有點粗糙,etcd的更改程式,是自己寫的發送程式,其實更好的解決方法是通過頁面,讓用戶點來點去,來控制自己要收集哪些日誌,以及自己要將哪些topic的日誌從kafka中放到ElasticSearch (本人是做後端開發,不擅長前端的開發,不過後面可以試著寫個頁面試試,估計會很醜哈哈)

同時這裡關於各個部分的安裝並沒有做過多的介紹,以及維護,當然我們的目標是是通過這些開源的軟體以及包來實現我們想要的功能,後期的維護,肯定需要對各個組件部分都進行深入瞭解

這裡附贈一下那個etcd客戶端代碼:

package main

import (
    "github.com/coreos/etcd/clientv3"
    "time"
    "fmt"
    "golang.org/x/net/context"
)

var logconf = `
[
    {
        "topic":"eslservice_log",
        "log_path":"/opt/pbx/log/eslservice.log",
        "service":"eslservice",
        "send_rate":50000
    }
]
`

var test111 = `
[
    {
        "topic":"test_log",
        "log_path":"D:/a.log",
        "service":"test",
        "send_rate":50000
    }
]
`


var transconf = `
[
    "eslservice_log"
]
`

func main() {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:[]string{"192.168.90.78:2371"},
        DialTimeout:5*time.Second,
    })
    if err != nil {
        fmt.Println("connect failed,err:",err)
        return
    }
    fmt.Println("connect success")
    defer cli.Close()
    ctx,cancel := context.WithTimeout(context.Background(),time.Second)
    //_,err = cli.Put(ctx,"/logagent/192.168.90.11/log_config",logconf)
    //_,err = cli.Put(ctx,"/logagent/192.168.90.61/log_config",test111)
    _, err = cli.Put(ctx,"/logtransfer/192.168.90.11/log_config",transconf)
    cancel()
    if err != nil {
        fmt.Println("put failed ,err:",err)
        return
    }
    ctx,cancel = context.WithTimeout(context.Background(),time.Second)
    resp,err := cli.Get(ctx,"/logtransfer/",clientv3.WithPrefix())
    cancel()
    if err != nil {
        fmt.Println("get failed,err:",err)
        return
    }
    for _,ev:=range resp.Kvs{
        fmt.Printf("%s:%s\n",ev.Key,ev.Value)
    }
}

到目前為止基本的功能都已經實現了,當然了現在的代碼結構還有的糙,後面會進行優化!
整個項目中的代碼:
logagent代碼地址:https://github.com/pythonsite/logagent
logtransfer代碼地址:https://github.com/pythonsite/logtransfer

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 在最近移動端項目中用到了vux,感覺用著還習慣,當把vux使用到PC端的時候出現了IE瀏覽器出現,這樣的錯誤信息: CSS3114: @font-face 未能完成 OpenType 嵌入許可權檢查。許可權必須是可安裝的。 文件: UwCtGsNCf5NCQ0N.... 然後在IE瀏覽器頁面中的字體圖標 ...
  • 本文內容: header nav article footer section aside datalist 音頻標簽: audio 視頻標簽: video 插入媒體標簽: embed 新增input屬性 首發日期:2018-04-25 header 功能:header標簽定義頁面的頁眉信息。【主要... ...
  • ng zorro Carousel 走馬燈的左右方向控制項實現 ng zorro框架的走馬燈本身還沒有左右方向控制項的實現,作者只是在文檔中(0.6x)中曝出幾個方法介面,如圖: 實現: 在根component中引入NzCarouselComponent 組件 在需要引用carousel的父組件中引入N ...
  • package.json webpack.config.js 的簡單配置 ...
  • 先說下自己開發的實例。 最近在使用 Spring Cloud Config 做分散式配置中心(基於 SVN/Git), 當所有服務啟動後,SVN/Git 中的配置文件更改後,客戶端服務讀取的還是舊的配置,並不能實時讀取(配置信息會緩存在客戶端) ,Spring Boot 提供了一種方式進行更新(通過 ...
  • 手把手教你寫網路爬蟲(7) 作者:拓海 摘要:從零開始寫爬蟲,初學者的速成指南! 封面: 本期我們來聊聊URL去重那些事兒。以前我們曾使用Python的字典來保存抓取過的URL,目的是將重覆抓取的URL去除,避免多次抓取同一網頁。爬蟲會將待抓取的URL放在todo隊列中,從抓取到的網頁中提取到新的U ...
  • 題目: You are given two non-empty linked lists representing two non-negative integers. The digits are stored in reverse order and each of their nodes co ...
  • 簡單的建立一個後臺項目 新建servlet: 內容如下: web.xml 前端代碼: 運行前後臺項目,出現異常情況 在查看ajax的發送情況 從這裡可以看到結果是正確返回的;並且後臺也是正常執行了。 因此得出結論: 跨域是瀏覽器在aja返回結果的時候進行了攔截,先執行,後判斷,不是後臺不允許跨域; ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...