本文介紹了Python 非同步編程技術asyncio ,使用場景,介紹了同步編程,非同步編程原理,非同步技術的優勢,非同步語法 async await, 協程,create_task, gather, event loop, asyncio.run() 等,用回調函數callback 來解析響應消息,實... ...
一、問題:當發送API請求,讀寫資料庫任務較重時,程式運行效率急劇下降。
非同步技術是Python編程中對提升性能非常重要的一項技術。在實際應用,經常面臨對外發送網路請求,調用外部介面,或者不斷更新資料庫或文件等操作。 這這些操作,通常90%以上時間是在等待,如通過REST, gRPC向伺服器發送請求,通常可能等待幾十毫秒至幾秒,甚至更長。如果業務較重,按順序執行編程,會導致大量時間用在等待上,程式運行效率急劇下降。
常見的場景,就是爬蟲軟體通常會發起很多請求,如果採用同步編程方式工,往往運行時間很長。
二、非同步編程的優勢
通常的編程,如果有4個任務,採用同步編程模式,4個任務是按順序執行的,分別用時:10s,7s,5s,6s,共耗時28s; 而非同步方式,就是讓4個任務同時執行,總耗時降為10s,改善效果是很明顯的。
那時非同步編程是如何做到的?
非同步編程,將每個任務改成協程執行,在遇到需要等待的語句時,即暫時將執行權交還給主程式的控制迴圈event loop,其它協程可以繼續使用CPU等資源。而當該協程收到響應後,會用事件通知event loop,申請繼續執行。 這樣就避免了由於等待期間還占用CPU資源的情形。 因此程式執行效率大為提高。
但如果任務是計算密集型的,那麼非同步技術對性能提升幫助不大,需要採用其它方式,如多進程編程。或者Cython 等。
三、用同步編程方式,抓取多個網站數據
先看一下,採用同步編程順序執行,抓取多個網站數據的耗時。 這些網站中,
其中http://www.google.com 是無響應的,會超時。因此在 requests.get()方法,設置 timeout=3, 即超過3秒,會拋出TimeOutException 異常。
代碼如下:
import requests
import time
# 測試時將測試網址替換
urls = [
"http://www.bxxxx.com",
"http://www.aaaa.com",
"http://www.bbbb.com",
"http://www.cccc.com",
"http://www.sdddd.com",
"http://www.jdddd.com",
"http://www.zeeee.com",
"http://www.tffff.com",
"http://www.cgggg.com",
"http://www.zhhhhh.com.cn",
"http://www.google.com",
"https://www.yiiiii.com/",
]
def check_one_ip(url):
headers = {
"user-ageng": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 \
(KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36 Edg/116.0.1938.69"
}
TIMEOUT = 3
result = ()
try:
response = requests.get(url, headers=headers, timeout=TIMEOUT)
print(f"response from {url} is : {response.status_code}")
if 200 <= response.status_code < 300:
print(f"length of response body is {len(response.text)}")
result = (url, response.status_code)
except Exception as e:
print(f"{url} met timeout error")
return (url, 999)
return result
def main():
results = []
for url in urls:
result = check_one_ip(url)
results.append(result)
if __name__ == "__main__":
t1 = time.time()
main()
t2 = time.time()
print(f"total time: {t2-t1:.3f}s")
運行代碼,向12個網站發送request, 列印response的狀態碼,總耗時為:6.035s
,
response from url is : 200
length of response body is 2381
response from url is : 200
length of response body is 24000
response from url is : 200
length of response body is 106117
response from url is : 403
response from url is : 404
response from url is : 200
length of response body is 177104
response from url is : 200
length of response body is 37989
response from url is : 200
length of response body is 89513
response from url is : 200
length of response body is 32642
response from url is : 403
url met timeout error
response from url is : 200
length of response body is 834
total time: 6.035s
四、用非同步方式,同時抓取多個網站數據
現在,採用Asyncio非同步編程,以併發的運行方式,向多個網站同時發送request, 總耗時,應該是用時最長那個協程的用時。這裡我們使用了timeout, 就是3秒左右。
AsyncIO非同步編程步驟:
- 定義非同步任務函數
使用 asyc / await 關鍵字。在耗時操作前加await - 創建asyncio.create_task() 方法創建協程任務
- 在main()方法中用gather() 彙集協程任務,以便併發執行。
gather()方法返回結果是一個由所有返回值聚合而成的迭代器 - 在主線程的event loop中運行main()
asyncio模塊提供了1個.run()來啟動 event loop 非同步控制迴圈,並執行main()方法, - 可選,給協程添加回調函數來解析網站響應結果
對於每個Task, 可用 add_done_callback(task_callback) 方法添加回調函數,此例中,對顯示response的狀態碼。
其它說明
- 由於requests庫的 response對象不支持 await語句,因此這裡使用htppx 庫來代替requests, 除了非同步介面外,其它使用方式完全一致。
完整代碼
import asyncio
import httpx
from concurrent.futures import ThreadPoolExecutor, Future
import time
import contextvars
# 測試時將測試網址替換
urls = [
"http://www.bxxxx.com",
"http://www.aaaa.com",
"http://www.bbbb.com",
"http://www.cccc.com",
"http://www.sdddd.com",
"http://www.jdddd.com",
"http://www.zeeee.com",
"http://www.tffff.com",
"http://www.cgggg.com",
"http://www.zhhhhh.com.cn",
"http://www.google.com",
"https://www.yiiiii.com/",
]
async def check_one_ip(url):
headers = {
"user-ageng": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 \
(KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36 Edg/116.0.1938.69"
}
TIMEOUT = 3
result = ()
try:
async with httpx.AsyncClient() as client:
response = await client.get(url, headers=headers,timeout=TIMEOUT)
print(f"response from {url} is : {response.status_code}")
if 200 <= response.status_code < 300:
print(f"length of response body is {len(response.text)}")
result = (url, response.status_code)
except Exception as e:
print(f"{url} met timeout error")
return (url, 999)
return result
def task_callback(context):
# print response.status_code
url, code = context.result()
print(f"It is callback, got status_code: {code} of {url}")
async def main():
tasks=[]
for url in urls:
task = asyncio.create_task(check_one_ip(url))
task.add_done_callback(task_callback)
tasks.append(task)
await asyncio.gather(*tasks)
if __name__=="__main__":
t1 = time.time()
asyncio.run(main())
t2 = time.time()
print(f"total time: {t2-t1:.3f}s")
運行結果如下,可以看到,總耗時: 3.161s
,相比同步編程方式,耗時減少了1半。 隨著發送請求量的增加,可以看到更加明顯的效果。
response from url is : 302
It is callback, got status_code: 302 of url
response from url is : 302
It is callback, got status_code: 302 of url
response from url is : 200
length of response body is 23508
It is callback, got status_code: 200 of url
response from url is : 302
response from url is : 301
It is callback, got status_code: 302 of url
It is callback, got status_code: 301 of url
response from url is : 301
response from url is : 301
response from url is : 301
response from url is : 200
length of response body is 396837
It is callback, got status_code: 301 of url
It is callback, got status_code: 301 of url
It is callback, got status_code: 301 of url
It is callback, got status_code: 200 of url
response from url is : 404
It is callback, got status_code: 404 of url
response from url is : 200
length of response body is 1151330
It is callback, got status_code: 200 of url
url met timeout error
It is callback, got status_code: 999 of url
total time: 3.161s
五、非同步編程註意事項
1)協程不應該執行耗時長的任務
非同步event loop執行期間,雖然各個協程是在工作,但主線程是被阻塞的。本例中,非同步耗時的總時長與訪問google.com超時時長相同,那麼意味著,如果協程中如果有1個是耗時很長的任務,那麼主線程還將被阻塞,非同步解決不了這個問題,這時耗時協程應該拿出來,用子線程、或者子進程來執行。
2) 協程應該彙集後併發執行
遇到一些開發者咨詢,為什麼採用了非同步編程,但性能沒有明顯提升呢? 創建多個協程任務後,必須按第3步,用gather()方法來彙集創建的協程任務,然後用asyncio.run()方法併發運行。 另外官方文檔要求 event loop要在主線程main() 方法中運行。
3)慎用底層編程介面
另外由於官方文檔並未清晰說明 event loop、future對象等低層編程介面,除非你很瞭解非同步低層的實現機制,否則不建議使用低層介面,
使用ayncio.run() 來啟動evnetloop, 使用 task 對象,而非future 對象。