一、簡介 ELK日誌我們一般都是按天存儲,例如索引名為"kafkalog-2022-04-05",因為日誌量所占的存儲是非常大的,我們不能一直保存,而是要定期清理舊的,這裡就以保留7天日誌為例。 自動清理7天以前的日誌可以用定時任務的方式,這樣就需要加入多一個定時任務,可能不同服務記錄的索引名又不一 ...
一、簡介
ELK日誌我們一般都是按天存儲,例如索引名為"kafkalog-2022-04-05",因為日誌量所占的存儲是非常大的,我們不能一直保存,而是要定期清理舊的,這裡就以保留7天日誌為例。
自動清理7天以前的日誌可以用定時任務的方式,這樣就需要加入多一個定時任務,可能不同服務記錄的索引名又不一樣,這樣用定時任務配還是沒那麼方便。
ES給我們提供了一個索引的生命周期策略(lifecycle),就可以對索引指定刪除時間,能很好解決這個問題。
索引生命周期分為四個階段:HOT(熱)=>WARM(溫)=》COLD(冷)=>DELETE(刪除)
二、給索引設生命周期策略(ILM)
1.配置生命周期策略(policy)
這裡為ELK日誌超過7天的自動刪除,所以只需要用到DELETE(刪除階段)
PUT _ilm/policy/auto_delete_policy { "policy": { "phases": { "delete": { "min_age": "7d", "actions": { "delete": {} } } } } }
創建一個自動刪除策略(auto_delete_policy)
delete:刪除階段,7天執行刪除索引動作
查看策略:GET _ilm/policy/
2.創建索引模板
索引模板可以匹配索引名稱,匹配到的索引名稱按這個模板創建mapping
PUT _template/elk_template { "index_patterns": ["kafka*"], "settings": { "index":{ "lifecycle":{ "name":"auto_delete_policy", "indexing_complete":true } } } }
創建索引模板(elk_tempalte),index.lifecycle.name把上面的自動刪除策略綁定到elk索引模板
創建kafka開頭的索引時就會應用這個模板。
indexing_complete:true,必須設為true,跳過HOT階段的Rollover
查看模板:GET /_template/
3.測試效果
logstash配置:
logstash接收kafka的輸入,輸出到es。
input { kafka { type=>"log1" topics => "kafkalog" #在kafka這個topics提取數據 bootstrap_servers => "127.0.0.1:9092" # kafka的地址 codec => "json" # 在提取kafka主機的日誌時,需要寫成json格式 } } output { if [type] =="log1" { elasticsearch { hosts => ["127.0.0.1:9200"] #es地址 index => "kafkalog%{+yyyy.MM.dd}" #把日誌採集到es的索引名稱 # user => "elastic" # password => "123456" } } }
這裡測試時把DELETE的日期由7天"7d"改為1分鐘"1m"。
生命周期策略預設10分鐘檢測一次,為了方便測試,這裡設為30s。
PUT /_cluster/settings { "transient": { "indices.lifecycle.poll_interval":"30s" } }
把日誌寫入到es後,查看日誌索引的生命周期策略信息。
GET kafka*/_ilm/explain 查看kafka開頭索引的生命周期策略
過一會再點查詢,索引已經沒有了,說明已經生效。