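Recently I have been writing a batch inspection tool. It uses Ansible to push a script to each machine, runs it there, and returns the result in JSON format.
For example: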
```
# ansible node2 -m script -a /root/python/health_check.py
node2 | SUCCESS => {
    "changed": true,
    "rc": 0,
    "stderr": "Shared connection to 192.168.244.20 closed.\r\n",
    "stdout": "{'cpu_iowait': '0.00', 'swap_out': 0, 'cpu_usr': '0.00', 'cpu_idle': '100.00', 'swap_total': '1999', 'swap_used': '78', 'load_average_5': '0.11', 'mem_util': '92.0', 'uptime': '5', 'load_average_1': '0.03', 'cpu_sys': '0.00', 'mem_total': '475', 'swap_in': 0, 'load_average_15': '0.06', 'disk': ['Filesystem Size Used Avail Use% Mounted on\\n', '/dev/sda3 18G 8.6G 8.1G 52% /\\n', 'tmpfs 238M 0 238M 0% /dev/shm\\n', '/dev/sda1 190M 27M 154M 15% /boot\\n'], 'numa': '1'}\r\n",
    "stdout_lines": [
        "{'cpu_iowait': '0.00', 'swap_out': 0, 'cpu_usr': '0.00', 'cpu_idle': '100.00', 'swap_total': '1999', 'swap_used': '78', 'load_average_5': '0.11', 'mem_util': '92.0', 'uptime': '5', 'load_average_1': '0.03', 'cpu_sys': '0.00', 'mem_total': '475', 'swap_in': 0, 'load_average_15': '0.06', 'disk': ['Filesystem Size Used Avail Use% Mounted on\\n', '/dev/sda3 18G 8.6G 8.1G 52% /\\n', 'tmpfs 238M 0 238M 0% /dev/shm\\n', '/dev/sda1 190M 27M 154M 15% /boot\\n'], 'numa': '1'}"
    ]
}
```
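Note that the `stdout` field is not strictly JSON. health_check.py appears to print a Python dict repr, which uses single-quoted strings and mixes int and str values. A minimal sketch for parsing it uses the standard library's `ast.literal_eval` instead of swapping quote characters, since swapping breaks as soon as a value contains an apostrophe. It assumes stdout always holds exactly one dict literal. `parse_stdout` is a hypothetical helper name.

```python
import ast


def parse_stdout(stdout):
    """Parse the Python dict repr printed by the remote script.

    Assumes stdout holds exactly one dict literal; the trailing "\r\n"
    added by the SSH session is stripped first.
    """
    data = ast.literal_eval(stdout.strip())
    if not isinstance(data, dict):
        raise ValueError("expected a dict literal, got %s" % type(data).__name__)
    return data


sample = ("{'cpu_idle': '100.00', 'swap_in': 0, 'numa': '1', "
          "'disk': ['/dev/sda1 190M 27M 154M 15% /boot\\n']}\r\n")
metrics = parse_stdout(sample)
print(float(metrics['cpu_idle']), metrics['swap_in'])  # 100.0 0
```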
I then redirected this output to a text file and used a second script to parse and summarize it. The final result looks like this:
```
ip              uptime  cpu_usr  cpu_sys  cpu_iowait  cpu_idle  load_average_1  load_average_5  ...
192.168.244.30  24      0        0        6           94        0.02            0.08            ...
192.168.244.20  24      0        0        0           100       0               0.01            ...
```
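The summarizing step boils down to turning one metrics dict per host into aligned columns. The sketch below assumes each host's output has already been parsed into a dict. The column list, the width of 16 and the helper name `format_summary` are illustrative choices, not taken from the original script.

```python
COLUMNS = ['uptime', 'cpu_usr', 'cpu_sys', 'cpu_iowait', 'cpu_idle',
           'load_average_1', 'load_average_5']


def format_summary(rows, columns=COLUMNS, width=16):
    """Render {ip: metrics_dict} as a left-aligned text table, one host per line."""
    header = ''.join(name.ljust(width) for name in ['ip'] + list(columns))
    lines = [header.rstrip()]
    for ip in sorted(rows):
        # Missing metrics are rendered as empty cells.
        cells = [ip] + [str(rows[ip].get(col, '')) for col in columns]
        lines.append(''.join(cell.ljust(width) for cell in cells).rstrip())
    return '\n'.join(lines)


print(format_summary({
    '192.168.244.20': {'uptime': '24', 'cpu_usr': 0, 'cpu_idle': 100},
    '192.168.244.30': {'uptime': '24', 'cpu_usr': 0, 'cpu_idle': 94},
}))
```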
But this approach always felt a bit clumsy. Parsing the returned results seems like a fairly common need, doesn't it?
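It would make no sense for the Ansible project to ignore such a need, and in fact it doesn't: Ansible provides callback plugins for exactly this purpose. A callback plugin defines methods for a number of scenarios, such as host unreachable, task failed and task succeeded. Because each scenario maps to its own method, you can trigger different actions in different situations. For example, you can send an email when a playbook fails, or save the returned results to a database when it succeeds.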
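The sketch below makes the scenario-to-method mapping concrete in the Ansible 2.x `v2_runner_on_*` style. Each method receives a result object, where `result._host.get_name()` gives the host and `result._result` holds the module's return data. Several parts are assumptions:

* The plugin name `scenario_demo` is hypothetical.
* The `events` list only records which method fired. A real plugin would send mail or write to a database at those points.
* The `ImportError` fallback exists only so the sketch can be exercised without Ansible installed.

```python
try:
    from ansible.plugins.callback import CallbackBase
except ImportError:
    # Stand-in base class so this sketch runs without Ansible; not part of a real plugin.
    class CallbackBase(object):
        def __init__(self, *args, **kwargs):
            pass


class CallbackModule(CallbackBase):
    CALLBACK_VERSION = 2.0
    CALLBACK_TYPE = 'notification'
    CALLBACK_NAME = 'scenario_demo'      # hypothetical plugin name
    CALLBACK_NEEDS_WHITELIST = True

    def __init__(self):
        super(CallbackModule, self).__init__()
        self.events = []  # (scenario, host) pairs in arrival order

    def v2_runner_on_ok(self, result):
        # Task succeeded: e.g. save result._result to a database.
        self.events.append(('ok', result._host.get_name()))

    def v2_runner_on_failed(self, result, ignore_errors=False):
        # Task failed: e.g. send an alert email.
        self.events.append(('failed', result._host.get_name()))

    def v2_runner_on_unreachable(self, result):
        # Host unreachable: e.g. put the host on a retry list.
        self.events.append(('unreachable', result._host.get_name()))
```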
The project ships an example; see: https://github.com/ansible/ansible/blob/devel/lib/ansible/plugins/callback/log_plays.py
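I built my own customized plugin on top of that example. Originally I wanted to implement everything inside the callback plugin, but callback plugins are quite painful to debug. You cannot use print, and when something goes wrong (a list index out of range, say), Ansible only prints an error message at run time without telling you which line raised it.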
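One way to get exact line numbers back is to catch the exception inside the callback, write the full traceback to a file of your own, and then re-raise so Ansible still reports the failure. For ordinary messages, plugins conventionally use the `self._display` object that `CallbackBase` provides instead of print.

This is a minimal sketch. The decorator name `log_exceptions` and the default log path are hypothetical.

```python
import functools
import traceback

DEFAULT_LOG_PATH = '/tmp/callback_errors.log'  # hypothetical location


def log_exceptions(log_path=DEFAULT_LOG_PATH):
    """Decorator factory: append the full traceback (file, line number, code) to log_path, then re-raise."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception:
                with open(log_path, 'a') as log:
                    log.write(traceback.format_exc())
                raise  # let Ansible still see and report the error
        return wrapper
    return decorator


# Usage inside a callback plugin class, for example:
#     @log_exceptions()
#     def runner_on_ok(self, host, res):
#         ...
```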
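In the end I gave up on the ALL-IN-ONE idea. The plugin only parses the returned results and saves them to an SQLite3 database, and the summary is produced afterwards from the data in the database.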
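Once the per-host rows are in SQLite, the summary is just a query. This is a minimal sketch under a few assumptions:

* It reads a trimmed version of the `performance_data` table that the plugin writes to /tmp/data.db.
* The 80% memory threshold and the helper name `busiest_hosts` are illustrative.
* `:memory:` and the made-up rows stand in for the real database so the example is self-contained.

```python
import sqlite3


def busiest_hosts(conn, mem_threshold=80.0):
    """Return (ip, mem_util, cpu_idle) for hosts whose memory use exceeds the threshold."""
    cursor = conn.execute(
        'SELECT ip, mem_util, cpu_idle FROM performance_data '
        'WHERE mem_util > ? ORDER BY mem_util DESC',
        (mem_threshold,))
    return cursor.fetchall()


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE performance_data '
             '(ip VARCHAR(20) PRIMARY KEY, mem_util DECIMAL, cpu_idle DECIMAL)')
# Made-up sample rows for illustration only.
conn.executemany('INSERT INTO performance_data VALUES (?, ?, ?)',
                 [('10.0.0.1', 92.5, 97.5), ('10.0.0.2', 35.5, 64.5)])
print(busiest_hosts(conn))
```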
The code is as follows:
```
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import json
import sqlite3

from ansible.plugins.callback import CallbackBase


class CallbackModule(CallbackBase):
    """
    Parses each host's health-check output and stores it in SQLite (/tmp/data.db).
    """
    CALLBACK_VERSION = 2.0
    CALLBACK_TYPE = 'notification'
    CALLBACK_NAME = 'performance_check'
    CALLBACK_NEEDS_WHITELIST = False

    def __init__(self):
        super(CallbackModule, self).__init__()

    def runner_on_failed(self, host, res, ignore_errors=False):
        pass

    def runner_on_ok(self, host, res):
        performance_data = PerformanceData()
        # IF NOT EXISTS: this method runs once per host, so the table may already exist
        create_table_sql = ('CREATE TABLE IF NOT EXISTS performance_data('
                            'ip varchar(20) primary key, uptime varchar(20), '
                            'cpu_usr DECIMAL, cpu_sys DECIMAL, cpu_iowait DECIMAL, cpu_idle DECIMAL, '
                            'load_average_1 DECIMAL, load_average_5 DECIMAL, load_average_15 DECIMAL, '
                            'mem_total INTEGER, mem_util DECIMAL, '
                            'swap_total INTEGER, swap_used INTEGER, swap_in INTEGER, swap_out INTEGER, '
                            'numa TINYINT)')
        # OR REPLACE: re-running the check overwrites the host's previous row
        # instead of failing on the primary key
        insert_sql = ('INSERT OR REPLACE INTO performance_data '
                      'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)')
        insert_value = str_to_json(host, res)
        performance_data.create_table(create_table_sql)
        performance_data.insert_command(insert_sql, insert_value)
        performance_data.quit()

    def runner_on_skipped(self, host, item=None):
        pass

    def runner_on_unreachable(self, host, res):
        pass

    def runner_on_async_failed(self, host, res, jid):
        pass

    def playbook_on_import_for_host(self, host, imported_file):
        pass

    def playbook_on_not_import_for_host(self, host, missing_file):
        pass


class PerformanceData(object):
    """Thin wrapper around the SQLite database that holds the results."""

    def __init__(self):
        self.conn = sqlite3.connect("/tmp/data.db")
        self.cursor = self.conn.cursor()

    def create_table(self, create_table_sql):
        self.cursor.execute(create_table_sql)

    def insert_command(self, insert_sql, insert_value):
        self.cursor.execute(insert_sql, insert_value)

    def query(self, query_sql):
        self.cursor.execute(query_sql)
        return self.cursor.fetchall()

    def quit(self):
        self.conn.commit()
        self.conn.close()


def str_to_json(host, res):
    """Turn the script's stdout (a Python dict repr) into one table row."""
    # Swapping single quotes for double quotes makes the repr valid JSON,
    # as long as no value itself contains a single quote.
    value = json.loads(res["stdout"].strip().replace("'", '"'))
    return (host, value['uptime'],
            float(value['cpu_usr']), float(value['cpu_sys']),
            float(value['cpu_iowait']), float(value['cpu_idle']),
            float(value['load_average_1']), float(value['load_average_5']),
            float(value['load_average_15']),
            int(value['mem_total']), float(value['mem_util']),
            int(value['swap_total']), int(value['swap_used']),
            int(value['swap_in']), int(value['swap_out']),
            int(value['numa']))
```
For completeness, here is the text-parsing script mentioned above. Ironically, it seems to come closer to my ALL-IN-ONE idea, haha~
```
# coding: utf8
import re
import json
import sqlite3


def get_ip_success():
    ip_unreachable = []
    ip_failed = []
    ip_success = []
    flag = 0       # 1 while inside a SUCCESS block
    line_num = 0   # position relative to the last SUCCESS header
    with open(r'C:\Users\Administrator\Desktop\2.txt') as f:
        for line in f:
            if re.search('UNREACHABLE', line):
                ip = line.split()[0]
                ip_unreachable.append(ip)
                flag = 0
            elif re.search('FAILED', line):
                ip = line.split()[0]
                ip_failed.append(ip)
                flag = 0
            elif re.search('SUCCESS', line):
                ip = line.split()[0]
                flag = 1
                line_num = 1
            elif flag == 1 and line_num == 7:
                # The 6th line after the SUCCESS header is the stdout_lines element
                line = line.strip(" ").replace("'", '"').strip('\n').strip('"')
                stdout_lines = '{"' + ip + '":' + line + '}'
                ip_success.append(json.loads(stdout_lines))
            line_num = line_num + 1
    return ip_success


def os_status_generator(ip_success):
    for os_status in ip_success:
        for key, value in os_status.items():
            yield (key, value['uptime'],
                   float(value['cpu_usr']), float(value['cpu_sys']),
                   float(value['cpu_iowait']), float(value['cpu_idle']),
                   float(value['load_average_1']), float(value['load_average_5']),
                   float(value['load_average_15']),
                   int(value['mem_total']), float(value['mem_util']),
                   int(value['swap_total']), int(value['swap_used']),
                   int(value['swap_in']), int(value['swap_out']),
                   int(value['numa']))


class OsStatus(object):
    def __init__(self, ip_success):
        try:
            self.conn = sqlite3.connect(":memory:")
            self.cursor = self.conn.cursor()
            self.cursor.execute('''CREATE TABLE os_status (ip varchar(20) primary key,
                uptime varchar(20), cpu_usr DECIMAL, cpu_sys DECIMAL, cpu_iowait DECIMAL,
                cpu_idle DECIMAL, load_average_1 DECIMAL, load_average_5 DECIMAL,
                load_average_15 DECIMAL, mem_total INTEGER, mem_util DECIMAL,
                swap_total INTEGER, swap_used INTEGER, swap_in INTEGER, swap_out INTEGER,
                numa TINYINT)''')
            self.cursor.executemany(
                "insert into os_status values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
                os_status_generator(ip_success))
        except Exception as e:
            print(e)

    def query(self, sql):
        self.cursor.execute(sql)
        results = self.cursor.fetchall()
        if not results:
            return
        column_name = [column[0] for column in self.cursor.description]
        # Header row, then one row per host, each cell padded to 15 characters
        print(''.join(name.ljust(15) for name in column_name))
        for each_result in results:
            print(''.join(str(item).ljust(15) for item in each_result))

    def quit(self):
        try:
            self.cursor.close()
            self.conn.close()
        except Exception as e:
            print(e)


ip_success = get_ip_success()
os_status = OsStatus(ip_success)
sql = "select * from os_status"
os_status.query(sql)
os_status.quit()
```
Finally, a word on how to enable callback plugins in Ansible, since they are disabled by default.
Turn on these two options in the [defaults] section of ansible.cfg:
callback_plugins = /usr/share/ansible/plugins/callback
bin_ansible_callbacks = True
Both of these are required. One more option is:
callback_whitelist = performance_check
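Here, performance_check is the CALLBACK_NAME defined in the callback plugin above.
The related attribute is CALLBACK_NEEDS_WHITELIST. If it is set to False, callback_whitelist does not need to be set. If it is True, the plugin's CALLBACK_NAME must be listed in callback_whitelist.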
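The rule above can be summarized as a simple predicate. This is only a simplified model for illustration, not Ansible's actual plugin-loading code, which also handles stdout callbacks and other cases. It also assumes that bin_ansible_callbacks is already True for ad-hoc commands. `callback_enabled` and the `PerformanceCheck` stand-in class are hypothetical.

```python
def callback_enabled(plugin_cls, whitelist_setting):
    """Simplified model: would this notification callback be loaded?

    whitelist_setting is the raw, comma-separated callback_whitelist value
    from ansible.cfg (may be an empty string).
    """
    whitelist = {name.strip() for name in whitelist_setting.split(',') if name.strip()}
    if not getattr(plugin_cls, 'CALLBACK_NEEDS_WHITELIST', False):
        return True  # no whitelist entry required
    return plugin_cls.CALLBACK_NAME in whitelist


class PerformanceCheck(object):
    # Stand-in carrying only the attributes the predicate reads.
    CALLBACK_NAME = 'performance_check'
    CALLBACK_NEEDS_WHITELIST = True
```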