Foreword
This crawler scrapes housing-price listings; the goal is to practice whole-site scraping and handling upwards of 100,000 records.
The most direct consequence of a larger data volume is that function logic matters much more, and data structures have to be chosen carefully around Python's characteristics. With small scrapes, redundant logic, dense I/O requests, or deeply nested loops only cost a second or two; as the data volume grows, that one or two seconds can stretch into one or two hours.
So when a site has a large amount of data to scrape, the time cost can be reduced from two directions:
1) Optimize the function logic and choose appropriate data structures, following Pythonic habits. For example, when concatenating strings, join() uses less memory than "+" (see the sketch after this list).
2) Depending on whether the workload is I/O-bound or CPU-bound, choose multithreading or multiprocessing to run tasks in parallel and raise efficiency.
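For point 1), a minimal, hypothetical micro-benchmark (the names and sizes are made up) shows why ''.join() beats repeated "+": concatenation with "+" allocates a new intermediate string on every iteration, while join() builds the result in one pass.

# Hypothetical micro-benchmark: repeated '+' vs. str.join()
import timeit

parts = [str(i) for i in range(10000)]

def concat_plus():
    s = ''
    for p in parts:
        s += p  # allocates a new string object on every iteration
    return s

def concat_join():
    return ''.join(parts)  # single pass, single allocation for the result

print('+    :', timeit.timeit(concat_plus, number=100))
print('join :', timeit.timeit(concat_join, number=100))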
1. Getting the index
Wrap the request and set a timeout:
# Fetch a listing page
def get_page(url):
    headers = {
        'User-Agent': r'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) '
                      r'Chrome/45.0.2454.85 Safari/537.36 115Browser/6.0.3',
        'Referer': r'http://bj.fangjia.com/ershoufang/',
        'Host': r'bj.fangjia.com',
        'Connection': 'keep-alive'
    }
    timeout = 60
    socket.setdefaulttimeout(timeout)  # set the timeout
    req = request.Request(url, headers=headers)
    response = request.urlopen(req).read()
    page = response.decode('utf-8')
    return page
Level 1: district information
Level 2: area information (obtained from each district and stored in a dict as key-value pairs)
Storing the result as a dict makes it fast to look up any target. -> {'朝陽': {'工體', '安貞', '健翔橋', ...}}
Level 3: subway information (for searching listings around subway stations)
Add each area's subway lines to the dict. -> {'朝陽': {'工體': {'5號線', '10號線', '13號線'}, '安貞', '健翔橋', ...}}
Corresponding URL: http://bj.fangjia.com/ershoufang/--r-%E6%9C%9D%E9%98%B3%7Cw-5%E5%8F%B7%E7%BA%BF%7Cb-%E6%83%A0%E6%96%B0%E8%A5%BF%E8%A1%97
Decoded URL: http://bj.fangjia.com/ershoufang/--r-朝陽|w-5號線|b-惠新西街
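Once all three levels have been scraped, the leaf values of the nested dict are the filter URLs themselves. The test_search_dict from the full source at the end of this post shows one fully expanded entry:

# One fully expanded index entry: district -> area -> subway line -> filter url
test_search_dict = {
    '昌平': {
        '霍營': {
            '13號線': 'http://bj.fangjia.com/ershoufang/--r-%E6%98%8C%E5%B9%B3|w-13%E5%8F%B7%E7%BA%BF|b-%E9%9C%8D%E8%90%A5'
        }
    }
}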
Given this URL parameter pattern, there are two ways to obtain the target URL:
1) Get the target URL by walking the index path
# Get the listing url list (nested dict traversal)
def get_info_list(search_dict, layer, tmp_list, search_list):
    layer += 1  # current dict depth
    for i in range(len(search_dict)):
        tmp_key = list(search_dict.keys())[i]  # key at the current depth
        tmp_list.append(tmp_key)  # append the current key to tmp_list as an index
        tmp_value = search_dict[tmp_key]
        if isinstance(tmp_value, str):  # the value is a url
            tmp_list.append(tmp_value)  # append the url to tmp_list
            search_list.append(copy.deepcopy(tmp_list))  # store the indexed url in search_list
            tmp_list = tmp_list[:layer]  # keep only the indices for this depth
        elif tmp_value == '':  # skip empty values
            layer -= 2  # step back out of this level
            tmp_list = tmp_list[:layer]  # keep only the indices for this depth
        else:
            get_info_list(tmp_value, layer, tmp_list, search_list)  # the value is another dict: recurse
            tmp_list = tmp_list[:layer]
    return search_list
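As a quick sketch (using the test_search_dict shown earlier; layer starts at -1 and the two list arguments start empty, as in the full source), the traversal flattens every leaf into one [district, area, line, url] record:

# Hypothetical driver for get_info_list(); arguments initialized as in the full source
search_list = get_info_list(test_search_dict, -1, [], [])
print(search_list)
# -> [['昌平', '霍營', '13號線', 'http://bj.fangjia.com/ershoufang/--r-%E6%98%8C%E5%B9%B3|w-13%E5%8F%B7%E7%BA%BF|b-%E9%9C%8D%E8%90%A5']]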
2) Assemble the URL from the dict entries
{'朝陽': {'工體': {'5號線'}}}
Parameters:
—— r-朝陽
—— b-工體
—— w-5號線
Assembled URL: http://bj.fangjia.com/ershoufang/--r-朝陽|w-5號線|b-工體
# Build a composed url from the parameters
def get_compose_url(compose_tmp_url, tag_args, key_args):
    compose_tmp_url_list = [compose_tmp_url, '|' if tag_args != 'r-' else '', tag_args, parse.quote(key_args), ]
    compose_url = ''.join(compose_tmp_url_list)
    return compose_url
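A minimal sketch of chaining get_compose_url() to assemble the example URL above; starting from base_url + '--' is my assumption based on the URL pattern shown earlier, and each Chinese segment comes out percent-encoded by parse.quote():

# Hypothetical chaining; 'r-' gets no leading '|', later filters are separated by '|'
url = get_compose_url(base_url + '--', 'r-', '朝陽')
url = get_compose_url(url, 'w-', '5號線')
url = get_compose_url(url, 'b-', '工體')
print(url)  # http://bj.fangjia.com/ershoufang/--r-朝陽|w-5號線|b-工體, with the Chinese parts percent-encoded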
2. Getting the maximum page count of each index page
# Get the url for every page of each index page
def get_info_pn_list(search_list):
    fin_search_list = []
    for i in range(len(search_list)):
        print('>>> scraping %s' % search_list[i][:3])
        search_url = search_list[i][3]
        try:
            page = get_page(search_url)
        except:
            print('page request timed out')
            continue
        soup = BS(page, 'lxml')
        # get the maximum page number
        pn_num = soup.select('span[class="mr5"]')[0].get_text()
        rule = re.compile(r'\d+')
        max_pn = int(rule.findall(pn_num)[1])
        # assemble the per-page urls
        for pn in range(1, max_pn+1):
            print('************************ scraping page %s ************************' % pn)
            pn_rule = re.compile('[|]')
            fin_url = pn_rule.sub(r'|e-%s|' % pn, search_url, 1)
            tmp_url_list = copy.deepcopy(search_list[i][:3])
            tmp_url_list.append(fin_url)
            fin_search_list.append(tmp_url_list)
    return fin_search_list
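The substitution above injects the page number as an e- segment at the first '|' of the filter URL; a small hypothetical example:

# Hypothetical example of the page-number substitution
import re

search_url = 'http://bj.fangjia.com/ershoufang/--r-朝陽|w-5號線|b-工體'
pn_rule = re.compile('[|]')
print(pn_rule.sub(r'|e-%s|' % 2, search_url, 1))
# -> http://bj.fangjia.com/ershoufang/--r-朝陽|e-2|w-5號線|b-工體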
3. Scraping the listing tags
These are the tags we want to scrape:
['區域', '板塊', '地鐵', '標題', '位置', '平米', '戶型', '樓層', '總價', '單位平米價格']
(district, area, subway, title, address, size, layout, floor, total price, price per square meter)
# Scrape the tag information
def get_info(fin_search_list, process_i):
    print('process %s started' % process_i)
    fin_info_list = []
    for i in range(len(fin_search_list)):
        url = fin_search_list[i][3]
        try:
            page = get_page(url)
        except:
            print('tag request timed out')
            continue
        soup = BS(page, 'lxml')
        title_list = soup.select('a[class="h_name"]')
        address_list = soup.select('span[class="address"]')
        attr_list = soup.select('span[class="attribute"]')
        price_list = soup.find_all(attrs={"class": "xq_aprice xq_esf_width"})  # select() fails on some attribute values (those containing spaces); find_all(attrs={}) works instead
        for num in range(20):
            tag_tmp_list = []
            try:
                title = title_list[num].attrs["title"]
                print(r'************************ scraping %s ************************' % title)
                address = re.sub('\n', '', address_list[num].get_text())
                area = re.search('\d+[\u4E00-\u9FA5]{2}', attr_list[num].get_text()).group(0)
                layout = re.search('\d[^0-9]\d.', attr_list[num].get_text()).group(0)
                floor = re.search('\d/\d', attr_list[num].get_text()).group(0)
                price = re.search('\d+[\u4E00-\u9FA5]', price_list[num].get_text()).group(0)
                unit_price = re.search('\d+[\u4E00-\u9FA5]/.', price_list[num].get_text()).group(0)
                tag_tmp_list = copy.deepcopy(fin_search_list[i][:3])
                for tag in [title, address, area, layout, floor, price, unit_price]:
                    tag_tmp_list.append(tag)
                fin_info_list.append(tag_tmp_list)
            except:
                print('[scrape failed]')
                continue
    print('process %s finished' % process_i)
    return fin_info_list
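To make the regular expressions above concrete, here is a sketch on made-up attribute and price strings (the sample values are hypothetical, not taken from the live page):

import re

# Hypothetical sample text; the real strings come from the parsed listing page
attr_text = '89平米 2室1廳 6/12層'
price_text = '300萬 33708元/平米'

print(re.search('\d+[\u4E00-\u9FA5]{2}', attr_text).group(0))    # 89平米  (area)
print(re.search('\d[^0-9]\d.', attr_text).group(0))              # 2室1廳  (layout)
print(re.search('\d/\d', attr_text).group(0))                     # 6/1     (floor; only single digits are captured)
print(re.search('\d+[\u4E00-\u9FA5]', price_text).group(0))       # 300萬   (total price)
print(re.search('\d+[\u4E00-\u9FA5]/.', price_text).group(0))     # 33708元/平 (unit price)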
4. Distributing tasks and scraping in parallel
Slice the task list and hand the slices to a process pool to scrape in parallel.
# Split the task list
def assignment_search_list(fin_search_list, project_num):  # project_num is the number of tasks per process; the smaller it is, the more processes are used
    assignment_list = []
    fin_search_list_len = len(fin_search_list)
    for i in range(0, fin_search_list_len, project_num):
        start = i
        end = i+project_num
        assignment_list.append(fin_search_list[start: end])  # take a slice of the list
    return assignment_list
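A quick sanity check of the slicing, with a hypothetical task list of 7 items and 3 tasks per process:

# Hypothetical check of assignment_search_list()
print(assignment_search_list(list(range(7)), 3))
# -> [[0, 1, 2], [3, 4, 5], [6]]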
p = Pool(4)  # set up the process pool
assignment_list = assignment_search_list(fin_info_pn_list, 3)  # split the tasks for multiprocessing
result = []  # results returned by each process
for i in range(len(assignment_list)):
    result.append(p.apply_async(get_info, args=(assignment_list[i], i)))
p.close()
p.join()
for result_i in range(len(result)):
    fin_info_result_list = result[result_i].get()
    fin_save_list.extend(fin_info_result_list)  # merge the lists returned by each process
With a process pool scraping in parallel, the run time dropped to one third of the single-process time, about 3 hours in total.
The machine has 4 cores; in testing, 3 tasks per process ran most efficiently on this machine.
5. Saving the results to Excel for later visualization and processing
# Save the scraped results
def save_excel(fin_info_list, file_name):
    tag_name = ['區域', '板塊', '地鐵', '標題', '位置', '平米', '戶型', '樓層', '總價', '單位平米價格']
    book = xlsxwriter.Workbook(r'C:\Users\Administrator\Desktop\%s.xlsx' % file_name)  # saved to the desktop by default
    tmp = book.add_worksheet()
    row_num = len(fin_info_list)
    tmp.write_row('A1', tag_name)  # header row
    for i in range(row_num):
        con_pos = 'A%s' % (i + 2)  # data rows start below the header
        tmp.write_row(con_pos, fin_info_list[i])
    book.close()
The full source code is attached below.
#! -*-coding:utf-8-*-
# Function: housing-price survey
# Author: 蘭茲

from urllib import parse, request
from bs4 import BeautifulSoup as BS
from multiprocessing import Pool
import re
import lxml
import datetime
import cProfile
import socket
import copy
import xlsxwriter


starttime = datetime.datetime.now()

base_url = r'http://bj.fangjia.com/ershoufang/'


test_search_dict = {'昌平': {'霍營': {'13號線': 'http://bj.fangjia.com/ershoufang/--r-%E6%98%8C%E5%B9%B3|w-13%E5%8F%B7%E7%BA%BF|b-%E9%9C%8D%E8%90%A5'}}}

search_list = []  # listing url list
tmp_list = []  # listing url buffer list
layer = -1


# Fetch a listing page
def get_page(url):
    headers = {
        'User-Agent': r'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) '
                      r'Chrome/45.0.2454.85 Safari/537.36 115Browser/6.0.3',
        'Referer': r'http://bj.fangjia.com/ershoufang/',
        'Host': r'bj.fangjia.com',
        'Connection': 'keep-alive'
    }
    timeout = 60
    socket.setdefaulttimeout(timeout)  # set the timeout
    req = request.Request(url, headers=headers)
    response = request.urlopen(req).read()
    page = response.decode('utf-8')
    return page


# Get the search-keyword dict
def get_search(page, key):
    soup = BS(page, 'lxml')
    search_list = soup.find_all(href=re.compile(key), target='')
    search_dict = {}
    for i in range(len(search_list)):
        soup = BS(str(search_list[i]), 'lxml')
        key = soup.select('a')[0].get_text()
        value = soup.a.attrs['href']
        search_dict[key] = value
    return search_dict


# Get the listing url list (nested dict traversal)
def get_info_list(search_dict, layer, tmp_list, search_list):
    layer += 1  # current dict depth
    for i in range(len(search_dict)):
        tmp_key = list(search_dict.keys())[i]  # key at the current depth
        tmp_list.append(tmp_key)  # append the current key to tmp_list as an index
        tmp_value = search_dict[tmp_key]
        if isinstance(tmp_value, str):  # the value is a url
            tmp_list.append(tmp_value)  # append the url to tmp_list
            search_list.append(copy.deepcopy(tmp_list))  # store the indexed url in search_list
            tmp_list = tmp_list[:layer]  # keep only the indices for this depth
        elif tmp_value == '':  # skip empty values
            layer -= 2  # step back out of this level
            tmp_list = tmp_list[:layer]  # keep only the indices for this depth
        else:
            get_info_list(tmp_value, layer, tmp_list, search_list)  # the value is another dict: recurse
            tmp_list = tmp_list[:layer]
    return search_list


# Get the url for every page of each index page
def get_info_pn_list(search_list):
    fin_search_list = []
    for i in range(len(search_list)):
        print('>>> scraping %s' % search_list[i][:3])
        search_url = search_list[i][3]
        try:
            page = get_page(search_url)
        except:
            print('page request timed out')
            continue
        soup = BS(page, 'lxml')
        # get the maximum page number
        pn_num = soup.select('span[class="mr5"]')[0].get_text()
        rule = re.compile(r'\d+')
        max_pn = int(rule.findall(pn_num)[1])
        # assemble the per-page urls
        for pn in range(1, max_pn+1):
            print('************************ scraping page %s ************************' % pn)
            pn_rule = re.compile('[|]')
            fin_url = pn_rule.sub(r'|e-%s|' % pn, search_url, 1)
            tmp_url_list = copy.deepcopy(search_list[i][:3])
            tmp_url_list.append(fin_url)
            fin_search_list.append(tmp_url_list)
    return fin_search_list


# Scrape the tag information
def get_info(fin_search_list, process_i):
    print('process %s started' % process_i)
    fin_info_list = []
    for i in range(len(fin_search_list)):
        url = fin_search_list[i][3]
        try:
            page = get_page(url)
        except:
            print('tag request timed out')
            continue
        soup = BS(page, 'lxml')
        title_list = soup.select('a[class="h_name"]')
        address_list = soup.select('span[class="address"]')
        attr_list = soup.select('span[class="attribute"]')
        price_list = soup.find_all(attrs={"class": "xq_aprice xq_esf_width"})  # select() fails on some attribute values (those containing spaces); find_all(attrs={}) works instead
        for num in range(20):
            tag_tmp_list = []
            try:
                title = title_list[num].attrs["title"]
                print(r'************************ scraping %s ************************' % title)
                address = re.sub('\n', '', address_list[num].get_text())
                area = re.search('\d+[\u4E00-\u9FA5]{2}', attr_list[num].get_text()).group(0)
                layout = re.search('\d[^0-9]\d.', attr_list[num].get_text()).group(0)
                floor = re.search('\d/\d', attr_list[num].get_text()).group(0)
                price = re.search('\d+[\u4E00-\u9FA5]', price_list[num].get_text()).group(0)
                unit_price = re.search('\d+[\u4E00-\u9FA5]/.', price_list[num].get_text()).group(0)
                tag_tmp_list = copy.deepcopy(fin_search_list[i][:3])
                for tag in [title, address, area, layout, floor, price, unit_price]:
                    tag_tmp_list.append(tag)
                fin_info_list.append(tag_tmp_list)
            except:
                print('[scrape failed]')
                continue
    print('process %s finished' % process_i)
    return fin_info_list


# Split the task list
def assignment_search_list(fin_search_list, project_num):  # project_num is the number of tasks per process; the smaller it is, the more processes are used
    assignment_list = []
    fin_search_list_len = len(fin_search_list)
    for i in range(0, fin_search_list_len, project_num):
        start = i
        end = i+project_num
        assignment_list.append(fin_search_list[start: end])  # take a slice of the list
    return assignment_list


# Save the scraped results
def save_excel(fin_info_list, file_name):
    tag_name = ['區域', '板塊', '地鐵', '標題', '位置', '平米', '戶型', '樓層', '總價', '單位平米價格']
    book = xlsxwriter.Workbook(r'C:\Users\Administrator\Desktop\%s.xlsx' % file_name)  # saved to the desktop by default
    tmp = book.add_worksheet()
    row_num = len(fin_info_list)
    tmp.write_row('A1', tag_name)  # header row
    for i in range(row_num):
        con_pos = 'A%s' % (i + 2)  # data rows start below the header
        tmp.write_row(con_pos, fin_info_list[i])
    book.close()


if __name__ == '__main__':
    file_name = input(r'Enter a file name for saving the results once scraping finishes: ')
    fin_save_list = []  # list holding all scraped records
    # level-1 filter
    page = get_page(base_url)
    search_dict = get_search(page, 'r-')
    # level-2 filter
    for k in search_dict:
        print(r'************************ level 1: scraping [%s] ************************' % k)
        url = search_dict[k]
        second_page = get_page(url)
        second_search_dict = get_search(second_page, 'b-')
        search_dict[k] = second_search_dict
    # level-3 filter
    for k in search_dict:
        second_dict = search_dict[k]
        for s_k in second_dict:
            print(r'************************ level 2: scraping [%s] ************************' % s_k)
            url = second_dict[s_k]
            third_page = get_page(url)
            third_search_dict = get_search(third_page, 'w-')
            print('%s>%s' % (k, s_k))
            second_dict[s_k] = third_search_dict
    fin_info_list = get_info_list(search_dict, layer, tmp_list, search_list)
    fin_info_pn_list = get_info_pn_list(fin_info_list)
    p = Pool(4)  # set up the process pool
    assignment_list = assignment_search_list(fin_info_pn_list, 2)
    result = []  # results returned by each process
    for i in range(len(assignment_list)):
        result.append(p.apply_async(get_info, args=(assignment_list[i], i)))
    p.close()
    p.join()
    for result_i in range(len(result)):
        fin_info_result_list = result[result_i].get()
        fin_save_list.extend(fin_info_result_list)  # merge the lists returned by each process
    save_excel(fin_save_list, file_name)