線程池 導包: 回調函數非同步將可迭代對象中的元素進行某種操作 註意事項:callback必須有一個參數,且只能有一個參數 非同步主要是被應用在耗時的操作 測試:同步&非同步效率 搭建一個flask,自己啟動服務,測試執行時間 新建一個 新建一個 文件夾,在該文件夾下創建一個HTML文件,我寫的是 ,隨便 ...
線程池
- 導包:
from multiprocessing.dummy import Pool
- 回調函數非同步將可迭代對象中的元素進行某種操作
- 註意事項:callback必須有一個參數,且只能有一個參數
- 非同步主要是被應用在耗時的操作
from multiprocessing.dummy import Pool
pool = Pool(3) # 實例化線程池對象,3是線程池的最大線程數
# 參數1:回調函數(只是函數名,不加括弧);參數2:列表
# 參數1會接收參數2列表中的某一個元素,回調函數可以對該列表元素進行某種操作
pool.map(callback,list)
測試:同步&非同步效率
搭建一個flask,自己啟動服務,測試執行時間
- 新建一個
server.py
from flask import Flask, render_template
import time
app = Flask(__name__)
@app.route('/xx')
def index_1():
time.sleep(2)
return render_template('test.html')
@app.route('/yy')
def index_2():
time.sleep(2)
return render_template('test.html')
@app.route('/oo')
def index_3():
time.sleep(2)
return render_template('test.html')
if __name__ == '__main__':
app.run(debug=True)
- 新建一個
templates
文件夾,在該文件夾下創建一個HTML文件,我寫的是test.html
,隨便寫點數據
<html lang="en">
<head>
<meta charset="UTF-8"/>
<title>測試</title>
</head>
<body>
<div>
<p>百裡守約</p>
</div>
<div class="song">
<p>李清照</p>
<p>王安石</p>
<p>蘇軾</p>
<p>柳宗元</p>
<a href="http://www.song.com/" title="趙匡胤" target="_self">
<span>this is span</span>
宋朝是最強大的王朝,不是軍隊的強大,而是經濟很強大,國民都很有錢</a>
<a href="" class="du">總為浮雲能蔽日,長安不見使人愁</a>
<img src="http://www.baidu.com/meinv.jpg" alt=""/>
</div>
<div class="tang">
<ul>
<li><a href="http://www.baidu.com" title="qing">清明時節雨紛紛,路上行人欲斷魂,借問酒家何處有,牧童遙指杏花村</a></li>
<li><a href="http://www.163.com" title="qin">秦時明月漢時關,萬里長征人未還,但使龍城飛將在,不教胡馬度陰山</a></li>
<li><a href="http://www.126.com" id="qi">岐王宅里尋常見,崔九堂前幾度聞,正是江南好風景,落花時節又逢君</a></li>
<li><a href="http://www.sina.com" class="du">杜甫</a></li>
<li><a href="http://www.dudu.com" class="du">杜牧</a></li>
<li><b>杜小月</b></li>
<li><i>度蜜月</i></li>
<li><a href="http://www.haha.com" id="feng">鳳凰臺上鳳凰游,鳳去台空江自流,吳宮花草埋幽徑,晉代衣冠成古丘</a></li>
</ul>
</div>
</body>
</html>
同步&非同步執行時間
import requests
from bs4 import BeautifulSoup
import time
# 線程池模塊
from multiprocessing.dummy import Pool
urls = [
'http://127.0.0.1:5000/xx',
'http://127.0.0.1:5000/yy',
'http://127.0.0.1:5000/oo',
]
# 數據的爬取,返回爬取到的頁面源碼數據
def get_request(url):
page_text = requests.get(url=url).text
return page_text
# 數據的解析,返回標簽的文本
def parse(page_text):
soup = BeautifulSoup(page_text, 'lxml')
return soup.select('#feng')[0].text
# 同步代碼
if __name__ == '__main__':
start = time.time()
for url in urls:
page_text = get_request(url)
text_data = parse(page_text)
print(text_data)
print(time.time() - start)
"""
執行結果:
鳳凰臺上鳳凰游,鳳去台空江自流,吳宮花草埋幽徑,晉代衣冠成古丘
鳳凰臺上鳳凰游,鳳去台空江自流,吳宮花草埋幽徑,晉代衣冠成古丘
鳳凰臺上鳳凰游,鳳去台空江自流,吳宮花草埋幽徑,晉代衣冠成古丘
6.056272029876709
"""
# 非同步代碼
if __name__ == '__main__':
start = time.time()
pool = Pool(3) # 實例化線程池對象
# 參數1:回調函數(只是函數名,不加括弧);參數2:列表
# 參數1會接收參數2列表中的某一個元素,回調函數可以對該列表元素進行某種操作
page_text_list = pool.map(get_request,urls)
text_data = pool.map(parse,page_text_list)
for i in text_data:
print(i)
print(time.time() - start)
"""
執行結果:
鳳凰臺上鳳凰游,鳳去台空江自流,吳宮花草埋幽徑,晉代衣冠成古丘
鳳凰臺上鳳凰游,鳳去台空江自流,吳宮花草埋幽徑,晉代衣冠成古丘
鳳凰臺上鳳凰游,鳳去台空江自流,吳宮花草埋幽徑,晉代衣冠成古丘
2.0537397861480713
不用for迴圈速度能提升0.01秒左右
"""
綜上所述:非同步代碼執行效率顯著提高
案例:基於線程池爬取梨視頻
- 思路分析
- 爬取到視頻詳情頁對應的url,存儲到一個可迭代對象中
- 再次發送請求獲取視頻詳情頁真正的視頻地址
- 註意:視頻詳情頁的video是js代碼動態生成的,需要用到正則解析
- 寫一個callback,獲取視頻的二進位文件,持久化存儲
import requests
from lxml import etree
from multiprocessing.dummy import Pool
import re
import os
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36'
}
# 梨視頻財富板塊的地址
main_url = 'https://www.pearvideo.com/category_3'
# 解析出該板塊下視頻詳情頁的src
main_page_text = requests.get(url=main_url, headers=headers).text
tree = etree.HTML(main_page_text)
li_list = tree.xpath('//*[@id="listvideoListUl"]/li')
# 線程池
video_urls = []
for li in li_list:
# 視頻詳情頁的具體地址和視頻標題
detail_url = "https://www.pearvideo.com/" + li.xpath('./div/a/@href')[0]
name = li.xpath('./div/a/div[2]/text()')[0]
# 對詳情頁發起請求
page_text = requests.get(url=detail_url, headers=headers).text
# 視頻詳情頁的video是js代碼動態生成的,使用正則解析
ex = 'srcUrl="(.*?)",vdoUrl='
video_url = re.findall(ex, page_text, re.S)[0] # 返回的是列表類型
dic = {
'url': video_url,
'name': name,
}
video_urls.append(dic)
# 回調函數
def get_video(url):
# 對視頻地址發請求,將二進位文件持久化存儲
video_data = requests.get(url=url['url'], headers=headers).content
file_name = "./video/" + url['name'] + ".mp4"
with open(file_name, 'wb') as f:
f.write(video_data)
print(url['name'], "下載完畢!")
# 創建存儲視頻的文件夾
dir_name = 'video'
if not os.path.exists(dir_name):
os.mkdir(dir_name)
# 實例化線程池
pool = Pool(4)
pool.map(get_video, video_urls)
單線程+多任務的非同步協程
asyncio
(重點)
特殊函數
- 如果一個函數的定義被async關鍵字修飾後,則該函數是一個特殊函數。
- 特殊之處:
- 該函數被調用後,函數內部的實現語句不會被立即執行。
- 該函數會返回一個協程對象
協程
協程就是一個對象。當特殊函數被調用後,該函數就會返回一個協程對象。
協程對象 == 特殊函數
import asyncio from time import sleep async def get_request(url): print('正在請求:', url) sleep(2) print('請求成功:', url) return '666' # 返回一個協程對象 g = get_request("https://www,qq.com")
任務對象
就是對協程對象的進一步封裝(就是一個高級的協程對象)
任務對象 == 協程對象 == 特殊函數(表示某個固定形式的任務)
asyncio.ensure_future(協程對象) task = asyncio.ensure_future(g) # g:協程對象
綁定回調:
# 定義一個task的回調函數 def callback(task): task.result() # 表示的是當前任務對象對應的特殊函數的返回值 print("I'm callback:", task) task.add_done_callback(funcName) # task:任務對象 # funcName:回調函數的名稱
funcName
這個回調函數必須要帶一個參數,這個參數表示的就是當前的任務對象參數.result()
:表示的就是當前任務對象對應的特殊函數的返回值
事件迴圈對象
創建事件迴圈對象
需要將任務對象註冊到該事件迴圈對象中
# 創建事件迴圈對象 loop = asyncio.get_event_loop() # 將任務對象註冊/裝載到事件迴圈對象中,然後需要啟動迴圈對象 loop.run_until_complete(task) # 用於裝載且啟動事件迴圈 # task:任務對象
等待
await
:當阻塞操作結束後讓loop回頭執行阻塞之後的代碼。
掛起
asyncio.wait()
:將當前的任務對象交出cpu的使用權。
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
asyncio.wait # 掛起操作
tasks # 任務對象列表
重點註意事項
- 在特殊函數實現內部不可以出現不支持非同步的模塊代碼,否則會中斷非同步效果
aiohttp
(重點)
requests
:不支持非同步,不可以出現在特殊函數內部。aiohttp
:支持非同步的網路請求模塊,和asyncio
一起使用pip install aiohttp
代碼的編寫
- 寫出基本架構
import asyncio import aiohttp # 基於aiohttp實現非同步的網路請求 async def get_requests(url): # 實例化了一個請求對象 with aiohttp.ClientSession() as aio: # with aio.get/post(url=url,headers=headers,data/params,proxy='http://ip:prot') as response: with aio.get(url=url) as response: # text() 獲取字元串形式的響應數據 # read() 獲取bytes類型的響應數據 page_text = await response.text() return page_text
- 細節補充(代碼參照完整代碼)
- 在每一個
with
前加上async
關鍵字 - 在每一個阻塞操作前加上
await
關鍵字
- 在每一個
完整代碼
import asyncio import aiohttp # 基於aiohttp實現非同步的網路請求 async def get_requests(url): # 實例化了一個請求對象 async with aiohttp.ClientSession() as aio: # with aio.get/post(url=url,headers=headers,data/params,proxy='http://ip:prot') as response: async with await aio.get(url=url) as response: # text() 獲取字元串形式的響應數據 # read() 獲取bytes類型的響應數據 page_text = await response.text() return page_text
單任務協程操作
import asyncio
from time import sleep
async def get_request(url):
print('正在請求:', url)
sleep(2)
print('請求成功:', url)
return '666'
# 定義一個task的回調函數
def callback(task):
print("I'm callback:", task)
# 返回一個協程對象
g = get_request("https://www,qq.com")
# 創建一個任務對象
task = asyncio.ensure_future(g)
"""
# 給任務對象綁定回調函數
task.add_done_callback(callback)
# 創建事件迴圈對象
loop = asyncio.get_event_loop()
# 將任務對象註冊/裝載到事件迴圈對象中,然後需要啟動迴圈對象
loop.run_until_complete(task) # 用於裝載且啟動事件迴圈
"""
執行結果:
正在請求: www,qq.com
正在請求: www,qq.com
"""
多任務協程操作
import asyncio
import time
start = time.time()
async def get_request(url):
print('正在請求:', url)
# await 當阻塞操作結束後讓loop回頭執行阻塞之後的代碼
await asyncio.sleep(2)
print('請求成功:', url)
return '666'
urls = [
'http://127.0.0.1:5000/xx',
'http://127.0.0.1:5000/yy',
'http://127.0.0.1:5000/oo',
]
tasks = []
for url in urls:
c = get_request(url)
task = asyncio.ensure_future(c)
tasks.append(task)
loop = asyncio.get_event_loop()
# 將任務列表註冊到事件迴圈的時候一定要將任務列表進行掛起操作
# asyncio.wait() 掛起操作,將當前的任務對象交出cpu的使用權
loop.run_until_complete(asyncio.wait(tasks))
print('總耗時:', time.time() - start)
單線程&多任務非同步爬蟲
基於Flask自測
- 測試代碼在上述
測試:同步&非同步效率
,按照上述步驟啟動項目;然後運行下方代碼。
import asyncio
import time
import aiohttp
from lxml import etree
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36'
}
urls = [
'http://127.0.0.1:5000/xx',
'http://127.0.0.1:5000/yy',
'http://127.0.0.1:5000/oo',
]
start = time.time()
"""
# 發起請求,獲取響應數據(不可以實現非同步)
async def get_requests(url):
# requests是不支持非同步的模塊
page_text = requests.get(url).text
return page_text
"""
async def get_requests(url):
"""
基於aiohttp實現非同步的網路請求
:param url:
:return:
"""
# 實例化了一個請求對象
async with aiohttp.ClientSession() as aio:
# with aio.get/post(url=url,headers=headers,data/params,proxy='http://ip:prot') as response:
async with await aio.get(url=url) as response:
# text() 獲取字元串形式的響應數據
# read() 獲取bytes類型的響應數據
page_text = await response.text()
return page_text
def parse(task):
"""
定義回調函數
:param task:
:return:
"""
page_text = task.result() # 獲取特殊函數的返回值(請求到的頁面源碼數據)
tree = etree.HTML(page_text)
content = tree.xpath('//*[@id="feng"]/text()')[0]
print(content)
tasks = []
for url in urls:
c = get_requests(url)
task = asyncio.ensure_future(c)
task.add_done_callback(parse)
tasks.append(task)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
print('總耗時:', time.time() - start)
案例:基於單線程多任務非同步爬取梨視頻
- 思路上述
案例:基於線程池爬取梨視頻
import asyncio
import time
import aiohttp
from lxml import etree
import re
import os
import requests
# time模塊是為了測試爬取視頻的耗時
start = time.time()
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36'
}
# 梨視頻財富板塊的地址
main_url = 'https://www.pearvideo.com/category_3'
main_page_text = requests.get(url=main_url, headers=headers).text
tree = etree.HTML(main_page_text)
li_list = tree.xpath('//*[@id="listvideoListUl"]/li')
urls = [] # [{'url': video_url,'name': name},{}...]
for li in li_list:
detail_url = "https://www.pearvideo.com/" + li.xpath('./div/a/@href')[0]
name = li.xpath('./div/a/div[2]/text()')[0]
page_text = requests.get(url=detail_url, headers=headers).text
# 視頻詳情頁的video是js代碼動態生成的
ex = 'srcUrl="(.*?)",vdoUrl='
video_url = re.findall(ex, page_text, re.S)[0] # 返回的是列表類型
dic = {
'url': video_url,
'name': name,
}
urls.append(dic)
# 基於aiohttp實現非同步的網路請求
async def get_requests(url):
# 實例化了一個請求對象
async with aiohttp.ClientSession() as aio:
# with aio.get/post(url=url,headers=headers,data/params,proxy='http://ip:prot') as response:
async with await aio.get(url=url['url'], headers=headers) as response:
# text() 獲取字元串形式的響應數據
# read() 獲取bytes類型的響應數據
page_read = await response.read()
dic = {
"page_read": page_read,
"name": url['name']
}
return dic
def parse(task):
"""
定義回調函數
:param task:
:return:
"""
dic_info = task.result() # 獲取特殊函數的返回值(請求到的頁面源碼數據)
file_name = "./video/" + dic_info["name"] + ".mp4"
with open(file_name, 'wb') as f:
f.write(dic_info['page_read'])
print(dic_info["name"], "下載完畢!")
tasks = []
for url in urls:
c = get_requests(url)
task = asyncio.ensure_future(c)
task.add_done_callback(parse)
tasks.append(task)
dir_name = 'video'
if not os.path.exists(dir_name):
os.mkdir(dir_name)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
print('總耗時:', time.time() - start)