1.背景描述 2020年團隊決定對elasticsearch升級。es(elasticsearch縮寫,下同)當前版本為0.9x,升級到5.x版本。es在本公司承載三個部分的業務,站內查詢,訂單數據統計,elk日誌分析。 對於站內查詢和訂單數據統計,當前業務架構是 mysql -> canal -> ...
一.項目背景
隨著集團MHA集群的日漸增長,對MHA管理平臺的需求也越來越迫切。而MHA平臺的建設第一步就是將這些成百上千套的MHA集群信息收集起來,便於查詢和管理。
MHA主要信息如下:
(1)基礎配置信息;
(2)運行狀態信息;
(3)啟動及FailOver的log信息。
集團目前資料庫的管理平臺是在Archery的基礎上打造,所以,需要將此功能嵌入到既有平臺上。通過Archery系統進行查詢展示。
二.架構
簡單來說,通過 Filebeat + Logstash + MySQL 架構 來收集保存各個集群的配置信息、啟動及FailOver的log信息 和運行狀態信息。
運行狀態信息是通過一個小程式獲取的,這個小程式每分鐘執行一次,會把執行結果輸出到文件中。當然這個文件是被filebeat監控的。
三.實現
3.1 獲取MHA狀態的腳本
文件為mha_checkstatus.py
#!/usr/bin/python
# -*- coding: UTF-8 -*-
"""Report the MHA manager status of every cluster configured under /etc/mha.

For each ``*.cnf`` file the script runs ``masterha_check_status`` and prints
one line per cluster node in the ``:::``-separated format consumed by the
Filebeat -> Logstash pipeline:

    <cluster>:::<conf_path>:::<0|None>:::<node_host>:::<raw check output>

``0`` means the manager reported ``running(0:PING_OK)``; ``None`` means
``stopped(2:NOT_RUNNING)``.  Intended to be run from cron once a minute with
stdout appended to the log file watched by Filebeat.
"""
import os

# ConfigParser was renamed in Python 3; support both interpreters.
try:
    import ConfigParser as configparser  # Python 2
except ImportError:
    import configparser  # Python 3

CONF_DIR = '/etc/mha'
# TODO: replace the placeholder with the real masterha_check_status path.
MASTERHA_CHECK_STATUS = '/???/???/bin/masterha_check_status'


def _cluster_hosts(conf_path):
    """Return the hostnames of server1..server3 from an MHA conf file.

    Missing sections yield '' so the caller can tell a 1-master-1-slave
    cluster (hosts[2] == '') from a 1-master-2-slave one.  Parse errors are
    printed (best effort, matching the original script) and leave the
    remaining hosts empty instead of leaking values from a previous file.
    """
    hosts = ['', '', '']
    config = configparser.ConfigParser()
    try:
        config.read(conf_path)
        sections = config.sections()
        for index, section in enumerate(('server1', 'server2', 'server3')):
            if section in sections:
                hosts[index] = config.get(section, 'hostname')
    except Exception as e:
        print(e)
    return hosts


def _status_lines(cluster, conf_path, hosts, status_output):
    """Format one ':::'-separated output line per node, or [] when the
    manager output matches neither the running nor the stopped marker."""
    if 'running(0:PING_OK)' in status_output:
        flag = '0'
    elif 'stopped(2:NOT_RUNNING)' in status_output:
        flag = 'None'
    else:
        return []
    return [cluster + ':::' + conf_path + ':::' + flag + ':::' + host +
            ':::' + status_output for host in hosts]


def main():
    """Walk CONF_DIR and print the status lines for every cluster."""
    for name in os.listdir(CONF_DIR):
        conf_path = os.path.join(CONF_DIR, name)
        hosts = _cluster_hosts(conf_path)
        # A cluster needs at least a master (server1) and one slave (server2).
        if not (hosts[0] and hosts[1]):
            continue
        cluster = name.replace('.cnf', '')
        cmd = MASTERHA_CHECK_STATUS + ' --conf=' + conf_path
        with os.popen(cmd) as pipe:
            status_output = pipe.read()
        for line in _status_lines(cluster, conf_path,
                                  [h for h in hosts if h], status_output):
            print(line)


if __name__ == '__main__':
    main()
概況說明,就是到存放MHA配置的文件夾,根據每個集群的配置文檔,去逐一執行下masterha_check_status,把結果格式化,輸出到指定的文件中。這個就是每個集群的狀態數據。通過filebeat實時彙報上去。
觸發的方式可以是crontab,每分鐘執行一次。在本案中是輸出到 /???/checkmhastatus/masterha_check_status.log 中。
形式類似如下:
*/1 * * * * python /???/????/mha_checkstatus.py >> /???/????/masterha_check_status.log
3.2 表的設計及腳本
3.2.1 運行狀態表 dbmha_status
-- Per-minute MHA runtime status, one row per cluster node,
-- inserted by the Filebeat -> Logstash "mha-status" pipeline.
CREATE TABLE `dbmha_status` (
    `id`          int          NOT NULL AUTO_INCREMENT,
    `host`        varchar(100) NOT NULL,  -- MHA manager server IP
    `clustername` varchar(200) NOT NULL,
    `logpath`     varchar(500) NOT NULL,
    `confpath`    varchar(500) NOT NULL,
    `mhstatus`    varchar(100) NOT NULL,  -- 0 = running, None = stopped
    `serverip`    varchar(100) NOT NULL,  -- cluster node IP
    `info`        varchar(2000) NOT NULL, -- raw masterha_check_status output
    `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '收集時間',
    PRIMARY KEY (`id`),
    KEY `idx_createtime` (`create_time`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
3.2.2 mha log 信息表 dbmha_log
-- MHA manager / failover log lines, inserted by the Filebeat -> Logstash
-- "mysql-mha" pipeline.
-- Fix: add idx_createtime so this table matches its sibling collection
-- tables (dbmha_status, dbmha_conf_info) and time-ranged queries on the
-- collection timestamp do not full-scan.
CREATE TABLE `dbmha_log` (
    `id`          int          NOT NULL AUTO_INCREMENT,
    `host`        varchar(100) NOT NULL,  -- MHA manager server IP
    `clustername` varchar(200) NOT NULL,
    `filename`    varchar(200) NOT NULL,
    `logpath`     varchar(500) NOT NULL,
    `message`     longtext     NOT NULL,  -- raw (possibly multi-line) log text
    `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '收集時間',
    PRIMARY KEY (`id`),
    KEY `idx_createtime` (`create_time`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
3.2.3 MHA 基礎配置表 dbmha_conf_info
-- Parsed MHA *.cnf configuration, one row per cluster, inserted by the
-- Filebeat -> Logstash "mha-cnf" pipeline.  Supports up to three nodes
-- (serverip1..3); absent nodes keep the '' defaults.
CREATE TABLE `dbmha_conf_info` (
    `id`                   int          NOT NULL AUTO_INCREMENT,
    `host`                 varchar(100) NOT NULL,             -- MHA manager server IP
    `clustername`          varchar(200) NOT NULL DEFAULT '',
    `confpath`             varchar(500) NOT NULL DEFAULT '',
    -- [server default] section values
    `manager_log`          varchar(500) NOT NULL DEFAULT '',
    `manager_workdir`      varchar(500) NOT NULL DEFAULT '',
    `master_binlog_dir`    varchar(500) NOT NULL DEFAULT '',
    `failover_script`      varchar(500) NOT NULL DEFAULT '',
    `online_change_script` varchar(500) NOT NULL DEFAULT '',
    `password`             varchar(128) NOT NULL DEFAULT '',  -- masked upstream by Logstash
    `ping_interval`        varchar(100) NOT NULL DEFAULT '',
    `remote_workdir`       varchar(100) NOT NULL DEFAULT '',
    `repl_password`        varchar(128) NOT NULL DEFAULT '',  -- masked upstream by Logstash
    `repl_user`            varchar(20)  NOT NULL DEFAULT '',
    `ssh_user`             varchar(20)  NOT NULL DEFAULT '',
    `user`                 varchar(20)  NOT NULL DEFAULT '',
    -- [server1] section
    `serverip1`            varchar(100) NOT NULL DEFAULT '',
    `port1`                varchar(10)  NOT NULL DEFAULT '',
    `candidate_master1`    varchar(5)   NOT NULL DEFAULT '',
    `check_repl_delay1`    varchar(20)  NOT NULL DEFAULT '',
    -- [server2] section
    `serverip2`            varchar(100) NOT NULL DEFAULT '',
    `port2`                varchar(10)  NOT NULL DEFAULT '',
    `candidate_master2`    varchar(5)   NOT NULL DEFAULT '',
    `check_repl_delay2`    varchar(20)  NOT NULL DEFAULT '',
    -- [server3] section
    `serverip3`            varchar(100) NOT NULL DEFAULT '',
    `port3`                varchar(10)  NOT NULL DEFAULT '',
    `candidate_master3`    varchar(5)   NOT NULL DEFAULT '',
    `check_repl_delay3`    varchar(20)  NOT NULL DEFAULT '',
    `info`                 longtext     NOT NULL,             -- raw config file content
    `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '收集時間',
    PRIMARY KEY (`id`),
    KEY `idx_createtime` (`create_time`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
3.3 filebeat 中關於讀取文件的配置
..............
# One input per data stream; fields.log_type routes the event
# to the matching filter/output branch in Logstash.
- type: log
  paths:
    - /???/????/masterha_check_status.log
  fields:
    log_type: mha-status
    db_host: 111.111.XXX.1XX   # IP of the server running the MHA manager
- type: log
  paths:
    - /???/mhaconf/*.cnf
  fields:
    log_type: mha-cnf
    db_host: 111.111.XXX.XXX
  # Fold a whole cnf file into one event: append every line that does not
  # start a new document to the previous one.
  multiline.type: pattern
  multiline.pattern: '^\[server [[:space:]] default'
  multiline.negate: true
  multiline.match: after
- type: log
  paths:
    - /???/????/mha/*/*.log
  fields:
    log_type: mysql-mha
    db_host: 111.111.XXX.XXX
................
3.4 Logstash 的配置文件
# Sample Logstash configuration for creating a simple # Beats -> Logstash -> Elasticsearch pipeline. input { beats { port => 5044 } } filter { if [fields][log_type] == "mysql-mha" { grok { match => ["message", "(?m)^%{TIMESTAMP_ISO8601:timestamp} %{BASE10NUM} \[%{WORD:error_level}\] %{GREEDYDATA:error_msg}$"] } grok { match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/)(?<product>.*)(\\|\/).*"} } grok { match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/).*(\\|\/)(?<filename>.*)"} } date { match=> ["timestamp", "ISO8601"] remove_field => ["timestamp"] } mutate { copy => { "[log][file][path]" => "logpath" "[fields][db_host]" => "manager_ip" } } mutate { remove_field => ["@version", "beat", "input", "offset", "prospector", "source", "tags"] } } if [fields][log_type] == "mha-cnf" { mutate { split => ["message","server"] add_field => {"message1" => "%{[message][1]}"} add_field => {"messages1" => "%{[message][2]}"} add_field => {"messages2" => "%{[message][3]}"} add_field => {"messages3" => "%{[message][4]}"} add_field => {"dft_password" => "*********"} add_field => {"dft_repl_password" => "*********"} } kv { source => "message1" field_split => "\n" include_keys => ["manager_log", "manager_workdir", "master_binlog_dir", "master_ip_failover_script", "master_ip_online_change_script", "ping_interval", "remote_workdir", "repl_user", "ssh_user", "user" ] prefix => "dft_" remove_char_value => "<>\[\]," } kv { source => "messages1" field_split => "\n" include_keys => ["candidate_master", "check_repl_delay", "hostname", "port" ] prefix => "s1_" } kv { source => "messages2" field_split => "\n" default_keys => [ "s2_candidate_master", "", "s2_check_repl_delay", "", "s2_hostname","", "s2_port","" ] include_keys => ["candidate_master", "check_repl_delay", "hostname", "port" ] prefix => "s2_" } kv { source => "messages3" field_split => "\n" default_keys => [ "s3_candidate_master", "", "s3_check_repl_delay", "", "s3_hostname","", "s3_port","" ] include_keys => ["candidate_master", 
"check_repl_delay", "hostname", "port" ] prefix => "s3_" } grok { match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/)(?<product>.*)(\\|\/).*"} match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/).*(\\|\/)(?<filename>.*)"} } mutate { copy => { "[fields][db_host]" => "manager_ip" } copy => { "[log][file][path]" => "conf_path" } gsub => [ "message", "需要加密的***密***碼", "*********", "message", "需要加密的其他字元", "*********" ] } date { match=> ["timestamp", "ISO8601"] remove_field => ["timestamp"] } mutate { remove_field => ["@version", "beat", "input", "offset", "prospector", "source", "tags"] } } if [fields][log_type] == "mha-status" { mutate { split => ["message",":::"] add_field => {"cluster_name" => "%{[message][0]}"} add_field => {"conf_path" => "%{[message][1]}"} add_field => {"masterha_check_status" => "%{[message][2]}"} add_field => {"server" => "%{[message][3]}"} add_field => {"info" => "%{[message][4]}"} } grok { match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/).*(\\|\/)(?<filename>.*)"} } mutate { copy => { "[fields][db_host]" => "manager_ip" } } date { match=> ["timestamp", "ISO8601"] remove_field => ["timestamp"] } mutate { remove_field => ["@version", "beat", "input", "offset", "prospector", "source", "tags"] } } } output { if [fields][log_type] == "mysql-mha" { jdbc { driver_jar_path => "/???/???/logstash-7.6.0/vendor/jar/jdbc/mysql-connector-java-5.1.47.jar" driver_class => "com.mysql.jdbc.Driver" connection_string => "jdbc:mysql://120.120.XXX.XXX:3306/archery?user=ts23_dbhacluster&password=???????" statement => ["INSERT INTO dbmha_log (host,clustername,filename,logpath, message) VALUES(?, ?, ?, ?, ?)","%{manager_ip}","%{product}",<