需要註意的是: 1.需要在當前目錄下創建hosts.txt文件,文件內容格式為: https://www.baidu.com,百度首頁 https://www.trc.com,泰然城首頁 https://www.jd.com,京東商城 2.ding_url換成自己的釘釘機器人webhook鏈接,也可 ...
import re import requests import json from threading import Thread,Lock from concurrent.futures import ThreadPoolExecutor list1 = [] list2 = [] code_list = [200,301,302,401] # 定義正確的狀態碼 class MyThread(Thread): ''' 用來獲取線程的值 ''' def __init__(self,func,args=()): super(MyThread, self).__init__() self.func = func self.args = args def run(self): self.result = self.func(*self.args) def get_result(self): try: return self.result except Exception: return None def get_url(): ''' 打開存放url的文件,並將結果返回出去 :return: ''' try: with open('hosts.txt','r',encoding='utf-8') as f: data = f.readlines() return data except Exception: # 文件不存在則返回False return False def verdictUrl(): ''' 從hosts.txt文件中取出url,然後進行合法性檢測 :return: ''' url_list = [] comment_list = [] get_url_res = get_url() if get_url_res: for data in get_url_res: url = data.split(',')[0] comment = data.split(',')[-1] try: res = re.search(r'http\w{0,1}://(\w+\.){2}\w+.*', url).group() url_list.append(res) comment_list.append(comment) except Exception: print('url:%s 有誤'%url) return (url_list,comment_list) else: print('文件不存在......') def getStatusCode(url,comment): ''' 獲取網站的狀態碼,並將它返回出去 :param url: :param comment: :return: ''' global list1,list2 try: res = requests.head(url) if res.status_code in code_list: lock.acquire() # 開始添加互斥鎖 list1.append(res.status_code) lock.release() except requests.exceptions.ConnectionError: status = 0 # 自定義狀態碼 lock.acquire() list2.append(status) lock.release() else: status = res.status_code # 將狀態碼賦值給status finally: return {'url':url,'StatusCode':status,'comment':comment} def sendDingDing(bc): ''' 用來接收getStatusCode的返回值以及釘釘發送消息 :param bc: :return: ''' ding_url = 'https://oapi.dingtalk.com/robot/send?access_token=e0bef403aded94c230953384353bc411a7fba57389ebd59bc0e63cc602ec175f' HEADERS = { "Content-Type": "application/json ;charset=utf-8" } bc = bc.result() url = bc['url'] status = bc['StatusCode'] comment = bc['comment'] string_textMsg = { 'msgtype': 'text', 'text': { # 自行添加需要的內容 'content': 'url地址:%s\n' 'url名稱:%s\n' '狀態碼:%s\n'% (url, comment,status) } } string_textMsg = json.dumps(string_textMsg) # 序列化到記憶體中 res = requests.post(ding_url, data=string_textMsg, headers=HEADERS) if __name__ == '__main__': lock = Lock() # 創建鎖對象 pool = ThreadPoolExecutor(4) # 線程池 url,comment = verdictUrl() res = zip(url,comment) li = [] for i in res: for j in range(4): # 開啟多線程 t = MyThread(getStatusCode,args = (i[0],i[1])) li.append(t) t.start() for t in li: t.join() if len(list1)>3 or len(list2)>3: # 如果xxxxx,則交給sendDingDing處理 pool.submit(getStatusCode,i[0],i[1]).add_done_callback(sendDingDing)
需要註意的是:
1.需要在當前目錄下創建hosts.txt文件,文件內容格式為:
https://www.baidu.com,百度首頁
https://www.trc.com,泰然城首頁
https://www.jd.com,京東商城
2.ding_url換成自己的釘釘機器人webhook鏈接,也可以換成微信報警