asyncio之Coroutines,Tasks and Future Coroutines and Tasks屬於High-level APIs,也就是高級層的api。 本節概述用於協程和任務的高級非同步api。 Coroutines Coroutines翻譯過來意思是協程,使用async/awai ...
asyncio之Coroutines,Tasks and Future
Coroutines and Tasks屬於High-level APIs,也就是高級層的api。
本節概述用於協程和任務的高級非同步api。
Coroutines
Coroutines翻譯過來意思是協程,
使用async/await語法聲明的協程是編寫asyncio應用程式的首選方法。
import asyncio
async def main():
print("hello")
await asyncio.sleep(1)
print("world")
if __name__ == '__main__':
# asyncio.run(main()) # 3.7的用法
# 阻塞直到hello world()協程結束時返回
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
第一個非同步函數是通過創建loop迴圈去調用,其他非同步函數之間通過await進行調用。
像下麵的一個例子
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
print(f"started at {time.strftime('%X')}")
await say_after(1, 'hello')
await say_after(2, 'world')
print(f"finished at {time.strftime('%X')}")
if __name__ == '__main__':
loop = asyncio.get_event_loop()
# 阻塞直到hello world()協程結束時返回
loop.run_until_complete(main())
loop.close()
或者我們可以通過asyncio.create_task()將協程say_after封裝任務去調用就像下麵這樣。
async def main():
task1 = asyncio.create_task(
say_after(1, 'hello'))
task2 = asyncio.create_task(
say_after(2, 'world'))
print(f"started at {time.strftime('%X')}")
# 等待兩個子任務完成
await task1
await task2
print(f"finished at {time.strftime('%X')}")
如果報錯async沒有create_task,可以用
ensure_future代替
Awaitables
我們說,如果一個對象可以用在await表達式中,那麼它就是Awaitables的對象。
可等待對象主要有三種類型:coroutines, Tasks, and Futures.
Coroutines
前面的代碼中演示了協程的運作方式,這裡主要強調兩點。
- 協程函數:asyc def定義的函數;
-
協程對象:通過調用協程函數返回的對象。
Tasks
任務對協程進一步封裝,其中包含任務的各種狀態。
協程對象不能直接運行,在註冊事件迴圈的時候,其實是run_until_complete方法將協程包裝成為了一個任務(task)對象。
import asyncio
async def nested():
await asyncio.sleep(2)
print("等待2s")
async def main():
# 將協程包裝成任務含有狀態
# task = asyncio.create_task(nested())
task = asyncio.ensure_future(nested())
print(task)
# "task" can now be used to cancel "nested()", or
# can simply be awaited to wait until it is complete:
await task
print(task)
print(task.done())
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt as e:
for task in asyncio.Task.all_tasks():
print(task)
task.cancel()
print(task)
loop.run_forever() # restart loop
finally:
loop.close()
可以看到
<Task pending coro=<nested() running at /Users/chennan/pythonproject/asyncproject/asyncio-cn/1-2-1.py:9>>
等待2s
<Task finished coro=<nested() done, defined at /Users/chennan/pythonproject/asyncproject/asyncio-cn/1-2-1.py:9> result=None>
True
創建task後,task在加入事件迴圈之前是pending狀態然後調用nested函數等待2s之後列印task為finished狀態。asyncio.ensure_future(coroutine) 和 loop.create_task(coroutine)都可以創建一個task,python3.7增加了asyncio.create_task(coro)。其中task是Future的一個子類
Future
future:代表將來執行或沒有執行的任務的結果。它和task上沒有本質的區別
通常不需要在應用程式級別代碼中創建Future對象。
future對象有幾個狀態:
- Pending
- Running
- Done
- Cancelled
通過上面的代碼可以知道創建future的時候,task為pending,事件迴圈調用執行的時候是running,調用完畢自然就是done於是調用task.done()列印了true。
如果在命令行中運行上述代碼,ctrl+c後會發現
輸出以下內容
<Task pending coro=<nested() running at 1-2-1.py:9>>
^C<Task pending coro=<main() running at 1-2-1.py:21> wait_for=<Task pending coro=<nested() running at 1-2-1.py:10> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10d342978>()]> cb=[<TaskWakeupMethWrapper object at 0x10d342918>()]>>
<Task pending coro=<main() running at 1-2-1.py:21> wait_for=<Task pending coro=<nested() running at 1-2-1.py:10> wait_for=<Future cancelled> cb=[<TaskWakeupMethWrapper object at 0x10d342918>()]>>
<Task pending coro=<nested() running at 1-2-1.py:10> wait_for=<Future cancelled> cb=[<TaskWakeupMethWrapper object at 0x10d342918>()]>
<Task cancelling coro=<nested() running at 1-2-1.py:10> wait_for=<Future cancelled> cb=[<TaskWakeupMethWrapper object at 0x10d342918>()]>
因為我們調用了task.cancel() 所以可以看到此時的任務狀態為取消狀態。
併發的執行任務
通過使用await+asyncio.gather可以完成併發的操作。
asyncio.gather用法如下。
**asyncio.gather(*aws, loop=None, return_exceptions=False)
aws是一系列協程,協程都成功完成,就返回值一個結果列表。結果值的順序與aws中添加協程的順序相對應。
return_exceptions=False,其實就是如果有一個任務失敗了,就直接拋出異常。如果等於True就把錯誤信息作為結果返回回來。
首先來一個正常情況不出錯的例子:
import asyncio
async def factorial(name, number):
f = 1
for i in range(2, number + 1):
print(f"Task {name}: Compute factorial({i})...")
if number == 2:
1 / 0
await asyncio.sleep(1)
f *= i
print(f"Task {name}: factorial({number}) = {f}")
async def main():
# Schedule three calls *concurrently*:
res = await asyncio.gather(
*[factorial("A", 2),
factorial("B", 3),
factorial("C", 4)]
, return_exceptions=True)
for item in res:
print(item)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt as e:
for task in asyncio.Task.all_tasks():
print(task)
task.cancel()
print(task)
loop.run_forever() # restart loop
finally:
loop.close()
輸入以下內容:
Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...
Task C: factorial(4) = 24
division by zero
None
None
可以發現async.gather最後會返回一系列的結果,如果出現了錯誤就把錯誤信息作為返回結果,這裡我當數字為2時人為加了異常操作1/0,於是返回了結果division by zero,對於其他的任務因為沒有返回值所以是None。這裡return_exceptions=True來保證瞭如果其中一個任務出現異常,其他任務不會受其影響會執行到結束。
asyncio.wait
coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
asyncio.wait和async.gather用法差不多只是async.wait接收的是個列表。
第三個參數和async.gather有點區別.
參數名 | 含義 |
---|---|
FIRST_COMPLETED | 任何一個future完成或取消時返回 |
FIRST_EXCEPTION | 任何一個future出現錯誤將返回,如果出現異常等價於ALL_COMPLETED |
ALL_COMPLETED | 當所有任務完成或者被取消時返回結果,預設值。 |
Timeouts
通過使用asyncio.wait_for來完成一個超時函數回調操作,如果函數規定時間內未完成則報錯。
**asyncio.wait_for(aw, timeout, *, loop=None)**
aw代表一個協程,timeout單位秒。
async def eternity():
# Sleep for one hour
await asyncio.sleep(3600)
print('yay!')
async def main():
# Wait for at most 1 second
try:
await asyncio.wait_for(eternity(), timeout=1.0)
except asyncio.TimeoutError:
print('timeout!')
asyncio.run(main())
# Expected output:
#
# timeout!
1秒內eternity沒有完成就報錯了。
python3.7中發生更改:當aw由於超時而被取消時,不再顯示異常而是等待aw被取消。
說到timeout的,如果僅僅是對一個代碼塊做timeout操作而不是等待某個協程此時推薦第三方模塊async_timeout
async_timeout
安裝
pip installa async_timeout
使用方法很簡單如下
async with async_timeout.timeout(1.5) as cm:
await inner()
print(cm.expired)
如果1.5s可以運行完列印true,否則列印false,表示超時。
asyncio.as_completed
**asyncio.as_completed(aws, *, loop=None, timeout=None)**
使用as_completed會返回一個可以迭代的future對象,同樣可以獲取協程的運行結果,使用方法如下:
async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
for task in asyncio.as_completed(tasks):
result = await task
print('Task ret: {}'.format(result))
start = now()
loop = asyncio.get_event_loop()
done = loop.run_until_complete(main())
print('TIME: ', now() - start)
協程嵌套
使用async可以定義協程,協程用於耗時的io操作,我們也可以封裝更多的io操作過程,這樣就實現了嵌套的協程,即一個協程中await了另外一個協程,如此連接起來
官網實例:
圖解:
1、run_until_complete運行,會註冊task(協程:print_sum)並開啟事件迴圈 →
2、print_sum協程中嵌套了子協程,此時print_sum協程暫停(類似委托生成器),轉到子協程(協程:compute)中運行代碼,期間子協程需sleep1秒鐘,直接將結果反饋到event loop中,即將控制權轉回調用方,而中間的print_sum暫停不操作 →
3、1秒後,調用方將控制權給到子協程(調用方與子協程直接通信),子協程執行接下來的代碼,直到再遇到wait(此實例沒有)→
4、 最後執行到return語句,子協程向上級協程(print_sum拋出異常:StopIteration),同時將return返回的值返回給上級協程(print_sum中的result接收值),print_sum繼續執行暫時時後續的代碼,直到遇到return語句 →
5、向 event loop 拋出StopIteration異常,此時協程任務都已經執行完畢,事件迴圈執行完成(event loop :the loop is stopped),close事件迴圈。
調度線程
asyncio.run_coroutine_threadsafe(coro, loop)
等待其他線程返回一個concurrent.futures.Future對象,這是一個線程安全的方法。
這個函數應該從不同的OS線程調用,而不是從事件迴圈所在的線程調用。
def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
async def do_some_work(x):
print('Waiting {}'.format(x))
await asyncio.sleep(x)
print('Done after {}s'.format(x))
def more_work(x):
print('More work {}'.format(x))
time.sleep(x)
print('Finished more work {}'.format(x))
start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))
asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)
上述的例子,主線程中創建一個new_loop,然後在另外的子線程中開啟一個無限事件迴圈。主線程通過run_coroutine_threadsafe新註冊協程對象。這樣就能在子線程中進行事件迴圈的併發操作,同時主線程又不會被block。一共執行的時間大概在6s左右。
run_in_executor
import time
import asyncio
async def main():
print(f'{time.ctime()} Hello')
await asyncio.sleep(1.0)
print(f'{time.ctime()} Goodbye')
loop.stop()
def blocking(): # 1
time.sleep(0.5) # 2
print(f'{time.ctime()} Hello from a thread!')
loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_in_executor(None, blocking) # 3
loop.run_forever()
pending = asyncio.Task.all_tasks(loop=loop) # 4
group = asyncio.gather(*pending)
loop.run_until_complete(group)
loop.close()
輸出
Fri Jan 4 15:32:03 2019 Hello
Fri Jan 4 15:32:04 2019 Hello from a thread!
Fri Jan 4 15:32:04 2019 Goodbye
下麵對上面的函數的序號進行講解:
1 這個函數調用了常規的sleep(),這會阻塞主線程並阻止loop運行,我們不能使這個函數變成協程,更糟糕的是不能在主線程運行loop時調用它,解決辦法是用一個executor來運行它;
2 註意一點,這個sleep運行時間比協程中的sleep運行時間要短,後文再討論如果長的話會發生什麼;
3 該方法幫助我們在事件loop里用額外的線程或進程執行函數,這個方法的返回值是一個Future對象,意味著可以用await來切換它;
4 掛起的task中不包含前面的阻塞函數,並且這個方法只返回task對象,絕對不會返回Future對象。
綁定回調
綁定回調,在task執行完畢的時候可以獲取執行的結果,回調的最後一個參數是future對象,通過該對象可以獲取協程返回值。如果回調需要多個參數,可以通過偏函數導入
import time
import asyncio
now = lambda : time.time()
async def do_some_work(x):
print('Waiting: ', x)
return 'Done after {}s'.format(x)
def callback(future): # 回調函數
print('Callback: ', future.result())
start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
get_future = asyncio.ensure_future(coroutine)
task.add_done_callback(callback) # 添加回調函數
loop.run_until_complete(get_future)
print('TIME: ', now() - start)
回調函數需要多個參數時,future參數要放最後。執行完成,我們可以通過參數future獲取協程的執行結果:future.result()
import functools # functools.partial:偏函數,能將帶參數的函數包裝成一個新的函數
def callback(t, future): # 回調函數 ,future放最後
print('Callback:', t, future.result())
task.add_done_callback(functools.partial(callback, 2)
asyncio.iscoroutine(obj)
Return True if obj is a coroutine object.
判斷是否為coroutine對象,如果是返回True
asyncio.iscoroutinefunction(func)
判斷是否為coroutine函數,如果是返回True
參考資料
https://docs.python.org/3.7/library/asyncio-task.html
https://www.jianshu.com/p/b5e347b3a17c
微信公眾號:python學習開發 加微信italocxa 入群。
原文地址:https://www.cnblogs.com/c-x-a/p/10220398.html