基礎監控的同比告警主要是針對伺服器監控採集的指標,包括負載(load1/load5/load15)、平均CPU使用率、記憶體使用率、內外網流量、埠數量等,具體採集方法可參考《基礎監控-伺服器監控》。 一、告警原理 多個指標每分鐘1個數據,比較當前分鐘的前10分鐘的7天平均值,如果幅度超過100%並且 ...
基礎監控的同比告警主要是針對伺服器監控採集的指標,包括負載(load1/load5/load15)、平均CPU使用率、記憶體使用率、內外網流量、埠數量等,具體採集方法可參考《基礎監控-伺服器監控》。
一、告警原理
多個指標每分鐘1個數據,比較當前分鐘的前10分鐘的7天平均值,如果幅度超過100%並且絕對值相差達到M則算是一次異常,包括上升/下降異常,如果一個指標持續兩次上升異常或者持續兩次下降異常(不包括先上升後下降或者先下降後上升情況)則開始告警給機器的對應負責人(運維/開發)。例如當前是10號11:00,則比較的是10:59,10:58...10:50共10分鐘的最近7天的平均值,例如10:59則是10號、9、8...4共7天的10:59這個點的數據的平均值。實際情況一般是第二分鐘才解析了前一分鐘的數據,相當於當前是11點,則解析的是10:59這個點和它之前的10分鐘的7天平均值的比較。
二、數據來源
基礎監控-伺服器監控每分鐘會採集一份數據保存到Redis中,保存格式是 reportTime - hash,hash的格式是{ip1:{item1:value, item2:value2...}, ip2:{item1:value, item2:value2...}, ip3...},則每分鐘一個redis hash,共7天 7*1440=10080個數據。實際情況保存的時候會多保留10分鐘的數據,即7天+10分鐘;由於reportTime是根據機器的實際時間來上報(這樣畫圖才能保證是準確的),而某些機器沒有NTP伺服器或者其他原因導致時間不准,則reportTime則又會多種多樣,所以導致的結果是redis的hash會變多一些,當然這並不影響我們的數據獲取,因為整個同比告警就是根據畫圖來比較的,畫圖採用的是reportTime。採用hash保存到redis則不用每個ip讀取一次redis,可以減少N次網路IO,大大提高程式速度。由於redis占據記憶體較多,大概10G左右,需要調整redis配置文件maxmemory的大小,不然redis會隨機刪除設置自動過期的key。
三、程式設計
1、DB設計
需要數據來源保存(redis)、異常展示表(mysql)、閾值配置表(mysql)、上次狀態(redis)。異常展示表保存所有ip的異常描述信息、異常持續時間,可在頁面展示;閾值配置表保存所有ip的閾值配置信息,每個ip的異常比率、各個指標的絕對值差、是否需要監控等;上次狀態則用來判斷是否需要告警的,持續2次同類異常則告警,使用redis保存即可。
mysql> show tables;
+-----------------------------------+
| Tables_in_machineMonitor_overLast |
+-----------------------------------+
| currentDisplay |
| monitorConf |
+-----------------------------------+
2、導入測試數據
測試數據是使用mysql和redis將數據從mysql導入redis中,線上數據是等程式完成後修改"伺服器監控"的上報CGI來導入的。測試導入的時候遇到的坑參考《python處理json和redis hash的坑》
def initRedis(client): if client not in CONF.redisInfo: raise RedisNotFound("can not found redis %s in config" % client) try: pool = redis.ConnectionPool(**CONF.redisInfo[client]) # 線程安全 red = redis.Redis(connection_pool=pool) red.randomkey() # check except Exception, e: raise RedisException("connect redis %s failed: %s" % (client, str(e))) return red def initDb(client): if client not in CONF.dbInfo: raise MysqlNotFound("can not found mysql db %s in config" % client) try: db = opMysql.OP_MYSQL(**CONF.dbInfo[client]) except Exception, e: raise MysqlException("connect mysql %s failed: %s" % (client, str(e))) code, errMsg = db.connect() if 0 != code: raise MysqlException("connect mysql %s failed: %s" % (client, errMsg)) return db
3、告警介面
調用"告警平臺"介面,項目配置後可發送RTX/SMS/Wechat,可在頁面查看歷史記錄,輕鬆修改配置和臨時屏蔽等。調用介面直接使用urllib/urllib2庫即可
postData = urllib.urlencode(values) req = urllib2.Request(CONF.amcUrl, postData) response = urllib2.urlopen(req, timeout=CONF.AMC_TIME_OUT) amcDict = json.loads(response.read()) code = int(amcDict["returnCode"]) errMsg = amcDict["errorMsg"]
4、解析來源的數據
將數據轉成可識別字典,並做排錯處理,將錯誤數據拒絕掉
5、使用numpy和panda求平均值
將來源數據解析後存入panda.DataFrame中,如果數據不存在則使用numpy.nan代替,使用panda.DataFrame().mean().round(2)求平均值並保留2位數。如果某一分鐘的7天數據全部獲取不到則拒絕解析當前值,如果是7天數據的部分數據獲取不到則剔除該點併在平均值的除數相對減一,使用NAN代替該點則在mean中可以解決這種情況。增加自定義函數比較每個列是否滿足幅度上升/下降100%(可配置)並且絕對值差達到M(可配置),是則返回True,否則返回False,判斷所有返回值是否均為True或者均為False,是則符合異常場景。
初始化DataFrame
for item in vd: if value is None or value[item] is None: vd[item][lastDayKey] = numpy.nan else: vd[item][lastDayKey] = value[item] vf = pandas.DataFrame(vd) columns.append(vf.mean().round(2)) indexes.append(lastMinKey) self.ipInfo[ip]["lastData"] = pandas.DataFrame(columns, index=indexes)
panda自定義函數比較並且判斷是否需要告警
for item in curValue: if curValue[item] is None: # error continue else: curValue[item] = round(curValue[item], 2) def overLastCompute(v2, absSub): """ :param v2: float :param absSub: absolute subtract :return: high/low/null """ v1 = curValue[item] v2 = round(v2, 2) if 0 == v2: if v1 > absSub: return "HIGH" if v1 < -absSub: return "LOW" return "NULL" subVal = abs(v1 - v2) if subVal / v2 > CONF.RATIO and subVal > absSub: if v1 > v2: return "HIGH" return "LOW" return "NULL" self.ipInfo[ip]["result"][item] = self.ipInfo[ip]["lastData"][item].apply(overLastCompute, absSub=self.monitorConf[ip][item]) res = self.ipInfo[ip]["result"][item] == "HIGH" # Series if all(i for i in res): resErr[item] = CONF.HIGH_ERR if CONF.HIGH_ERR == self.lastCache[str(ip)][item]: # will Alert if switch on pass else: res = self.ipInfo[ip]["result"][item] == "LOW" if all(i for i in res): resErr[item] = CONF.LOW_ERR if CONF.LOW_ERR == self.lastCache[str(ip)][item]: # will Alert if switch on pass
6、由於IP較多,並且主要邏輯在解析數據和panda的計算上,使用CPU比較多,則需要使用多進程,並且結合線程池將進程跑滿,別浪費進程資源。
step = ipNums / multiprocessing.cpu_count() ipList = list() i = 0 j = 1 processList = list() for ip in self.ipInfo: ipS = str(ip) if ipS not in self.lastCache: self.lastCache[ipS] = copy.deepcopy(self.value) ipList.append(ip) i += 1 if i == step * j or i == ipNums: j += 1 def innerRun(): wm = Pool.ThreadPool(CONF.POOL_SIZE) for myIp in ipList: kw = dict(ip=myIp, handlerKey=myIp) wm.addJob(self.handleOne, **kw) wm.waitForComplete() ipListNums = len(ipList) for tmp in xrange(ipListNums): res = wm.getResult() if res: handlerKey, code, handlerRet, errMsg = res if 0 != code: continue self.lastCache[str(handlerKey)] = handlerRet process = multiprocessing.Process(target=innerRun) process.start() processList.append(process) ipList = list() for process in processList: process.join()
四、優化
指標監控v2則是同比告警的升級版,數據量將會大好幾倍,目前想到的優化如下
1、使用hbase替代redis
2、將程式做京廣容災,改成分散式運行,橫向擴展多套程式併列運行