到這一步,我的收集系統就已經完成很大一部分工作,我們重新看一下我們之前畫的圖: 我們已經完成前面的部分,剩下是要完成後半部分,將kafka中的數據扔到ElasticSearch,並且最終通過kibana展現出來 ElasticSearch 官網地址這裡介紹了非常詳細的安裝方法:https://www ...
到這一步,我的收集系統就已經完成很大一部分工作,我們重新看一下我們之前畫的圖:
我們已經完成前面的部分,剩下是要完成後半部分,將kafka中的數據扔到ElasticSearch,並且最終通過kibana展現出來
ElasticSearch
官網地址這裡介紹了非常詳細的安裝方法:
https://www.elastic.co/downloads/elasticsearch
但是其實這裡是需要配置一些東西的,要不然直接啟動是會悲劇的,在網上找了一個地址,如果出現類似的錯誤直接處理就行,我自己已經驗證了:
https://blog.csdn.net/liangzhao_jay/article/details/56840941
如下圖所示就表示已經安裝完成:
通過go寫一個簡單的調用ElasticSearch的例子:
package main import ( "fmt" elastic "gopkg.in/olivere/elastic.v2" ) type Tweet struct{ User string Message string } func main(){ client,err := elastic.NewClient(elastic.SetSniff(false),elastic.SetURL("http://192.168.0.118:9200/")) if err != nil{ fmt.Println("connect es error",err) return } fmt.Println("conn es succ") tweet := Tweet{User:"olivere name",Message:"Take Five"} _, err = client.Index().Index("twitter").Type("tweet").Id("1").BodyJson(tweet).Do() if err != nil { panic(err) return } fmt.Println("insert succ") }
logtransfer
logtransfer主要負責從 kafka隊列中讀取日誌信息,並且添加到ElasticSearch中
看那一下logtransfer 目錄結構如下:
├── conf
│ └── app.conf
├── es.go
├── etcd.go
├── ip.go
├── kafka.go
├── logs
│ └── transfer.log
└── main.go
conf:存放配置文件
es.go:主要是連接ElasticSearch的部分以及用於將消息放到ElasticSearch中
etcd.go:主要用於做動態的配置更改,當我們需要將kafka中的哪些topic日誌內容扔到ElasticSearch中
ip.go: 用於獲取當前伺服器的ip地址
kafka.go: 主要是kafka的處理邏輯,包括連接kafka以及從kafka中讀日誌內容
main.go:代碼的入口函數
整體大代碼框架,通過如圖展示:
和之前的logagent中的代碼有很多啟示是可以復用的或者稍作更改,就可以了,其中es之心的,主要是連接ElasticSearch並將日誌內容放進去
es.go的代碼內容為:
package main import ( "gopkg.in/olivere/elastic.v2" "github.com/astaxie/beego/logs" "sync" "encoding/json" ) var waitGroup sync.WaitGroup var client *elastic.Client func initEs(addr string,) (err error){ client,err = elastic.NewClient(elastic.SetSniff(false),elastic.SetURL(addr)) if err != nil{ logs.Error("connect to es error:%v",err) return } logs.Debug("conn to es success") return } func reloadKafka(topicArray []string) { for _, topic := range topicArray{ kafkaMgr.AddTopic(topic) } } func reload(){ //GetLogConf() 從channel中獲topic信息,而這部分信息是從etcd放進去的 for conf := range GetLogConf(){ var topicArray []string err := json.Unmarshal([]byte(conf),&topicArray) if err != nil { logs.Error("unmarshal failed,err:%v conf:%v",err,conf) continue } reloadKafka(topicArray) } } func Run(esThreadNum int) (err error) { go reload() for i:=0;i<esThreadNum;i++{ waitGroup.Add(1) go sendToEs() } waitGroup.Wait() return } type EsMessage struct { Message string } func sendToEs(){ // 從msgChan中讀取日誌內容並扔到elasticsearch中 for msg:= range GetMessage() { var esMsg EsMessage esMsg.Message = msg.line _,err := client.Index().Index(msg.topic).Type(msg.topic).BodyJson(esMsg).Do() if err != nil { logs.Error("send to es failed,err:%v",err) continue } logs.Debug("send to es success") } waitGroup.Done() }
最終我將logagnet以及logtransfer部署到虛擬機上進行測試的效果是:
這樣當我再次查日誌的時候就可以不用登陸每台伺服器去查日誌,只需要通過頁面根據關鍵字迅速看到相關日誌,當然目前實現的功能還是有點粗糙,etcd的更改程式,是自己寫的發送程式,其實更好的解決方法是通過頁面,讓用戶點來點去,來控制自己要收集哪些日誌,以及自己要將哪些topic的日誌從kafka中放到ElasticSearch (本人是做後端開發,不擅長前端的開發,不過後面可以試著寫個頁面試試,估計會很醜哈哈)
同時這裡關於各個部分的安裝並沒有做過多的介紹,以及維護,當然我們的目標是是通過這些開源的軟體以及包來實現我們想要的功能,後期的維護,肯定需要對各個組件部分都進行深入瞭解
這裡附贈一下那個etcd客戶端代碼:
package main import ( "github.com/coreos/etcd/clientv3" "time" "fmt" "golang.org/x/net/context" ) var logconf = ` [ { "topic":"eslservice_log", "log_path":"/opt/pbx/log/eslservice.log", "service":"eslservice", "send_rate":50000 } ] ` var test111 = ` [ { "topic":"test_log", "log_path":"D:/a.log", "service":"test", "send_rate":50000 } ] ` var transconf = ` [ "eslservice_log" ] ` func main() { cli, err := clientv3.New(clientv3.Config{ Endpoints:[]string{"192.168.90.78:2371"}, DialTimeout:5*time.Second, }) if err != nil { fmt.Println("connect failed,err:",err) return } fmt.Println("connect success") defer cli.Close() ctx,cancel := context.WithTimeout(context.Background(),time.Second) //_,err = cli.Put(ctx,"/logagent/192.168.90.11/log_config",logconf) //_,err = cli.Put(ctx,"/logagent/192.168.90.61/log_config",test111) _, err = cli.Put(ctx,"/logtransfer/192.168.90.11/log_config",transconf) cancel() if err != nil { fmt.Println("put failed ,err:",err) return } ctx,cancel = context.WithTimeout(context.Background(),time.Second) resp,err := cli.Get(ctx,"/logtransfer/",clientv3.WithPrefix()) cancel() if err != nil { fmt.Println("get failed,err:",err) return } for _,ev:=range resp.Kvs{ fmt.Printf("%s:%s\n",ev.Key,ev.Value) } }
到目前為止基本的功能都已經實現了,當然了現在的代碼結構還有的糙,後面會進行優化!
整個項目中的代碼:
logagent代碼地址:https://github.com/pythonsite/logagent
logtransfer代碼地址:https://github.com/pythonsite/logtransfer