協程 & asyncio & 非同步 1. 協程 (coroutine) 協程不是電腦提供,而是程式員人為創造。 協程(coroutine),也可以被稱為微線程,是一種用戶態內的上下文切換技術。簡而言之,其實就是通過一個線程實現代碼塊互相切換運行。例如: def func1(): print(1) ...
協程 & asyncio & 非同步
1. 協程 (coroutine)
協程不是電腦提供,而是程式員人為創造。
協程(coroutine),也可以被稱為微線程,是一種用戶態內的上下文切換技術。簡而言之,其實就是通過一個線程實現代碼塊互相切換運行。例如:
def func1():
print(1)
...
print(2)
def func2():
print(3)
...
print(4)
func1()
func2()
實現協程有這麼幾種方法:
greenlet
,早期模塊。yield
關鍵字。asyncio
裝飾器(python 3.4)async
、await
關鍵字(python 3.5)
1.1 greenlet
實現協程
pip3 install greenlet
from greenlet import greenlet
def func1():
print(1) # 第 1 步:輸出 1
gr2.switch() # 第 3 步:切換到 func2 函數
print(2) # 第 6 步:輸出 2
gr2.switch() # 第 7 步 切換到 func2 函數,從上一次執行的位置繼續向後執行
def func2():
print(3) # 第 4 步:輸出 3
gr1.switch() # 第 5 步:切換到 func1 函數,從上一次執行的位置繼續向後執行
print(4) # 第 8 步:輸出 4
gr1 = greenlet(func1)
gr2 = greenlet(func2)
gr1.switch() # 第 1 步:去執行 func1 函數
1.2 yield
關鍵字
def func1():
yield 1
yield from func2()
yield 2
def func2():
yield 3
yield 4
f1 = func1()
for item in f1:
print(item)
偽實現,僅能實現協程的功能。
1.3 asyncio
在 python 3.4 及之後的版本。
import asyncio
@asyncio.coroutine
def func1():
print(1)
yield from asyncio.sleep(2) # 遇到 IO 耗時操作,自動化切換到 tasks 中其它任務
print(2)
@asyncio.coroutine
def func2():
print(3)
yield from asyncio.sleep(2) # 遇到 IO 耗時操作,自動化切換到 tasks 中其它任務
print(4)
tasks = [
asyncio.ensure_future(func1())
asyncio.ensure_future(func2())
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
註意:遇到 IO 阻塞自動切換。
1.4 aynsc
& await
關鍵字
在 python 3.5 及之後的版本。
import asyncio
async def func1():
print(1)
# 網路 IO 請求:下載一張圖片
await asyncio.sleep(2) # 遇到 IO 耗時操作,自動化切換到 tasks 中的其它任務。
print(2)
async def func2():
print(3)
# 網路 IO 請求:下載一張圖片
await asyncio.sleep(2) # 遇到 IO 耗時操作,自動化切換到 tasks 中的其它任務。
print(4)
tasks = [
asyncio.ensure_future(func1())
asyncio.ensure_future(func2())
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
2. 協程的意義
在一個線程中如果遇到 IO 等待時間,線程不會傻等,而是利用空閑時間再去乾點其它事情。
案例:下載三張圖片(網路 IO):
-
普通方式(同步)
pip3 install requests
import requests def download_image(url): print("開始下載:", url) response = requests.get(url) print("下載完成") file_name = url.rsplit("_")[-1] with open(file_name, mode="wb") as file_object: file_object.write(response.content) url_list = [ "https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg", "https://www3.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg", "https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vk0sGY913.jpg", ] for item in url_list: download_image(item)
-
協程方式(非同步)
pip3 install aiohttp
import aiohttp import asyncio async def fetch(session, url): print("發送請求:", url) async with session.get(url, verify_ssl=False) as response: content = await response.content.read() file_name = url.rsplit("_")[-1] with open(file_name, mode="wb") as file_object: file_object.write(content) async def main(): async with aiohttp.ClientSession() as session: url_list = [ "https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg", "https://www3.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg", "https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vk0sGY913.jpg", ] tasks = [asyncio.create_task(fetch(session, url)) for url in url_list] await asyncio.wait(tasks) if __name__ == "__main__": aynscio.run(main())
3. 非同步編程
3.1 事件迴圈(Event Loop)
理解成一個死迴圈,去檢測並執行某些代碼。
task_list = [task1, task2, task3, ...]
while True:
executables, completes = [...], [...] # 在 task_list 中檢查所有任務,將可執行和已完成返回
for executable in executables:
execute executable
for complete in completes:
remove complete from task_list
if task_list == []: # 如果 task_list 中的任務都已完成,則終止迴圈
break
import asyncio
# 去生成或獲取一個事件迴圈
loop = asyncio.get_event_loop()
# 將任務放到任務列表
loop.run_until_complete(asyncio.wait(tasks))
3.2 快速上手
協程函數(coroutine function):定義函數時 async def
(加上 async
關鍵字)。
協程對象(coroutine object):執行協程函數得到的協程對象。
async def func():
pass
result = func()
註意到 result = func()
中 call 了 func()
,但並不會執行 func()
內部代碼,只是得到了 func()
的協程對象。
若要執行協程函數內部代碼,需要事件迴圈去處理協程函數得到的協程對象。
async def func():
print("come here.")
result = func()
loop = async.get_event_loop()
loop.run_until_complete(result)
到了 python 3.7 之後,還有更簡便的寫法:
async def func():
print("come here.")
result = func()
# loop = async.get_event_loop()
# loop.run_until_complete(result)
async.run(result) # python 3.7
3.3 await
關鍵字
await 一般要加上 可等待的對象(協程對象、Future 對象、Task 對象),可以簡單理解為 IO 等待(但實際上並沒有這麼簡單)。
示例 1:
import asyncio
async def func():
print("come here.")
response = await asyncio.sleep(2) # 沒有什麼意義,假設這是一個 IO 等待(例如網路請求)
print("terminate", response)
asyncio.run(func())
在事件迴圈內部,執行協程對象 func()
時會先執行 print("come here.")
,接下來會進入 IO 等待,此時事件迴圈會跳出 func()
函數去執行其它任務,一旦 response
得到返回值(即結束 IO 等待),事件迴圈會在下一次迴圈中檢測到 IO 等待已經結束,此刻才會繼續執行 func()
後面的代碼(即 print("terminate", response)
)。
示例 2(協程對象之間可以嵌套):
async def others():
print("start")
await asyncio.sleep(2)
print("end")
return "返回值"
async def func():
print("執行協程函數內部代碼")
# 遇到 IO 操作掛起當前協程(任務),等 IO 操作完成之後再繼續往下執行。當前協程掛起時,事件迴圈可以去執行其它協程(任務)。
response = await others()
print("IO 請求結束,結果為:", response)
asyncio.run(func())
示例 3:
async def others():
print("start")
await asyncio.sleep(2)
print("end")
return "返回值"
async def func():
print("執行協程函數內部代碼")
# 遇到 IO 操作掛起當前協程(任務),等 IO 操作完成之後再繼續往下執行。當前協程掛起時,事件迴圈可以去執行其它協程(任務)。
response_1 = await others()
print("IO 請求結束,結果為:", response_1)
response_2 = await others()
print("IO 請求結束,結果為:", response_2)
asyncio.run(func())
await
關鍵字的含義就是,等待對象的值得到返回結果之後再繼續向下運行。
3.4 Task
對象
Tasks are used to schedule coroutines concurrently.
When a coroutine is wrapped into a Task with functions like
asyncio.create_task()
the coroutine is automatically scheduled to run soon.
簡單來說,它可以在事件迴圈中添加多個任務。
Tasks 用於併發調度協程,通過
asyncio.create_task(協程對象)
的方式創建 Task 對象,這樣可以讓協程加入事件迴圈中等待被調度執行。除了使用asyncio.create_task()
函數以外的,還可以用低層級的loop.create_task()
或ensure_future()
函數。不建議手動實例化 Task 對象。
示例 1(這種代碼寫得比較少):
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "返回值"
async def main():
print("main 開始")
# 創建協程,將協程封裝到一個 Task 對象中並立即添加到事件迴圈的任務列表中,等待事件迴圈去執行(預設是就緒狀態)。
task1 = asyncio.create_task(func())
task2 = asyncio.create_task(fucn())
# 當執行某協程遇到 IO 操作時,會自動化切換執行其它任務。
# 此處的 await 時等待相對應的協程全都執行完畢並獲取結果。
result_1 = await task1
result_2 = await task2
print(result_1, result_2)
asyncio.run(main())
示例 2(這種代碼應用得比較多):
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "返回值"
async def main():
print("main 開始")
# 創建協程任務列表
task_list = [
asyncio.create_task(func(), name="n1"), # 給 task 命名,會在返回集中顯示
asyncio.create_task(func(), name="n2")
]
# 不能直接把 task_list 以列表的形式加在 await 之後
# 註意 await 關鍵字只接受 coroutine object, task object, future object
# 此處 done 是一個集合,為 task_list 的返回值
# pending 在 timeout 不為 None 時有意義,timeout 規定了最長等待時間,
# 如果超過 timeout,那麼還未完成的任務將添加到 pending 中。
done, pending = await asyncio.wait(task_list, timeout=1)
print(done)
asyncio.run(main())
示例 3:
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "返回值"
# 創建協程任務列表
task_list = [
asyncio.create_task(func(), name="n1"), # 給 task 命名,會在返回集中顯示
asyncio.create_task(func(), name="n2")
]
done, pending = asyncio.run(asyncio.wait(task_list))
print(done)
註意到以上代碼會導致程式報錯。原因是:asyncio.create_task()
會將協程對象立即添加到事件迴圈中,但是,事件迴圈是在 asyncio.run()
中被創造,因此此時並不存在事件迴圈。應該如此修改:
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "返回值"
# 創建協程對象列表
task_list = [
func(),
func()
]
# 此時 asyncio 會在創建事件迴圈之後,在內部將 task_list 中的協程對象添加到事件迴圈中
done, pending = asyncio.run(asyncio.wait(task_list))
print(done)
3.5 Future
對象
Future
類是 Task
類的父類,即 Task
類繼承自 Future
類,Task
對象內部 await 結果的處理基於 Future
對象而來。
A
Future
is a special low-level awaitable object that represents an eventual result of an asynchronous operation.
示例 1:
import asyncio
async def main():
# 獲取當前事件迴圈
loop = asyncio.get_running_loop()
# 創建一個任務(Future 對象),這個任務什麼都不幹。
future = loop.create_future()
# 等待任務最終結果(Future 對象),沒有結果則會一直等下去。
await future
asyncio.run(main())
在上述代碼中,由於創建的 Future
對象什麼也不幹,因此 await future
將一直卡住,無法獲得返回結果,所以上述代碼是沒有實際意義的。但註意,如果某一個時刻突然給 future
賦值,那麼 future
立刻可以獲得返回結果,並且跳出 await
。
示例 2(沒什麼意義,用於理解 Future
對象的作用,即幫助我們等待結果):
async def set_after(future):
await asyncio.sleep(2)
future.set_result("666")
async def main():
# 獲取當前事件迴圈
loop = asyncio.get_running_loop()
# 創建一個任務(Future 對象),沒有綁定任何行為,則這個任務永遠不知道什麼時候結束。
future = loop.create_future()
# 創建一個任務(Task 對象),綁定了 set_after 函數,函數內部在 2s 之後會給 future 賦值。
# 即手動設置 future 任務的最終結果,那麼 future 就可以結束了。
await loop.create_task(set_after(future))
# 等待 Future 對象獲取最終結果,否則一直等待下去。
data = await future
print(data)
asyncio.run(main())
3.6 concurrent
中的 Future
對象
首先註意到,concurrent
中的 Future
對象(concurrent.futures.Future
)和 asyncio
中的 Future
對象沒有關係。concurrent
中的 Future
對象是當使用線程池、進程池實現非同步操作時使用到的對象。
import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor
def func(value):
time.sleep(1)
return value
# 創建線程池
pool = ThreadPoolExecutor(max_workers=5)
# 或創建進程池
# pool = ProcessPoolExecutor(max_workers=5)
for i in range(10):
# 讓 pool 拿出一個線程去執行 func 函數
future = pool.submit(func, i)
print(future)
實際中可能會存在兩種 Future
對象交叉使用。例如:crm
項目中 80% 都基於協程非同步編程 + MySQL,但 MySQL 不支持非同步,因此在 MySQL 中使用進程池、線程池做非同步編程。
示例 1:
import time
import asyncio
import concurrent.futures
def func1():
# 某個耗時操作
time.sleep(2)
return "complete"
async def main():
loop = asyncio.get_running_loop()
# 1. Run in the default loop's executor (default to ThreadPoolExecutor)
# 第一步:內部會先調用 ThreadPoolExecutor 的 submit 方法去線程池中申請一個線程去
# 執行 func1 函數,並返回一個 concurrent.futures.Future 對象
# 第二步:調用 asyncio.wrap_future 將 concurrent.future.Future 對象
# 包裝為 asyncio.Future 對象。
# 因為 concurrent.futures.Future 對象不支持 await 語法,所以需要包裝為
# asyncio.Future 對象才能使用。
future = loop.run_in_executor(None, func1) # 返回一個 Future
# 上面這一步內部會調用 asyncio.wrap_future 將返回的 concurrent.futures.Future
# 對象轉換為 asyncio.Future 對象
# 預設 None 意味著創建線程池,若想使用進程池請參考以下註釋代碼
result = await future
print("default thread pool", result)
# 2. Run in a custom thread pool:
# with concurrent.futures.ThreadPoolExecutor() as pool:
# result = await loop.run_in_executor(pool, func1)
# print("custom thread pool", result)
# 3. Run in a custom process pool:
# with concurrent.futures.ProcessPoolExecutor() as pool:
# result = await loop.run_in_executor(pool, func1)
# print("custom process pool", result)
asyncio.run(main())
案例:asyncio
+ 不支持非同步的模塊(爬蟲)
import asyncio
import requests
async def download_image(url):
# 發送網路請求,下載圖片(遇到網路下載圖片的 IO 請求,自動化切換到其它任務)
print("開始下載:", url)
loop = asyncio.get_event_loop()
# requests 模塊預設不支持非同步操作,所以就用線程池配合實現了
future = loop.run_in_executor(None, requests.get, url)
response = await future
print("下載完成")
# 圖片保存到本地文件
file_name = url.rsplit("_")[-1]
with open(file_name, mode="wb") as file_object:
file_object.write(response.content)
if __name__ == "__main__":
url_list = [ "https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg",
"https://www3.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg",
"https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vk0sGY913.jpg",
]
tasks = [download_image(url) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
耗費資源更大,不得已而為之。
3.7 非同步迭代器
-
什麼是非同步迭代器?
實現了
__aiter__()
和__anext__()
方法的對象。__anext__()
必須返回一個awaitable
對象。async for
會處理非同步迭代器的__anext__()
方法所返回的可等待對象,直到其引發一個StopAsyncIteration
異常。由PEP 492
引入。 -
什麼是非同步可迭代對象?
可在
async for
語句中被使用的對象。必須通過它的__aiter__()
方法返回一個asynchronous iterator
。由PEP 492
引入。
示例:
import asyncio
class Reader(object):
"""
自定義非同步迭代器(同時也是非同步可迭代對象)
"""
def __init__(self):
self.count = 0
async def readline(self):
# await asyncio.sleep(1)
self.count += 1
if self.count == 100:
return None
return self.count
def __aiter__(self):
return self
async def __anext__(self):
val = await self.readline()
if val is None:
raise StopAsyncIteration
return val
# 以下代碼會報錯,因為 async for 必須寫在協程函數內。
# obj = Reader()
# async for item in obj:
# print(item)
async def func():
obj = Reader()
async for item in obj:
print(item)
asyncio.run(func())
3.8 非同步上下文管理器
-
什麼是非同步上下文管理器?
此種對象通過定義
__aenter__()
和__aexit__()
方法來對async with
語句中的環境進行控制。由PEP 492
引入。
示例:
import asyncio
class AsyncContextManager(object):
def __init__(self):
self.conn = conn
async def do_something(self):
# 非同步操作資料庫
return 666
async def __aenter__(self):
# 非同步連接資料庫
self.conn = await asyncio.sleep(1) # 可以換成連接資料庫代碼
return self
async def __aexit__(self, exc_type, exc, tb):
# 非同步關閉資料庫鏈接
await asyncio.sleep(1)
# 以下代碼會報錯,因為 async with 必須寫在協程函數內。
# obj = AsyncContextManager()
# async with obj:
# result = await obj.do_something()
# print(result)
# 或者
# async with AsyncContextManager() as f:
# result = await f.do_something()
# print(result)
async def func():
async with AsyncContextManager() as f:
result = await f.do_something()
print(result)
asyncio.run(func())
4. uvloop
uvloop
是 asyncio
事件迴圈的替代方案,可以提高事件迴圈效率,性能接近於 go
語言。
pip3 install uvloop
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# Your asyncio code here.
# 內部的事件迴圈自動會變為 uvloop
asyncio.run()
註意:asgi
是支持非同步的 wsgi
網關介面(e.g. uvicorn
,內部使用的就是 uvloop
)。
本文來自博客園,作者:車天健,轉載請註明原文鏈接:https://www.cnblogs.com/chetianjian/p/17715303.html