大家好,併發編程 進入第十章。好了,今天的內容其實還挺多的,我準備了三天,到今天才整理完畢。希望大家看完,有所收穫的,能給小明一個贊。這就是對小明最大的鼓勵了。為了更好地銜接這一節,我們先來回顧一下上一節的內容。 上一節「」,我們首先介紹了,如何創建一個協程對象.主要有兩種方法 通過async關鍵字 ...
大家好,併發編程
進入第十章。
好了,今天的內容其實還挺多的,我準備了三天,到今天才整理完畢。希望大家看完,有所收穫的,能給小明一個贊。這就是對小明最大的鼓勵了。
為了更好地銜接這一節,我們先來回顧一下上一節的內容。
上一節「」,我們首先介紹了,如何創建一個協程對象.
主要有兩種方法
- 通過
async
關鍵字, - 通過
@asyncio.coroutine
裝飾函數。
然後有了協程對象,就需要一個事件迴圈容器來運行我們的協程。其主要的步驟有如下幾點:
- 將協程對象轉為task任務對象
- 定義一個事件迴圈對象容器用來存放task
- 將task任務扔進事件迴圈對象中並觸發
為了讓大家,對生成器和協程有一個更加清晰的認識,我還介紹了yield
和async/await
的區別。
最後,我們還講了,如何給一個協程添加回調函數。
好了,用個形象的比喻,上一節,其實就只是講了協程中的單任務
。哈哈,是不是還挺難的?希望大家一定要多看幾遍,多敲代碼,不要光看哦。
那麼這一節,我們就來看下,協程中的多任務
。
. 本文目錄
- 協程中的併發
- 協程中的嵌套
- 協程中的狀態
- gather與wait
. 協程中的併發
協程的併發,和線程一樣。舉個例子來說,就好像 一個人同時吃三個饅頭,咬了第一個饅頭一口,就得等這口咽下去,才能去啃第其他兩個饅頭。就這樣交替換著吃。
asyncio
實現併發,就需要多個協程來完成任務,每當有任務阻塞的時候就await,然後其他協程繼續工作。
第一步,當然是創建多個協程的列表。
# 協程函數
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)
# 協程對象
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
# 將協程轉成task,並組成list
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
第二步,如何將這些協程註冊到事件迴圈中呢。
有兩種方法,至於這兩種方法什麼區別,稍後會介紹。
- 使用
asyncio.wait()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
- 使用
asyncio.gather()
# 千萬註意,這裡的 「*」 不能省略
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
最後,return的結果,可以用task.result()
查看。
for task in tasks:
print('Task ret: ', task.result())
完整代碼如下
import asyncio
# 協程函數
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)
# 協程對象
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
# 將協程轉成task,並組成list
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
for task in tasks:
print('Task ret: ', task.result())
輸出結果
Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
. 協程中的嵌套
使用async可以定義協程,協程用於耗時的io操作,我們也可以封裝更多的io操作過程,這樣就實現了嵌套的協程,即一個協程中await了另外一個協程,如此連接起來。
來看個例子。
import asyncio
# 用於內部的協程函數
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)
# 外部的協程函數
async def main():
# 創建三個協程對象
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
# 將協程轉為task,並組成list
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
# 【重點】:await 一個task列表(協程)
# dones:表示已經完成的任務
# pendings:表示未完成的任務
dones, pendings = await asyncio.wait(tasks)
for task in dones:
print('Task ret: ', task.result())
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
如果這邊,使用的是asyncio.gather()
,是這麼用的
# 註意這邊返回結果,與await不一樣
results = await asyncio.gather(*tasks)
for result in results:
print('Task ret: ', result)
輸出還是一樣的。
Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
仔細查看,可以發現這個例子完全是由 上面「協程中的併發
」例子改編而來。結果完全一樣。只是把創建協程對象,轉換task任務,封裝成在一個協程函數里而已。外部的協程,嵌套了一個內部的協程。
其實你如果去看下asyncio.await()
的源碼的話,你會發現下麵這種寫法
loop.run_until_complete(asyncio.wait(tasks))
看似沒有嵌套,實際上內部也是嵌套的。
這裡也把源碼,貼出來,有興趣可以看下,沒興趣,可以直接跳過。
# 內部協程函數
async def _wait(fs, timeout, return_when, loop):
assert fs, 'Set of Futures is empty.'
waiter = loop.create_future()
timeout_handle = None
if timeout is not None:
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
counter = len(fs)
def _on_completion(f):
nonlocal counter
counter -= 1
if (counter <= 0 or
return_when == FIRST_COMPLETED or
return_when == FIRST_EXCEPTION and (not f.cancelled() and
f.exception() is not None)):
if timeout_handle is not None:
timeout_handle.cancel()
if not waiter.done():
waiter.set_result(None)
for f in fs:
f.add_done_callback(_on_completion)
try:
await waiter
finally:
if timeout_handle is not None:
timeout_handle.cancel()
done, pending = set(), set()
for f in fs:
f.remove_done_callback(_on_completion)
if f.done():
done.add(f)
else:
pending.add(f)
return done, pending
# 外部協程函數
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
if futures.isfuture(fs) or coroutines.iscoroutine(fs):
raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
if not fs:
raise ValueError('Set of coroutines/Futures is empty.')
if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
raise ValueError(f'Invalid return_when value: {return_when}')
if loop is None:
loop = events.get_event_loop()
fs = {ensure_future(f, loop=loop) for f in set(fs)}
# 【重點】:await一個內部協程
return await _wait(fs, timeout, return_when, loop)
. 協程中的狀態
還記得我們在講生成器的時候,有提及過生成器的狀態。同樣,在協程這裡,我們也瞭解一下協程(準確的說,應該是Future對象,或者Task任務)有哪些狀態。
Pending
:創建future,還未執行Running
:事件迴圈正在調用執行任務Done
:任務執行完畢Cancelled
:Task被取消後的狀態
可手工 python3 xx.py
執行這段代碼,
import asyncio
import threading
import time
async def hello():
print("Running in the loop...")
flag = 0
while flag < 1000:
with open("F:\\test.txt", "a") as f:
f.write("------")
flag += 1
print("Stop the loop")
if __name__ == '__main__':
coroutine = hello()
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
# Pending:未執行狀態
print(task)
try:
t1 = threading.Thread(target=loop.run_until_complete, args=(task,))
# t1.daemon = True
t1.start()
# Running:運行中狀態
time.sleep(1)
print(task)
t1.join()
except KeyboardInterrupt as e:
# 取消任務
task.cancel()
# Cacelled:取消任務
print(task)
finally:
print(task)
順利執行的話,將會列印 Pending
-> Pending:Runing
-> Finished
的狀態變化
假如,執行後 立馬按下 Ctrl+C,則會觸發task取消,就會列印 Pending
-> Cancelling
-> Cancelling
的狀態變化。
. gather與wait
還記得上面我說,把多個協程註冊進一個事件迴圈中有兩種方法嗎?
- 使用
asyncio.wait()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
- 使用
asyncio.gather()
# 千萬註意,這裡的 「*」 不能省略
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
asyncio.gather
和 asyncio.wait
在asyncio中用得的比較廣泛,這裡有必要好好研究下這兩貨。
還是照例用例子來說明,先定義一個協程函數
import asyncio
async def factorial(name, number):
f = 1
for i in range(2, number+1):
print("Task %s: Compute factorial(%s)..." % (name, i))
await asyncio.sleep(1)
f *= i
print("Task %s: factorial(%s) = %s" % (name, number, f))
接收參數方式
asyncio.wait
接收的tasks,必須是一個list對象,這個list對象里,存放多個的task。
它可以這樣,用asyncio.ensure_future
轉為task對象
tasks=[
asyncio.ensure_future(factorial("A", 2)),
asyncio.ensure_future(factorial("B", 3)),
asyncio.ensure_future(factorial("C", 4))
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
也可以這樣,不轉為task對象。
loop = asyncio.get_event_loop()
tasks=[
factorial("A", 2),
factorial("B", 3),
factorial("C", 4)
]
loop.run_until_complete(asyncio.wait(tasks))
asyncio.gather
接收的就比較廣泛了,他可以接收list對象,但是 *
不能省略
tasks=[
asyncio.ensure_future(factorial("A", 2)),
asyncio.ensure_future(factorial("B",