如何監控PostgreSQL存儲過程/函數代碼運行?本文介紹用python+微信/郵件的方式進行報警、監控。 首先要有一張表、用於存放PostgreSQL存儲過程/函數代碼運行異常的信息。 處理原則:若出現異常;把“發生時間+所在的程式+**原因”**通過微信/郵件發給對應人員。當然發送一次即可;起 ...
如何監控PostgreSQL存儲過程/函數代碼運行?本文介紹用python+微信/郵件的方式進行報警、監控。
首先要有一張表、用於存放PostgreSQL存儲過程/函數代碼運行異常的信息。
處理原則:若出現異常;把“發生時間+所在的程式+原因”通過微信/郵件發給對應人員。當然發送一次即可;起到通知的效果。
一、媒介
通過什麼方式進行發送內容;下麵介紹微信/郵件兩種方式
1、python發送微信
py_wechar.py的內容
企業微信號;大家可以到企業微信上配置
#!/usr/bin/python3
#coding=utf-8
import json
import time
import urllib.request as urllib2
options = {
'WeiXin': {
'corp_id': '*', #微信企業號ID
'agent_id': '*', #微信企業號應用ID
'agent_secret': '*', #微信企業號密鑰
'to_user': '@all' #發送給誰
},
}
class WeiXinSendMsg:
def __init__(self, wx_conf):
self.corp_id = wx_conf.get('corp_id')
self.agent_secret = wx_conf.get('agent_secret')
self.agent_id = wx_conf.get('agent_id')
self.to_user = wx_conf.get('to_user')
self.token = self.get_token()
self.token_update_time = int(time.time())
def get_token(self):
get_token_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=' + self.corp_id + '&corpsecret=' + self.agent_secret
token = json.loads(urllib2.urlopen(get_token_url).read().decode('utf-8'))['access_token']
if token:
return token
# 微信發送端的token每1800秒會更新一次
def update_token(self):
if int(time.time()) - self.token_update_time >= 1800:
self.token = self.get_token()
self.token_update_time = int(time.time())
def send_message(self, msg):
try:
self.update_token()
send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' + self.token
send_val = {"touser":self.to_user, "toparty":"", "msgtype":"text", "agentid":self.agent_id, "text":{"content":msg}, "safe":"0"}
send_data = json.dumps(send_val, ensure_ascii=True).encode("utf-8")
send_request = urllib2.Request(send_url, send_data)
response = json.loads(urllib2.urlopen(send_request).read())
except Exception as e:
print('Exception WeiXin send_message:', e)
if __name__ == '__main__':
WeiXin = WeiXinSendMsg(options.get('WeiXin'))
WeiXin.send_message('hello world / 測試')
2、python發送郵件
py_email.py的內容
#!/usr/bin/python3
#coding=utf-8
import smtplib
from email.header import Header
from email.mime.text import MIMEText
from email.utils import parseaddr, formataddr
options = {
'Email': {
'smtp_server': 'smtp.exmail.qq.com', #郵箱伺服器地址
'from_addr': '[email protected]', #發送人賬號
'password': '123456', #發送人密碼
'to_addr': ['[email protected]', '[email protected]'], #發送給誰
}
}
class EmailSendMsg:
def __init__(self, email_conf):
self.smtp_server = email_conf.get('smtp_server')
self.from_addr = email_conf.get('from_addr')
self.password = email_conf.get('password')
self.to_addr = email_conf.get('to_addr')
# def __del__(self):
# self.server.quit()
def format_addr(self, str):
name, addr = parseaddr(str)
return formataddr(( \
Header(name, 'utf-8').encode(), \
addr.encode('utf-8') if isinstance(addr, unicode) else addr))
def send_msg(self, text):
try:
self.server = smtplib.SMTP(self.smtp_server, 25)
self.server.set_debuglevel(1)
self.server.login(self.from_addr, self.password)
msg = MIMEText(text, 'plain', 'utf-8')
msg['From'] = self.format_addr(u'監控 <%s>' % self.from_addr)
for i in range(len(self.to_addr)):
msg['To'] = self.format_addr(u'<%s>' % self.to_addr[i])
msg['Subject'] = Header(u'異常報警…', 'utf-8').encode()
self.server.sendmail(self.from_addr, self.to_addr, msg.as_string())
self.server.quit()
except Exception as e:
print 'Exception Email send_message:', e
if __name__ == '__main__':
Email = EmailSendMsg(options.get('Email'))
Email.send_msg('hello world!')
二、python連接資料庫
看這個鏈接可以研究下python如何連接PostgreSQL資料庫
三、python報警
上面我們知道如何通過python發送微信內容、以及python連接PostgreSQL資料庫。現在我們要如何獲取報警時機;報警內容。
python_alert.py
#!/usr/bin/python3
import psycopg2
from config import config
from py_wechar import WeiXinSendMsg,options
def get_errors():
""" query data from the vendors table """
conn = None
try:
params = config()
WeiXin = WeiXinSendMsg(options.get('WeiXin'))
conn = psycopg2.connect(**params)
cur = conn.cursor()
cur.execute("select error_time, error_desc, proc_name from adsas.tbl_error_log where deal_status = 0 order by id")
rows = cur.fetchall()
if cur.rowcount > 0 :
WeiXin.send_message("The number of parts: {}".format(cur.rowcount))
for row in rows:
# WeiXin.send_message('-'*60)
# WeiXin.send_message('發生時間:{}'.format(row[0]))
# WeiXin.send_message('錯誤原因:{}'.format(row[1]))
# WeiXin.send_message('報警代碼:{}'.format(row[2]))
str_error='發生時間:{}\n錯誤原因:{}\n報警代碼:{}'.format(row[0],row[1],row[2])
WeiXin.send_message(str_error)
cur.execute("update adsas.tbl_error_log set deal_status = 1 where deal_status = 0 ")
conn.commit()
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
if __name__ == '__main__':
get_errors()
四、部署
可以通過cron/或者開源的定時任務系統進行報警;
報警信息: