一 進程對象及其他方法 '''一臺電腦上面運行著很多進程,那麼電腦是如何區分並管理這些進程服務端的呢?電腦會給每一個運行的進程分配一個PID號如何查看 windows電腦 進入cmd輸入tasklist即可查看 tasklist|findstr PID查看具體的進程 linux電腦 進入終端之 ...
夜鶯初探四·mtail插件採集日誌指標
前言
上一篇介紹了Categraf的配置,這篇我們嘗試通過使用google開源的mtail工具來作為Categraf的插件,從應用日誌中提取指標數據。
mtail項目介紹和配置文件說明
通過mtail -h可以很方便看到參數詳細,也推薦喬克-從日誌中提取指標的瑞士軍刀或者Dream運維夢工廠-categraf-mtail日誌收集插件詳解來瞭解更多,我就不再班門弄斧了。
當然也可以通過官方來瞭解詳情新手村介紹 和高手入門
Categraf採集插件
categraf-mtail插件地址
https://github.com/flashcatcloud/categraf/tree/main/inputs/mtail
源碼解讀
package mtail
...
//常量值
const inputName = `mtail`
const description = ` extract internal monitoring data from application logs`
//配置
// MTail holds the configuration for the plugin.
type MTail struct {
config.PluginConfig
Instances []*Instance `toml:"instances"`
}
//配置文件中instances對象需要參數結構體
type Instance struct {
config.InstanceConfig
/**
type InternalConfig struct {
// append labels
Labels map[string]string `toml:"labels"`
// metrics drop and pass filter
MetricsDrop []string `toml:"metrics_drop"`
MetricsPass []string `toml:"metrics_pass"`
MetricsDropFilter filter.Filter
MetricsPassFilter filter.Filter
// metric name prefix
MetricsNamePrefix string `toml:"metrics_name_prefix"`
// mapping value
ProcessorEnum []*ProcessorEnum `toml:"processor_enum"`
// whether instance initial success
inited bool `toml:"-"`
}
type InstanceConfig struct {
InternalConfig
IntervalTimes int64 `toml:"interval_times"`
}
**/
NamePrefix string `toml:"name_prefix"`
Progs string `toml:"progs"` //規則文件(xxx.mtail)的目錄
Logs []string `toml:"logs"` //要監控的日誌文件
IgnoreFileRegPattern string `toml:"ignore_filename_regex_pattern"`
OverrideTimeZone string `toml:"override_timezone"` //指定時區
EmitProgLabel string `toml:"emit_prog_label"` //是否導出label標簽 string類型的bool值
emitProgLabel bool `toml:"-"`
EmitMetricTimestamp string `toml:"emit_metric_timestamp"` //metrics是否帶時間戳 string類型的bool值
emitMetricTimestamp bool `toml:"-"`
PollInterval time.Duration `toml:"poll_interval"`
PollLogInterval time.Duration `toml:"poll_log_interval"`
MetricPushInterval time.Duration `toml:"metric_push_interval"`
MaxRegexpLen int `toml:"max_regexp_length"`
MaxRecursionDepth int `toml:"max_recursion_depth"`
SyslogUseCurrentYear string `toml:"syslog_use_current_year"` // true
sysLogUseCurrentYear bool `toml:"-"`
LogRuntimeErrors string `toml:"vm_logs_runtime_errors"` // true
logRuntimeErrors bool `toml:"-"`
//
ctx context.Context `toml:"-"`
cancel context.CancelFunc `toml:"-"`
m *mtail.Server
}
//配置文件中instances對象的Init函數,調用mtail
func (ins *Instance) Init() error {
//初始化檢查,設置預設值
if len(ins.Progs) == 0 || len(ins.Logs) == 0 {
return types.ErrInstancesEmpty
}
// set default value
ins.sysLogUseCurrentYear = ins.SyslogUseCurrentYear == "true"
ins.logRuntimeErrors = ins.LogRuntimeErrors == "true"
ins.emitProgLabel = ins.EmitProgLabel == "true"
ins.emitMetricTimestamp = ins.EmitMetricTimestamp == "true"
if ins.PollLogInterval == 0 {
ins.PollLogInterval = 250 * time.Millisecond
}
if ins.PollInterval == 0 {
ins.PollInterval = 250 * time.Millisecond
}
if ins.MetricPushInterval == 0 {
ins.MetricPushInterval = 1 * time.Minute
}
if ins.MaxRegexpLen == 0 {
ins.MaxRegexpLen = 1024
}
if ins.MaxRecursionDepth == 0 {
ins.MaxRecursionDepth = 100
}
buildInfo := mtail.BuildInfo{
Version: config.Version,
}
//時區設置
loc, err := time.LoadLocation(ins.OverrideTimeZone)
if err != nil {
fmt.Fprintf(os.Stderr, "Couldn't parse timezone %q: %s", ins.OverrideTimeZone, err)
return err
}
//mtail參數設置
opts := []mtail.Option{
mtail.ProgramPath(ins.Progs),
mtail.LogPathPatterns(ins.Logs...),
mtail.IgnoreRegexPattern(ins.IgnoreFileRegPattern),
mtail.SetBuildInfo(buildInfo),
mtail.OverrideLocation(loc),
mtail.MetricPushInterval(ins.MetricPushInterval), // keep it here ?
mtail.MaxRegexpLength(ins.MaxRegexpLen),
mtail.MaxRecursionDepth(ins.MaxRecursionDepth),
mtail.LogRuntimeErrors,
}
if ins.cancel != nil {
ins.cancel()
} else {
ins.ctx, ins.cancel = context.WithCancel(context.Background()) //父級ctx
}
//mtail配置,每隔1h啟動 清理過期日誌
staleLogGcWaker := waker.NewTimed(ins.ctx, time.Hour)
opts = append(opts, mtail.StaleLogGcWaker(staleLogGcWaker))
if ins.PollInterval > 0 {
logStreamPollWaker := waker.NewTimed(ins.ctx, ins.PollInterval)
logPatternPollWaker := waker.NewTimed(ins.ctx, ins.PollLogInterval)
opts = append(opts, mtail.LogPatternPollWaker(logPatternPollWaker), mtail.LogstreamPollWaker(logStreamPollWaker))
}
if ins.sysLogUseCurrentYear {
opts = append(opts, mtail.SyslogUseCurrentYear)
}
if !ins.emitProgLabel {
opts = append(opts, mtail.OmitProgLabel)
}
if ins.emitMetricTimestamp {
opts = append(opts, mtail.EmitMetricTimestamp)
}
//指標結果存儲對象
store := metrics.NewStore()
//間隔1h清理歷史指標
store.StartGcLoop(ins.ctx, time.Hour)
m, err := mtail.New(ins.ctx, store, opts...)
if err != nil {
log.Println(err)
ins.cancel()
return err
}
ins.m = m
return nil
}
//銷毀取消所有任務
func (ins *Instance) Drop() {
ins.cancel()
}
//對象初始化方法
func init() {
inputs.Add(inputName, func() inputs.Input {
return &MTail{}
})
}
//對象複製返回新建對象
func (s *MTail) Clone() inputs.Input {
return &MTail{}
}
func (s *MTail) Name() string {
return inputName
}
//MTail獲取配置文件中所有instances
func (s *MTail) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(s.Instances))
for i := 0; i < len(s.Instances); i++ {
ret[i] = s.Instances[i]
}
return ret
}
// Description returns a one-sentence description on the input.
func (s *MTail) Description() string {
return description
}
//抓取數據方法?
// Gather retrieves all the configured fields and tables.
// Any error encountered does not halt the process. The errors are accumulated
// and returned at the end.
// func (s *Instance) Gather(acc telegraf.Accumulator) error {
func (ins *Instance) Gather(slist *types.SampleList) {
//獲取到prometheus註冊器
reg := ins.m.GetRegistry()
mfs, done, err := prometheus.ToTransactionalGatherer(reg).Gather()
if err != nil {
log.Println(err)
return
}
defer done()
//遍歷所有指標向量?
for _, mf := range mfs {
metricName := mf.GetName()
//遍歷所有指標
for _, m := range mf.Metric {
//加入配置的Lables
tags := util.MakeLabels(m, ins.GetLabels())
//處理不同指標類型
if mf.GetType() == dto.MetricType_SUMMARY {
util.HandleSummary(inputName, m, tags, metricName, ins.GetLogMetricTime, slist)
} else if mf.GetType() == dto.MetricType_HISTOGRAM {
util.HandleHistogram(inputName, m, tags, metricName, ins.GetLogMetricTime, slist)
} else {
util.HandleGaugeCounter(inputName, m, tags, metricName, ins.GetLogMetricTime, slist)
}
}
}
}
//返回時間戳
func (p *Instance) GetLogMetricTime(ts int64) time.Time {
var tm time.Time
if ts <= 0 || !p.emitMetricTimestamp {
return tm
}
sec := ts / 1000
ms := ts % 1000 * 1e6
tm = time.Unix(sec, ms)
return tm
}
整體理解下來,Categraf有效的通過統一的文件完成了多個目錄,多個規則的日誌採集,簡化許多操作。
最後感謝看完,由於作者水平有限,使用很多工具並不熟悉,如有錯誤和遺漏歡迎指出,感謝諒解。
以上內容來源於官方推出的夜鶯黃埔營的免費培訓活動,加入 QQ 群查看直播視頻,還可以在官方答疑站點獲得更多支持 https://answer.flashcat.cloud/