公司的Riak版本是2.0.4,目前已根據CMDB三級業務部署了十幾套集群,大部分是跨機房部署。監控採集分為兩個大的維度,第一個維度是單機,也就是 「IP:埠」;第二個維度是集群,也就是所有節點指標的統計結果。本文主要介紹採集的指標和採集程式。 一、採集的指標 1、吞吐量指標 1.1 單機 採集方 ...
公司的Riak版本是2.0.4,目前已根據CMDB三級業務部署了十幾套集群,大部分是跨機房部署。監控採集分為兩個大的維度,第一個維度是單機,也就是 「IP:埠」;第二個維度是集群,也就是所有節點指標的統計結果。本文主要介紹採集的指標和採集程式。
一、採集的指標
1、吞吐量指標
1.1 單機
採集方法:
/usr/sbin/riak-admin status
指標 | 功能 |
---|---|
node_gets | 某節點前一分鐘處理的 GET 請求數量,包括該節點上非本地虛擬節點處理的 GET 請求 |
node_puts | 某節點前一分鐘處理的 PUT 請求數量,包括該節點上非本地虛擬節點處理的 PUT 請求 |
1.2 集群
指標 | 功能 | 統計方法 |
---|---|---|
node_gets_total | 集群前一分鐘處理的 GET 請求數量 | SUM(node_gets) |
node_puts_total | 集群前一分鐘處理的 PUT 請求數量 | SUM(node_puts) |
2、延遲指標
2.1 單機
採集方法:
/usr/sbin/riak-admin status
指標 | 功能 |
---|---|
node_get_fsm_time_mean | 客戶端發起 GET 請求到收到響應時間間隔的均值 |
node_get_fsm_time_median | 客戶端發起 GET 請求到收到響應時間間隔的中值 |
node_get_fsm_time_95 | 客戶端發起 GET 請求到收到響應時間間隔的 95 百分位值 |
node_get_fsm_time_100 | 客戶端發起 GET 請求到收到響應時間間隔的 100 百分位值 |
node_put_fsm_time_mean | 客戶端發起 PUT 請求到收到響應時間間隔的均值 |
node_put_fsm_time_median | 客戶端發起 PUT 請求到收到響應時間間隔的中值 |
node_put_fsm_time_95 | 客戶端發起 PUT 請求到收到響應時間間隔的 95 百分位值 |
node_put_fsm_time_100 | 客戶端發起 PUT 請求到收到響應時間間隔的 100 百分位值 |
2.2 集群
指標 | 功能 | 統計方法 |
---|---|---|
node_get_fsm_time_mean_avg | 客戶端發起 GET 請求到收到響應時間間隔的均值 | AVG(node_get_fsm_time_mean) |
node_put_fsm_time_mean_avg | 客戶端發起 PUT 請求到收到響應時間間隔的均值 | AVG(node_put_fsm_time_mean) |
3、Erlang 資源使用情況指標(單機)
採集方法:
/usr/sbin/riak-admin status
指標 | 功能 |
---|---|
sys_process_count | Erlang 進程的數量 |
memory_processes | 分配給 Erlang 進程的記憶體總量(單位 bytes) |
memory_processes_used | Erlang 進程使用的記憶體總量(單位 bytes) |
4、Riak 負荷/健康指標
4.1 單機
採集方法:
/usr/sbin/riak-admin status
指標 | 功能 |
---|---|
read_repairs | 某節點前一分鐘處理的讀取修複操作數量 |
node_get_fsm_siblings_mean | 某節點前一分鐘所有 GET 操作處理的兄弟數據數量均值 |
node_get_fsm_siblings_median | 某節點前一分鐘所有 GET 操作處理的兄弟數據數量中值 |
node_get_fsm_siblings_95 | 某節點前一分鐘所有 GET 操作處理的兄弟數據數量 95 百分位值 |
node_get_fsm_siblings_100 | 某節點前一分鐘所有 GET 操作處理的兄弟數據數量 100 百分位值 |
node_get_fsm_objsize_mean | 某節點前一分鐘流經 GET_FSM 的對象大小均值 |
node_get_fsm_objsize_median | 某節點前一分鐘流經 GET_FSM 的對象大小中值 |
node_get_fsm_objsize_95 | 某節點前一分鐘流經 GET_FSM 的對象大小 95 百分位值 |
node_get_fsm_objsize_100 | 某節點前一分鐘流經 GET_FSM 的對象大小 100 百分位值 |
4.2 集群
指標 | 功能 | 統計方法 |
---|---|---|
read_repairs_total | 集群前一分鐘處理的讀取修複操作數量 | SUM(read_repairs) |
node_get_fsm_siblings_mean_avg | 集群前一分鐘所有 GET 操作處理的兄弟數據數量均值 | AVG(node_get_fsm_siblings_mean) |
node_get_fsm_objsize_mean_avg | 集群前一分鐘流經 GET_FSM 的對象大小均值 | AVG(node_get_fsm_objsize_mean) |
5、其他
5.1 LevelDB合併錯誤(單機)
採集方法:
find /data1/riak/data/leveldb -name "LOG" -exec grep -l 'Compaction error' {} \; | wc -l
5.2 LevelDB讀取塊操作錯誤(單機)
採集方法:
/usr/sbin/riak-admin status
指標 | 功能 |
---|---|
leveldb_read_block_error | LevelDB 讀取塊操作錯誤數量 |
5.3 節點存活狀態(單機)
採集方法:
/usr/sbin/riak-admin member-status | grep `ifconfig | grep "inet addr:10" | awk -F':' '{print $2}' | awk '{print $1}'`
輸出如下,valid表示節點正常
valid 9.0% -- '[email protected]'
5.4 Riak Error Log(單機)
Riak 日誌路徑:/data1/riak/logs 採集文件:/data1/riak/logs/* 採集時間段:最近一分鐘 採集內容:最近一分鐘發生的錯誤數 採集示例:grep error -rn /data1/riak/logs | wc -l 說明:這個採集需要程式處理下邏輯,在此不給出完整的採集方法
二、採集程式
1、Riak監控系統設計
DBA通過前臺頁面根據CMDB三級業務添加/卸載Riak集群監控,根據CMDB的ip添加Riak單機監控(單機屬於集群,不能單獨存在,可增量添加單機監控),填寫ip和埠,配置閾值、負責人等信息
1)資料庫設計
mysql> use riakMonitor show tabReading table information for completion of table and column names You can turn off this feature to get a quicker startup with -A Database changed mysql> show tables; +---------------------------+ | Tables_in_riakMonitor | +---------------------------+ | riakClusterConf | | riakClusterDisplay | | riakClusterStatus | | riakClusterStatusTemplate | | riakSingleConf | | riakSingleDisplay | | riakSingleStatus | | riakSingleStatusTemplate | +---------------------------+ 8 rows in set (0.00 sec)
Template表作為歷史庫表模板,歷史庫按月分庫,按ip分表
2) 單機Agent設計
- Agent會通過自動調度平臺下發到目標機器,Crond周期是1分鐘,直接上報到mysql資料庫。運行時間超過45s 會被調度平臺kill
- 如果檢測不到riak或者命令出錯則會發送rtx告警給admins + dba, 系統錯誤會發送給admins
3) 集群匯聚設計
- 集群數據根據節點agent上報數據在50s的時候select出當前一分鐘的數據計算匯聚入庫
- 程式每分鐘都會清除clusterStatus的數據,如果agent在本分鐘上報心跳異常或者上報時間不在集群程式運行前(50s),cluster則不會統計該ip數據,但平均值計算時的除數會算上該ip(+1)
- 集群計算同時會寫進歷史庫,並創建歷史表
4) CGI介面設計(NodeJs)
- 非同步接收agent上報的數據,根據redis的ip列表轉換成ip1
- 如果redis獲取的ip1不存在singleConf表中則會拒絕上報,返回3003錯誤
- 上報成功會入singleStatus和歷史庫,並創建歷史表
5) 代碼列表
CGI : /data/riakMonitor # daemon agent: /home/opd/script/riakMonitor # crond analyzer: /opdData/opdOnline/script/kmc/riakMonitor/analyzer # crond 1、從CMDB更新single/cluster conf數據 2、同步conf和display 3、解析status數據到display 4、異常數據寫入 5、告警 riakTool: /opdData/opdOnline/script/kmc/riakMonitor/riakTool # daemon 每分鐘第50s運行一次 1、獲取監控集群和集群的ip,計算結果並匯聚 2、操作redis,將集群數據入歷史庫
2、採集程式部分代碼 (單機,python2.4)
1) 採集指標函數
def getRiakMeta(): thisFuncName = str(sys._getframe().f_code.co_name) cmdStr = "/usr/sbin/riak-admin status" cmdCode, cmdStdout, cmdStderr = getCmdResult(cmdStr) if 0 != cmdCode: msgTxt = "[%s] %s failed" % (thisFuncName, cmdStr) logger.error(msgTxt) sendRtx(MYCONF.riakAdmins, thisFuncName+" %s Fail:" % cmdStr) return 1 data["node_gets"] = data["node_puts"] = data["node_get_fsm_time_mean"] = data["node_get_fsm_time_median"] = 0 data["node_get_fsm_time_95"] = data["node_get_fsm_time_100"] = data["node_put_fsm_time_mean"] = 0 data["node_put_fsm_time_median"] = data["node_put_fsm_time_95"] = data["node_put_fsm_time_100"] = 0 data["sys_process_count"] = data["memory_processes"] = data["memory_processes_used"] = 0 data["read_repairs"] = data["node_get_fsm_siblings_mean"] = data["node_get_fsm_siblings_median"] = 0 data["node_get_fsm_siblings_95"] = data["node_get_fsm_siblings_100"] = data["node_get_fsm_objsize_mean"] = 0 data["node_get_fsm_objsize_median"] = data["node_get_fsm_objsize_95"] = data["node_get_fsm_objsize_100"] = 0 data["leveldb_read_block_error"] = 0 riakItemInfo = cmdStdout.split('\n') for each in riakItemInfo: eachInfo = each.split(" : ") if 2 == len(eachInfo): itemKey = eachInfo[0] itemValue = eachInfo[1].replace('<<"', '').replace('">>', '') if itemKey in data: logger.debug("%s:%s" % (itemKey, itemValue)) try: data[itemKey] = str(round(float(itemValue), 2)) except ValueError: data[itemKey] = itemValue except: raise cmdStr = """ find /data1/riak/data/leveldb -name "LOG" -exec grep -l 'Compaction error' {} \; | wc -l """ cmdCode, cmdStdout, cmdStderr = getCmdResult(cmdStr) if 0 != cmdCode: msgTxt = "[%s] %s failed" % (thisFuncName, cmdStr) logger.error(msgTxt) sendRtx(MYCONF.riakAdmins, thisFuncName+" Fail:" + msgTxt) return 1 data["leveldb_compaction_error"] = cmdStdout #不用轉int cmdStr = "/usr/sbin/riak-admin member-status | grep %s" % data["mainIp"] cmdCode, cmdStdout, cmdStderr = getCmdResult(cmdStr) logger.debug(cmdStdout) if 0 != cmdCode: msgTxt = "[%s] %s failed" % (thisFuncName, cmdStr) logger.error(msgTxt) sendRtx(MYCONF.riakAdmins, thisFuncName+" Fail:" + msgTxt) return 1 if cmdStdout.strip().startswith('valid'): data["is_active"] = 1 else: data["is_active"] = 0 data["riak_error_log"] = 0 riakLogPath = "/data1/riak/logs/" if not os.path.isdir(riakLogPath): msgTxt = "[%s] %s not exists" % (thisFuncName, riakLogPath) logger.error(msgTxt) sendRtx(MYCONF.riakAdmins, thisFuncName+" Fail:" + msgTxt) return 1 riakLogInfo = os.listdir(riakLogPath) reportTimeSec = time.mktime(time.strptime(data["report_time"], "%Y-%m-%d %H:%M:%S")) for each in riakLogInfo: logger.debug("fileName: "+each) eachFile = os.path.join(riakLogPath, each) if os.path.isfile(eachFile): try: eachFd = open(eachFile, 'r') except IOError, e: msgTxt = "I/O error({}): {}".format(e.errno, e.strerror) logger.error(msgTxt) sendRtx(MYCONF.riakAdmins, thisFuncName+" Fail:" + msgTxt) return 1 else: for eachLine in eachFd: #從頭讀,怕文件太大撐爆記憶體 if "error" in eachLine: #2016-03-20 04:57:09.704 [info] <0.19012.49>@riak_kv_index_h eachInfo = eachLine.split(' ') try: eachTimeStr = "%s %s" % (eachInfo[0], eachInfo[1][:-4]) eachTimeSec = time.mktime(time.strptime(eachTimeStr, "%Y-%m-%d %H:%M:%S")) if reportTimeSec - 60 <= eachTimeSec < reportTimeSec: logger.debug(eachLine) data["riak_error_log"] += 1 elif eachTimeSec >= reportTimeSec: break except: msgTxt = "file(%s) format wrong " % eachFile logger.error(msgTxt) break #sendRtx(MYCONF.riakAdmins, thisFuncName+" Fail:" + msgTxt) #eachFile.close() #return 1 eachFd.close() return 0
2) 上報和失敗重傳函數
def report2server(content, retry): '''上報到入庫程式,根據ip求餘獲取優先的server,如果上報失敗會遍歷server列表''' thisFuncName = "" try: thisFuncName = str(sys._getframe().f_code.co_name) pos = data["ip"] % len(MYCONF.reportServer) serverKeys = MYCONF.reportServer.keys() serverKeys.sort() serverKeys = serverKeys[pos:] + serverKeys[:pos] for serverId in serverKeys: cmdStr = "/usr/bin/curl -s --connect-timeout %d -m %d -d '%s&reTry=%d' %s" %( MYCONF.curlConnectTimeout, MYCONF.curlMaxTimeout, content, retry, MYCONF.reportServer[serverId]) cmdCode, cmdStdout, cmdStderr = getCmdResult(cmdStr) logger.info(cmdStr + "\ncmdCode:" + str(cmdCode) + "\n" + cmdStdout + cmdStderr) if 0 == cmdCode: return 0 return 1 except: exceptmsg = StringIO.StringIO() traceback.print_exc(file=exceptmsg) msgTxt = exceptmsg.getvalue() sendRtx(MYCONF.admins, thisFuncName + " Fail:" + msgTxt) return 1 def reportScheduler(reportRecord=0): '''reportRecord = 0 表示上報data中採集的新數據, reportRecord = 1 表示從reportFailFile裡面獲取最新的一條數據上報到server,然後需要處理reportFailFile''' thisFuncName = "" try: thisFuncName = str(sys._getframe().f_code.co_name) if 1 == reportRecord: # 從上報失敗文件中獲取最後一條數據,上報之 if not reportFail.has_section("index"): #這裡不要去add_section("index") 該誰add誰add去 return 0 if not reportFail.has_option("index", "index") or "" == reportFail.get("index", "index").strip(): return 0 indexVec = MYCONF.splitRe.split(reportFail.get("index", "index").strip()) index = indexVec[-1] if "" == index: msgTxt = reportFail.get("index", "index").strip() sendRtx(MYCONF.admins, thisFuncName + "[系統錯誤] index.index 末尾有多餘的逗號 " + msgTxt) return 1 if not reportFail.has_option("content", index + "_c") or not reportFail.has_option("content", index + "_t"): # _c 是內容 _t 是重試次數 msgTxt = "content sector 缺少 %s_c 或 %s_t" %(index, index) sendRtx(MYCONF.admins, thisFuncName + "[系統錯誤] " + msgTxt) return 1 content = reportFail.get("content", index + "_c") retry = reportFail.getint("content", index + "_t") retry += 1 code = report2server(content, retry) if 0 == code: # 發送成功 indexVec.remove(index) if indexVec: reportFail.set("index", "index", ",".join(indexVec)) else: reportFail.set("index", "index", "") reportFail.remove_option("content", index + "_c") reportFail.remove_option("content", index + "_t") elif retry > MYCONF.maxRetry: # 重發失敗,且超過最大重試次數 indexVec.remove(index) if indexVec: reportFail.set("index", "index", ",".join(indexVec)) else: reportFail.set("index", "index", "") reportFail.remove_option("content", index + "_c") reportFail.remove_option("content", index + "_t") else: # 重發失敗, 更新 _t (retry) 欄位 reportFail.set("content", index + "_t", retry) else: # 發送新數據 index = data["report_time"].replace(" ", "").replace("-", "").replace(":", "") content = urllib.urlencode(data) retry = 0 code = report2server(content, retry) if 0 == code: return 0 if not reportFail.has_section("index"): reportFail.add_section("index") reportFail.set("index", "index", index) reportFail.add_section("content") reportFail.set("content", index + "_c", content) reportFail.set("content", index + "_t", retry) else: indexVec = MYCONF.splitRe.split(reportFail.get("index", "index").strip()) indexVec.append(index) if len(indexVec) > MYCONF.maxFailRecord: # 超過最大 fail record 數 reportFail.set("index", "index", ",".join(indexVec[len(indexVec) - MYCONF.maxFailRecord:])) reportFail.set("content", index + "_c", content) reportFail.set("content", index + "_t", retry) for i in range(0, len(indexVec) - MYCONF.maxFailRecord): delIndex = indexVec[i] reportFail.remove_option("content", delIndex + "_c") reportFail.remove_option("content", delIndex + "_t") else: reportFail.set("index", "index", ",".join(indexVec)) reportFail.set("content", index + "_c", content) reportFail.set("content", index + "_t", retry) return 0 except: exceptmsg = StringIO.StringIO() traceback.print_exc(file=exceptmsg) msgTxt = exceptmsg.getvalue() sendRtx(MYCONF.admins, thisFuncName + " Fail:" + msgTxt) return 1
3) 獲取shell命令輸出函數
def getCmdResult(cmdStr): '''獲取shell命令的返回碼,標準輸出,標準錯誤''' #child = subprocess.Popen(cmdStr, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True) #cmdStdout, cmdStderr = child.communicate() #cmdCode = child.wait() #return (cmdCode, cmdStdout, cmdStderr) thisFuncName = str(sys._getframe().f_code.co_name) nowTime = int(time.time()) tmpstdout = os.path.join(MYCONF.basePath, "cmd.stdout.%d.tmp" % nowTime) tmpstderr = os.path.join(MYCONF.basePath, "cmd.stderr.%d.tmp" % nowTime) if "debug" == MYCONF.role: msgTxt = "[%d]Run Cmd: %s" % (nowTime, cmdStr) logger.debug(msgTxt) cmdStr = "(%s) 1>%s 2>%s" %(cmdStr, tmpstdout, tmpstderr) cmdCode = os.system(cmdStr) >> 8 cdmStdout = cmdStderr = "" try: outfd = open(tmpstdout) cmdStdout = outfd.read() errfd = open(tmpstderr) cmdStderr = errfd.read() except: exceptmsg = StringIO.StringIO() traceback.print_exc(file=exceptmsg) msgTxt = exceptmsg.getvalue() sendRtx(MYCONF.admins, thisFuncName + " Fail:" + msgTxt) cmdCode = 110 else: outfd.close() errfd.close() os.remove(tmpstderr) os.remove(tmpstdout) return (cmdCode, cmdStdout, cmdStderr)
4) 讀/寫Cache函數
def readLastCache(): global lastCache lastCache = ConfigParser.ConfigParser() if not os.path.isfile(MYCONF.lastCacheFile): try: fd = open(MYCONF.lastCacheFile, "w") except IOError, e: logger.error("I/O error({}): {}".format(e.errno, e.strerror)) return 1 else: fd.close() lastCache.readfp(open(MYCONF.lastCacheFile), "rb") return 0 def writeCache(): thisFuncName = "" try: thisFuncName = str(sys._getframe().f_code.co_name) lastCache.write(open(MYCONF.lastCacheFile, 'w')) return 0 except: exceptmsg = StringIO.StringIO() traceback.print_exc(file=exceptmsg) msgTxt = exceptmsg.getvalue() logger.error(msgTxt) return 1
5) 讀/寫失敗記錄
def readFailRecord(): global reportFail reportFail = ConfigParser.ConfigParser() if not os.path.isfile(MYCONF.lastReportFailFile): try: fd = open(MYCONF.lastReportFailFile, "w") except IOError, e: logger.error("I/O error({}): {}".format(e.errno, e.strerror)) return 1 else: fd.close() reportFail.readfp(open(MYCONF.lastReportFailFile), "rb") return 0 def writeFailRecord(): thisFuncName = "" try: thisFuncName = str(sys._getframe().f_code.co_name) reportFail.write(open(MYCONF.lastReportFailFile, 'w')) return 0 except: exceptmsg = StringIO.StringIO() traceback.print_exc(file=exceptmsg) msgTxt = exceptmsg.getvalue() logger.error(msgTxt) return 1
6) main函數
def main(): data["osType"] = 0 # 0表示 linux data["version"] = MYCONF.version # 當前程式的自定義版本號 data["report_time"] = time.strftime("%Y-%m-%d %H:%M:00") #上報時間,由於目前基礎監控是分鐘級監控粒度,因此秒取 00 initLog() logger.info('='*80) if 0 == checkLastPid() and 0 == readLastCache() and 0 == getLoginIp(): readFailRecord() # 讀取早遷採集周期上報失敗,需要重傳的數據 reportScheduler(reportRecord=1) #從 fail record 中選取最近的一條信息上報給伺服器 if 0 == getRiakMeta(): reportScheduler(reportRecord=0) writeFailRecord() writeCache() logger.info('='*80) logging.shutdown() return
3、添加/卸載監控
1) 添加監控
添加監控需要先添加集群(不支持先添加IP),添加集群會預設把所有IP都添加監控(前臺將在clusterConf新增記錄,併在singleConf增加對應的ip記錄,然後調用調度平臺,檢測ip是否已經安裝)如果該集群在CMDB裡面新增Ip,則需要手動添加監控(前臺提供新增監控節點,插入singleConf)
2) 卸載監控
(1) 卸載監控可以卸載整個集群的監控(將clusterConf needMonitor置0,同步將singleConf的needMonitor都置0,然後調用
調度平臺
卸載集群下的所有機器,如果該ip存在其他集群並且需要監控,則不用調用
調度平臺
卸載)也可以卸載單個節點的監控(前臺將singleConf的needMonitor置0,調用
調度平臺
,同樣判斷ip是否存在其他集群) (2) 添加卸載監控部由前臺調用
調度平臺
介面,並修改資料庫(插入數據或者更新need_monitor) (3) Single/cluster dislplay表會同步conf表的數據,只保留need_monitor=1的數據
4、CMDB數據同步
後臺一直同步CMDB的數據和conf表的數據,如果不在CMDB的則需要刪掉conf裡面的數據,不管needMonitor的值為多少。刪除三級業務的話只需要刪除clusterConf表對應的記錄,single會自動同步外鍵(嘗試調用
調度平臺
卸載介面,卸載掉被刪除的三級業務ID下麵的所有已安裝監控的IP)
5、前臺展示
1) 集群狀態展示
2) 單機節點狀態展示
原創文章,轉載請備註原文地址 http://www.cnblogs.com/lxmhhy/p/6036330.html
知識交流討論請加qq:1130010617。謝謝合作。