非同步asyncio asyncio是一個使用async / await語法編寫併發代碼的庫。 asyncio用作多個Python非同步框架的基礎,這些框架提供高性能的網路和Web伺服器,資料庫連接庫,分散式任務隊列等。 asyncio通常非常適合IO綁定和高級 結構化網路代碼。 asyncio提供了一 ...
非同步asyncio
asyncio是一個使用async / await語法編寫併發代碼的庫。
asyncio用作多個Python非同步框架的基礎,這些框架提供高性能的網路和Web伺服器,資料庫連接庫,分散式任務隊列等。
asyncio通常非常適合IO綁定和高級 結構化網路代碼。
asyncio提供了一組高級 API:
- 同時運行Python協同程式並完全控制它們的執行;
- 執行網路IO和IPC ;
- 控制子過程 ;
- 通過隊列分配任務;
- 同步併發代碼;
此外,還有一些用於庫和框架開發人員的低級 API :
- 創建和管理事件迴圈,提供非同步API
networking
,運行subprocesses
,處理等;OS signals
- 使用傳輸實現有效的協議 ;
- 使用async / await語法橋接基於回調的庫和代碼。
Conroutines
使用async / await語法聲明的協同程式是編寫asyncio應用程式的首選方法。例如,以下代碼片段(需要Python 3.7+)列印“hello”,等待1秒,然後列印“world”:
import asyncio async def main(): print('hello') await asyncio.sleep(1) print('world') asyncio.run(main()) # hello # world
上面代碼等同於下麵(不推薦使用基於生成器的協同程式的支持,並計劃在Python 3.10中刪除。)
import asyncio @asyncio.coroutine def main(): print('hello') yield from asyncio.sleep(1) print('world') asyncio.run(main())
asyncio實際等同於下麵的工作(參數為An asyncio.Future, a coroutine or an awaitable is required)
import asyncio @asyncio.coroutine def main(): print('hello') yield from asyncio.sleep(1) print('world') # asyncio.run(main()) loop = asyncio.events.new_event_loop() asyncio.events.set_event_loop(loop) loop.run_until_complete(main()) # hello # world
1 This function runs the passed coroutine, taking care of 2 managing the asyncio event loop and finalizing asynchronous 3 generators. 4 5 This function cannot be called when another asyncio event loop is 6 running in the same thread. 7 8 If debug is True, the event loop will be run in debug mode. 9 10 This function always creates a new event loop and closes it at the end. 11 It should be used as a main entry point for asyncio programs, and should 12 ideally only be called once.asyncio.run功能介紹
實際運行協程asyncio提供了三種主要機制:
1、The asyncio.run()函數來運行頂層入口點“main()”函數(見上面的例子)
2、Awaiting on a coroutine 以下代碼片段將在等待1秒後列印“hello”,然後在等待另外 2秒後列印“world” :
import asyncio import time async def say_after(delay, what): await asyncio.sleep(delay) print(what) async def main(): print(f"started at {time.strftime('%X')}") await say_after(1, 'hello') await say_after(2, 'world') print(f"finished at {time.strftime('%X')}") asyncio.run(main()) # started at 11:54:48 # hello # world # finished at 11:54:51
3、asyncio.create_task()
與asyncio同時運行協同程式的功能Tasks;
讓我們修改上面的例子並同時運行兩個say_after
協同程式 :
import asyncio import time async def say_after(delay, what): await asyncio.sleep(delay) print(f"{what} at {time.strftime('%X')}") async def main(): task1 = asyncio.create_task( say_after(1, 'hello')) task2 = asyncio.create_task( say_after(2, 'world')) print(f"started at {time.strftime('%X')}") # Wait until both tasks are completed (should take # around 2 seconds.) await task1 await task2 print(f"finished at {time.strftime('%X')}") asyncio.run(main()) # started at 14:27:22 # hello at 14:27:23 # world at 14:27:24 # finished at 14:27:24
稍微改變一下形式,可以理解的更多
import asyncio import time async def say_after(delay, what): await asyncio.sleep(delay) print(f"{what} at {time.strftime('%X')}") async def main(): task1 = asyncio.create_task( say_after(1, 'hello')) task2 = asyncio.create_task( say_after(2, 'world')) print(f"started at {time.strftime('%X')}") # Wait until both tasks are completed (should take # around 2 seconds.) await asyncio.sleep(3) # await task1 # await task2 print(f"finished at {time.strftime('%X')}") asyncio.run(main()) # started at 14:29:41 # hello at 14:29:42 # world at 14:29:43 # finished at 14:29:44
Awaitables
我們說如果一個對象可以在表達式中使用,那麼它就是一個等待對象await
。許多asyncio API旨在接受等待。
有三種主要類型的等待對象:coroutines, Tasks, and Futures.
Coroutines
Python coroutines are awaitables and therefore can be awaited from other coroutines:
import asyncio async def nested(): return 42 async def main(): # Nothing happens if we just call "nested()". # A coroutine object is created but not awaited, # so it *won't run at all*. nested() # Let's do it differently now and await it: print(await nested()) # will print "42". asyncio.run(main()) # 42
重要
在本文檔中,術語“coroutine”可用於兩個密切相關的概念:
- 一個協程功能:一個功能;
async def
- 一個協程對象:通過調用協同程式函數返回的對象 。
Tasks
任務用於調度協同程式併發。
當一個協程被包裝到一個Task中時,會像asyncio.create_task()
一樣 conroutine自動安排很快運行:
import asyncio async def nested(): return 42 async def main(): # Schedule nested() to run soon concurrently # with "main()". task = asyncio.create_task(nested()) # "task" can now be used to cancel "nested()", or # can simply be awaited to wait until it is complete: await task asyncio.run(main())
Futures
A Future
是一個特殊的低級別等待對象,它表示非同步操作的最終結果。
當等待 Future對象時,它意味著協程將等到Future在其他地方解析。
需要asyncio中的未來對象以允許基於回調的代碼與async / await一起使用。
通常,不需要在應用程式級代碼中創建Future對象。
可以等待有時通過庫和一些asyncio API公開的未來對象:
async def main(): await function_that_returns_a_future_object() # this is also valid: await asyncio.gather( function_that_returns_a_future_object(), some_python_coroutine() )
返回Future對象的低級函數的一個很好的例子是loop.run_in_executor()
。
Asyncio方法
1、運行asyncio程式
asyncio.run(coro,*,debug = False )
此函數運行傳遞的協同程式,負責管理asyncio事件迴圈並最終確定非同步生成器。
當另一個asyncio事件迴圈在同一個線程中運行時,無法調用此函數。
如果是debugTrue
,則事件迴圈將以調試模式運行。
此函數始終創建一個新的事件迴圈併在結束時將其關閉。它應該用作asyncio程式的主要入口點,理想情況下應該只調用一次。
版本3.7中的新功能:重要:此功能已臨時添加到Python 3.7中的asyncio中。
2、創建任務
asyncio.create_task(coro)
將coro coroutine包裝成a Task
並安排執行。返回Task對象。
任務在返回的迴圈中執行,如果當前線程中沒有運行迴圈get_running_loop()
, RuntimeError
則引發該任務。
Python 3.7中添加了此功能。在Python 3.7之前,asyncio.ensure_future()
可以使用低級函數:
async def coro(): ... # In Python 3.7+ task = asyncio.create_task(coro()) ... # This works in all Python versions but is less readable task = asyncio.ensure_future(coro())
3、sleeping
coroutine asyncio.
sleep
(delay, result=None, *, loop=None)
阻止 delay seconds.。
如果提供了result ,則在協程完成時將其返回給調用者。
leep()
始終掛起當前任務,允許其他任務運行。
該loop 參數已被棄用,並定於去除在Python 3.10。
協程示例每秒顯示當前日期5秒:
import asyncio import datetime async def display_date(): loop = asyncio.get_running_loop() end_time = loop.time() + 5.0 while True: print(datetime.datetime.now()) if (loop.time() + 1.0) >= end_time: break await asyncio.sleep(1) asyncio.run(display_date())
4、同時運行任務
awaitable asyncio.
gather
(*aws, loop=None, return_exceptions=False)
同時在aws 序列中運行 awaitable objects
如果在aws中任何awaitable 是協程,它將自動安排為任務
如果所有等待成功完成,則結果是返回值的彙總列表。結果值的順序對應於aws中的等待順序
如果return_exceptions是False
(預設),則第一個引發的異常會立即傳播到等待的任務gather()
。
如果return_exceptions是True
,異常的處理方式一樣成功的結果,併在結果列表彙總。
如果gather()
被取消,所有提交的awaitables(尚未完成)也被取消。
如果aws序列中的Task or Future被取消,則將其視為已引發CancelledError
- 在這種情況下不會取消gather()
呼叫。這是為了防止取消一個提交的Tasks/Futures 以導致其他任務/期貨被取消。
import asyncio async def factorial(name, number): f = 1 for i in range(2, number + 1): print(f"Task {name}: Compute factorial({i})...") await asyncio.sleep(1) f *= i print(f"Task {name}: factorial({number}) = {f}") async def main(): # Schedule three calls *concurrently*: await asyncio.gather( factorial("A", 2), factorial("B", 3), factorial("C", 4), ) asyncio.run(main()) # Expected output: # # Task A: Compute factorial(2)... # Task B: Compute factorial(2)... # Task C: Compute factorial(2)... # Task A: factorial(2) = 2 # Task B: Compute factorial(3)... # Task C: Compute factorial(3)... # Task B: factorial(3) = 6 # Task C: Compute factorial(4)... # Task C: factorial(4) = 24
獲取返回結果,異常情況
import asyncio async def factorial(name, number): print(name) if name == 'A': return name elif name == 'B': raise SyntaxError(name) await asyncio.sleep(number) async def main(): # Schedule three calls *concurrently*: result = await asyncio.gather( factorial("A", 2), factorial("B", 3), factorial("C", 4), return_exceptions=True ) print(result) asyncio.run(main()) # A # B # C # ['A', SyntaxError('B'), None]
版本3.7中已更改:如果取消了聚集本身,則無論return_exceptions如何,都會傳播取消。
5、Shielding From Cancellation
awaitable asyncio.
shield
(aw, *, loop=None)
Protect an awaitable object from being cancelled
.
If aw is a coroutine it is automatically scheduled as a Task.
The statement:
res = await shield(something())
等同於
res = await something()
除非取消包含它的協程,否則something()
不會取消運行的任務。從觀點來看something()
,取消沒有發生。雖然它的來電者仍然被取消,所以“等待”表達仍然提出了一個CancelledError
。
如果something()
通過其他方式取消(即從內部取消)也會取消shield()
。
如果希望完全忽略取消(不推薦),則該shield()
函數應與try / except子句結合使用,如下所示:
try: res = await shield(something()) except CancelledError: res = None
6、超時
coroutine asyncio.
wait_for
(aw, timeout, *, loop=None)
Wait for the aw awaitable to complete with a timeout.
If aw is a coroutine it is automatically scheduled as a Task.
timeout可以是None
或等待的float或int秒數。如果超時是None
,將等到完成
如果發生超時,它將取消任務並加註 asyncio.TimeoutError
。
要避免該任務cancellation
,請將其包裝shield()
。
該函數將一直等到將來實際取消,因此總等待時間可能會超過超時。
如果等待被取消,則未來的aw也會被取消。
該迴圈參數已被棄用,並定於去除在Python 3.10。
async def eternity(): # Sleep for one hour await asyncio.sleep(3600) print('yay!') async def main(): # Wait for at most 1 second try: await asyncio.wait_for(eternity(), timeout=1.0) except asyncio.TimeoutError: print('timeout!') asyncio.run(main()) # Expected output: # # timeout!
改變在3.7版本:當AW被取消,由於超時,wait_for
等待AW被取消。以前,它asyncio.TimeoutError
立即提出 。
7、超時原語
coroutine asyncio.
wait
(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
同時運行aws中的等待對象 並阻塞,直到return_when指定的條件為止。
如果在aws中任何等待的是協程,它將自動安排為任務。wait()
直接傳遞協同程式對象 已被棄用,因為它會導致 混亂的行為。
返回兩組任務/期貨:。(done, pending)
用法:
done, pending = await asyncio.wait(aws)
該迴圈參數已被棄用,並定於去除在Python 3.10。
timeout(浮點數或整數),如果指定,可用於控制返回前等待的最大秒數。
請註意,此功能不會引發asyncio.TimeoutError
。超時發生時未完成的期貨或任務僅在第二組中返回。
return_when表示此函數何時返回。它必須是以下常量之一:
不變 | 描述 |
---|---|
FIRST_COMPLETED |
當任何未來完成或取消時,該函數將返回。 |
FIRST_EXCEPTION |
當任何未來通過引發異常完成時,函數將返回。如果沒有未來引發異常則等同於 ALL_COMPLETED 。 |
ALL_COMPLETED |
所有期貨結束或取消時,該功能將返回。 |
不像wait_for()
,wait()
當發生超時不會取消期貨。
註意 wait()
將協同程式自動調度為任務,然後在 集合中返回隱式創建的任務對象。因此,以下代碼將無法按預期方式工作:(done, pending)
async def foo(): return 42 coro = foo() done, pending = await asyncio.wait({coro}) if coro in done: # This branch will never be run!
以下是如何修複上述代碼段:
async def foo(): return 42 task = asyncio.create_task(foo()) done, pending = await asyncio.wait({task}) if task in done: # Everything will work as expected now.
wait()
不推薦直接傳遞協程對象。
8、 Scheduling From Other Threads
asyncio.
run_coroutine_threadsafe
(coro, loop)
將協程提交給給定的事件迴圈。線程安全的。
返回a concurrent.futures.Future
以等待另一個OS線程的結果。
此函數旨在從與運行事件迴圈的OS線程不同的OS線程調用。例:
# Create a coroutine coro = asyncio.sleep(1, result=3) # Submit the coroutine to a given loop future = asyncio.run_coroutine_threadsafe(coro, loop) # Wait for the result with an optional timeout argument assert future.result(timeout) == 3
如果在協程中引發異常,則會通知返回的Future。它還可以用於取消事件迴圈中的任務:
try: result = future.result(timeout) except asyncio.TimeoutError: print('The coroutine took too long, cancelling the task...') future.cancel() except Exception as exc: print(f'The coroutine raised an exception: {exc!r}') else: print(f'The coroutine returned: {result!r}')
請參閱 文檔的併發和多線程部分。
與其他asyncio函數不同,此函數需要 顯式傳遞迴圈參數。
3.5.1版中的新功能。
9、自省
asyncio.
current_task
(loop = None )
返回當前正在運行的Task
實例,或者None
沒有正在運行的任務。
如果loop是None
get_running_loop()
用來獲取loop。
版本3.7中的新功能。
asyncio.
all_tasks
(loop = None )
返回Task
迴圈運行的一組尚未完成的對象。
如果loop是None
,get_running_loop()
則用於獲取當前迴圈。
版本3.7中的新功能。
任務對象
class asyncio.
Task
(coro,*,loop = None )
A Future-like
object that runs a Python coroutine. Not thread-safe.
任務用於在事件迴圈中運行協同程式。如果一個協程在Future上等待,則Task暫停執行協程並等待Future的完成。當Future 完成後,包裝協程的執行將恢復。
事件迴圈使用協作調度:事件迴圈一次運行一個任務。當一個Task等待完成Future時,事件迴圈運行其他任務,回調或執行IO操作。
使用高級asyncio.create_task()
功能創建任務,或低級別loop.create_task()
或 ensure_future()
功能。不鼓勵手動實例化任務。
要取消正在運行的任務,請使用該cancel()
方法。調用它將導致Task將CancelledError
異常拋出到包裝的協同程式中。如果在取消期間協程正在等待Future對象,則Future對象將被取消。
cancelled()
可用於檢查任務是否被取消。True
如果包裝的協程沒有抑制CancelledError
異常並且實際上被取消,則該方法返回。
asyncio.Task
繼承自Future
其所有API,除了Future.set_result()
和 Future.set_exception()
。
任務支持該contextvars
模塊。創建任務時,它會複製當前上下文,然後在複製的上下文中運行其協程。
版本3.7中已更改:添加了對contextvars
模塊的支持。
cancel
()
請求取消任務。
這會安排CancelledError
在事件迴圈的下一個迴圈中將異常拋入包裝的協程。
協程則有機會通過抑制異常與清理,甚至拒絕請求try
... ... ... ... ... ... 塊。因此,不同於,不保證任務將被取消,儘管完全抑制取消並不常見,並且積極地不鼓勵。exceptCancelledError
finally
Future.cancel()
Task.cancel()
以下示例說明瞭協同程式如何攔截取消請求:
async def cancel_me(): print('cancel_me(): before sleep') try: # Wait for 1 hour await asyncio.sleep(3600) except asyncio.CancelledError: print('cancel_me(): cancel sleep') raise finally: print('cancel_me(): after sleep') async def main(): # Create a "cancel_me" Task task = asyncio.create_task(cancel_me()) # Wait for 1 second await asyncio.sleep(1) task.cancel() try: await task except asyncio.CancelledError: print("main(): cancel_me is cancelled now") asyncio.run(main()) # Expected output: # # cancel_me(): before sleep # cancel_me(): cancel sleep # cancel_me(): after sleep # main(): cancel_me is cancelled now
cancelled
()-
True
如果任務被取消,則返回。 - 請求取消時取消任務,
cancel()
並且包裝的協同程式將CancelledError
異常傳播 到其中。
done
()-
True
如果任務完成則返回。 - 一個任務完成時,包裹協程要麼返回的值,引發異常,或者任務被取消。
result
()- 返回任務的結果。
- 如果任務完成,則返回包裝協程的結果(或者如果協程引發異常,則重新引發該異常。)
- 如果已取消任務,則此方法會引發
CancelledError
異常。 - 如果Task的結果尚不可用,則此方法會引發
InvalidStateError
異常。
exception
()- 返回Task的例外。
- 如果包裝的協同程式引發異常,則返回異常。如果包裝的協程正常返回,則此方法返回
None
。 - 如果已取消任務,則此方法會引發
CancelledError
異常。 - 如果尚未完成任務,則此方法會引發
InvalidStateError
異常。
add_done_callback
(回調,*,上下文=無)- 添加要在任務完成時運行的回調。
- 此方法僅應在基於低級回調的代碼中使用。
- 有關
Future.add_done_callback()
詳細信息,請參閱文檔。
remove_done_callback
(回調)- 從回調列表中刪除回調。
- 此方法僅應在基於低級回調的代碼中使用。
- 有關
Future.remove_done_callback()
詳細信息,請參閱文檔。
get_stack
(*,limit = None )- 返回此任務的堆棧幀列表。
- 如果未完成包裝的協同程式,則會返回掛起它的堆棧。如果協程已成功完成或被取消,則返回一個空列表。如果協程被異常終止,則返回回溯幀列表。
- 幀始終從最舊到最新排序。
- 對於掛起的協程,只返回一個堆棧幀。
- 可選的limit參數設置要返回的最大幀數; 預設情況下,返回所有可用的幀。返回列表的排序取決於是返回堆棧還是返回:返回堆棧的最新幀,但返回最舊的回溯幀。(這與回溯模塊的行為相匹配。)
print_stack
(*,limit = None,file = None )- 列印此任務的堆棧或回溯。
- 這會為檢索到的幀生成類似於回溯模塊的輸出
get_stack()
。 - 該極限參數傳遞給
get_stack()
直接。 - 的文件參數是其中輸出被寫入的I / O流; 預設輸出寫入
sys.stderr
。
- classmethod
all_tasks
(loop = None ) - 返回一組事件迴圈的所有任務。
- 預設情況下,返回當前事件迴圈的所有任務。如果是loop
None
,則該get_event_loop()
函數用於獲取當前迴圈。 - 不推薦使用此方法,將在Python 3.9中刪除。請改用此
asyncio.all_tasks()
功能。
- classmethod
current_task
(loop = None ) - 返回當前正在運行的任務或
None
。 - 如果是loop
None
,則該get_event_loop()
函數用於獲取當前迴圈。 - 不推薦使用此方法,將在Python 3.9中刪除。請改用此
asyncio.current_task()
功能。