asyncio之Coroutines,Tasks and Future

来源:https://www.cnblogs.com/DonCharles/archive/2019/02/12/10366231.html
-Advertisement-
Play Games

asyncio之Coroutines,Tasks and Future Coroutines and Tasks屬於High-level APIs,也就是高級層的api。 本節概述用於協程和任務的高級非同步api。 Coroutines Coroutines翻譯過來意思是協程,使用async/awai ...


asyncio之Coroutines,Tasks and Future

Coroutines and Tasks屬於High-level APIs,也就是高級層的api。

本節概述用於協程和任務的高級非同步api。

Coroutines

Coroutines翻譯過來意思是協程,
使用async/await語法聲明的協程是編寫asyncio應用程式的首選方法。

import asyncio


async def main():
    print("hello")
    await asyncio.sleep(1)
    print("world")


if __name__ == '__main__':
    # asyncio.run(main())  # 3.7的用法
    # 阻塞直到hello world()協程結束時返回
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

第一個非同步函數是通過創建loop迴圈去調用,其他非同步函數之間通過await進行調用。
像下麵的一個例子

import asyncio
import time


async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # 阻塞直到hello world()協程結束時返回
    loop.run_until_complete(main())
    loop.close()

或者我們可以通過asyncio.create_task()將協程say_after封裝任務去調用就像下麵這樣。

async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))

    task2 = asyncio.create_task(
        say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    # 等待兩個子任務完成
    await task1
    await task2
    print(f"finished at {time.strftime('%X')}")

如果報錯async沒有create_task,可以用
ensure_future代替
 

Awaitables

我們說,如果一個對象可以用在await表達式中,那麼它就是Awaitables的對象。
可等待對象主要有三種類型:coroutines, Tasks, and Futures.

Coroutines

 前面的代碼中演示了協程的運作方式,這裡主要強調兩點。

  • 協程函數:asyc def定義的函數;
  • 協程對象:通過調用協程函數返回的對象。

    Tasks

    任務對協程進一步封裝,其中包含任務的各種狀態。
    協程對象不能直接運行,在註冊事件迴圈的時候,其實是run_until_complete方法將協程包裝成為了一個任務(task)對象。
import asyncio


async def nested():
    await asyncio.sleep(2)
    print("等待2s")


async def main():
    # 將協程包裝成任務含有狀態
    # task = asyncio.create_task(nested())
    task = asyncio.ensure_future(nested())
    print(task)
    # "task" can now be used to cancel "nested()", or
    # can simply be awaited to wait until it is complete:
    await task
    print(task)
    print(task.done())


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    except KeyboardInterrupt as e:
        for task in asyncio.Task.all_tasks():
            print(task)
            task.cancel()
            print(task)
        loop.run_forever()  # restart loop
    finally:
        loop.close()

可以看到

<Task pending coro=<nested() running at /Users/chennan/pythonproject/asyncproject/asyncio-cn/1-2-1.py:9>>
等待2s
<Task finished coro=<nested() done, defined at /Users/chennan/pythonproject/asyncproject/asyncio-cn/1-2-1.py:9> result=None>
True

創建task後,task在加入事件迴圈之前是pending狀態然後調用nested函數等待2s之後列印task為finished狀態。asyncio.ensure_future(coroutine) 和 loop.create_task(coroutine)都可以創建一個task,python3.7增加了asyncio.create_task(coro)。其中task是Future的一個子類

Future

future:代表將來執行或沒有執行的任務的結果。它和task上沒有本質的區別
通常不需要在應用程式級別代碼中創建Future對象。
future對象有幾個狀態:

  • Pending
  • Running
  • Done
  • Cancelled

通過上面的代碼可以知道創建future的時候,task為pending,事件迴圈調用執行的時候是running,調用完畢自然就是done於是調用task.done()列印了true。

如果在命令行中運行上述代碼,ctrl+c後會發現
輸出以下內容

<Task pending coro=<nested() running at 1-2-1.py:9>>
^C<Task pending coro=<main() running at 1-2-1.py:21> wait_for=<Task pending coro=<nested() running at 1-2-1.py:10> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10d342978>()]> cb=[<TaskWakeupMethWrapper object at 0x10d342918>()]>>
<Task pending coro=<main() running at 1-2-1.py:21> wait_for=<Task pending coro=<nested() running at 1-2-1.py:10> wait_for=<Future cancelled> cb=[<TaskWakeupMethWrapper object at 0x10d342918>()]>>
<Task pending coro=<nested() running at 1-2-1.py:10> wait_for=<Future cancelled> cb=[<TaskWakeupMethWrapper object at 0x10d342918>()]>
<Task cancelling coro=<nested() running at 1-2-1.py:10> wait_for=<Future cancelled> cb=[<TaskWakeupMethWrapper object at 0x10d342918>()]>

因為我們調用了task.cancel() 所以可以看到此時的任務狀態為取消狀態。

併發的執行任務

通過使用await+asyncio.gather可以完成併發的操作。
asyncio.gather用法如下。
**asyncio.gather(*aws, loop=None, return_exceptions=False)
aws是一系列協程,協程都成功完成,就返回值一個結果列表。結果值的順序與aws中添加協程的順序相對應。
return_exceptions=False,其實就是如果有一個任務失敗了,就直接拋出異常。如果等於True就把錯誤信息作為結果返回回來。
首先來一個正常情況不出錯的例子:

import asyncio


async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        if number == 2:
            1 / 0
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")


async def main():
    # Schedule three calls *concurrently*:
    res = await asyncio.gather(
        *[factorial("A", 2),
          factorial("B", 3),
          factorial("C", 4)]
        , return_exceptions=True)
    for item in res:
        print(item)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    except KeyboardInterrupt as e:
        for task in asyncio.Task.all_tasks():
            print(task)
            task.cancel()
            print(task)
        loop.run_forever()  # restart loop
    finally:
        loop.close()

輸入以下內容:

Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...
Task C: factorial(4) = 24
division by zero
None
None

可以發現async.gather最後會返回一系列的結果,如果出現了錯誤就把錯誤信息作為返回結果,這裡我當數字為2時人為加了異常操作1/0,於是返回了結果division by zero,對於其他的任務因為沒有返回值所以是None。這裡return_exceptions=True來保證瞭如果其中一個任務出現異常,其他任務不會受其影響會執行到結束。

asyncio.wait

coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

asyncio.wait和async.gather用法差不多只是async.wait接收的是個列表。
第三個參數和async.gather有點區別.

參數名含義
FIRST_COMPLETED 任何一個future完成或取消時返回
FIRST_EXCEPTION 任何一個future出現錯誤將返回,如果出現異常等價於ALL_COMPLETED
ALL_COMPLETED 當所有任務完成或者被取消時返回結果,預設值。

Timeouts

通過使用asyncio.wait_for來完成一個超時函數回調操作,如果函數規定時間內未完成則報錯。
**asyncio.wait_for(aw, timeout, *, loop=None)**
aw代表一個協程,timeout單位秒。

async def eternity():
    # Sleep for one hour
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    # Wait for at most 1 second
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except asyncio.TimeoutError:
        print('timeout!')

asyncio.run(main())

# Expected output:
#
#     timeout!

1秒內eternity沒有完成就報錯了。
python3.7中發生更改:當aw由於超時而被取消時,不再顯示異常而是等待aw被取消。
說到timeout的,如果僅僅是對一個代碼塊做timeout操作而不是等待某個協程此時推薦第三方模塊async_timeout

async_timeout

安裝

pip installa async_timeout

使用方法很簡單如下

async with async_timeout.timeout(1.5) as cm:
    await inner()
print(cm.expired)

如果1.5s可以運行完列印true,否則列印false,表示超時。

asyncio.as_completed

**asyncio.as_completed(aws, *, loop=None, timeout=None)**
使用as_completed會返回一個可以迭代的future對象,同樣可以獲取協程的運行結果,使用方法如下:

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    for task in asyncio.as_completed(tasks):
        result = await task
        print('Task ret: {}'.format(result))

start = now()

loop = asyncio.get_event_loop()
done = loop.run_until_complete(main())
print('TIME: ', now() - start)

協程嵌套

使用async可以定義協程,協程用於耗時的io操作,我們也可以封裝更多的io操作過程,這樣就實現了嵌套的協程,即一個協程中await了另外一個協程,如此連接起來
官網實例:

圖解:

 1、run_until_complete運行,會註冊task(協程:print_sum)並開啟事件迴圈 →

 2、print_sum協程中嵌套了子協程,此時print_sum協程暫停(類似委托生成器),轉到子協程(協程:compute)中運行代碼,期間子協程需sleep1秒鐘,直接將結果反饋到event loop中,即將控制權轉回調用方,而中間的print_sum暫停不操作 →

 3、1秒後,調用方將控制權給到子協程(調用方與子協程直接通信),子協程執行接下來的代碼,直到再遇到wait(此實例沒有)→

 4、 最後執行到return語句,子協程向上級協程(print_sum拋出異常:StopIteration),同時將return返回的值返回給上級協程(print_sum中的result接收值),print_sum繼續執行暫時時後續的代碼,直到遇到return語句 →

 5、向 event loop 拋出StopIteration異常,此時協程任務都已經執行完畢,事件迴圈執行完成(event loop :the loop is stopped),close事件迴圈。

調度線程

asyncio.run_coroutine_threadsafe(coro, loop)
等待其他線程返回一個concurrent.futures.Future對象,這是一個線程安全的方法。
這個函數應該從不同的OS線程調用,而不是從事件迴圈所在的線程調用。

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def do_some_work(x):
    print('Waiting {}'.format(x))
    await asyncio.sleep(x)
    print('Done after {}s'.format(x))

def more_work(x):
    print('More work {}'.format(x))
    time.sleep(x)
    print('Finished more work {}'.format(x))

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)

上述的例子,主線程中創建一個new_loop,然後在另外的子線程中開啟一個無限事件迴圈。主線程通過run_coroutine_threadsafe新註冊協程對象。這樣就能在子線程中進行事件迴圈的併發操作,同時主線程又不會被block。一共執行的時間大概在6s左右。
run_in_executor

import time
import asyncio


async def main():
    print(f'{time.ctime()} Hello')
    await asyncio.sleep(1.0)
    print(f'{time.ctime()} Goodbye')
    loop.stop()


def blocking():  # 1
    time.sleep(0.5)  # 2
    print(f'{time.ctime()} Hello from a thread!')


loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_in_executor(None, blocking)  # 3

loop.run_forever()
pending = asyncio.Task.all_tasks(loop=loop)  # 4
group = asyncio.gather(*pending)
loop.run_until_complete(group)
loop.close()

輸出

Fri Jan  4 15:32:03 2019 Hello
Fri Jan  4 15:32:04 2019 Hello from a thread!
Fri Jan  4 15:32:04 2019 Goodbye

下麵對上面的函數的序號進行講解:

1 這個函數調用了常規的sleep(),這會阻塞主線程並阻止loop運行,我們不能使這個函數變成協程,更糟糕的是不能在主線程運行loop時調用它,解決辦法是用一個executor來運行它;
2 註意一點,這個sleep運行時間比協程中的sleep運行時間要短,後文再討論如果長的話會發生什麼;
3 該方法幫助我們在事件loop里用額外的線程或進程執行函數,這個方法的返回值是一個Future對象,意味著可以用await來切換它;
4 掛起的task中不包含前面的阻塞函數,並且這個方法只返回task對象,絕對不會返回Future對象。

綁定回調

綁定回調,在task執行完畢的時候可以獲取執行的結果,回調的最後一個參數是future對象,通過該對象可以獲取協程返回值。如果回調需要多個參數,可以通過偏函數導入

import time
import asyncio
 
now = lambda : time.time()
 
async def do_some_work(x):
    print('Waiting: ', x)
    return 'Done after {}s'.format(x)
 
def callback(future):  # 回調函數
    print('Callback: ', future.result())
 
start = now()
 
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
get_future = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)  # 添加回調函數
loop.run_until_complete(get_future)
 
print('TIME: ', now() - start)

回調函數需要多個參數時,future參數要放最後。執行完成,我們可以通過參數future獲取協程的執行結果:future.result()

import functools   # functools.partial:偏函數,能將帶參數的函數包裝成一個新的函數
def callback(t, future): # 回調函數 ,future放最後
    print('Callback:', t, future.result())
 
task.add_done_callback(functools.partial(callback, 2)

asyncio.iscoroutine(obj)

Return True if obj is a coroutine object.
判斷是否為coroutine對象,如果是返回True

asyncio.iscoroutinefunction(func)

判斷是否為coroutine函數,如果是返回True

參考資料

https://docs.python.org/3.7/library/asyncio-task.html
https://www.jianshu.com/p/b5e347b3a17c

微信公眾號:python學習開發 加微信italocxa 入群。

原文地址:https://www.cnblogs.com/c-x-a/p/10220398.html


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 很多時候我們會發現輸入的一長串內容不得不全部刪除重新輸入,這時比起一直按著退格鍵不放一個清除內容按鈕更受歡迎。 今天我將介紹三種為QLineEdit添加清除內容按鈕的方法,其中兩種方法有較強的功能針對性,另一種方法則是通用的,不僅可以用來實現清除輸入內容,還可以擴展出其他功能。 本文索引 方法1:s ...
  • os.walk(top,topdown=True,onerror=None,followlinks=False) os.walk()是python中內置(built-in)的目錄樹生成(directory tree generator)函數。 對於每一個在top目錄下的子目錄(包括top目錄本身), ...
  • 1.項目啟動類application.java類名上增加@EnableEurekaServer註解,聲明是註冊中心 1 import org.springframework.boot.SpringApplication; 2 import org.springframework.boot.autoc ...
  • 平時我們使用最多的數據結構肯定是 HashMap,但是在使用的時候我們必須知道每個鍵值對的生命周期,並且手動清除它;但是如果我們不是很清楚它的生命周期,這時候就比較麻煩;通常有這樣幾種處理方式: 由一個線程定時處理,可以是 或者 ; 利用重寫 ,實現 FIFOCache 或者 LRUCache;可以 ...
  • 題目: “一幫一學習小組”是中小學中常見的學習組織方式,老師把學習成績靠前的學生跟學習成績靠後的學生排在一組。本題就請你編寫程式幫助老師自動完成這個分配工作,即在得到全班學生的排名後,在當前尚未分組的學生中,將名次最靠前的學生與名次最靠後的異性學生分為一組。 輸入格式: 輸入第一行給出正偶數N(≤5 ...
  • C/S結構瞭解 所謂的C/S就是客戶端(client)和伺服器端(server)的簡稱,也就是在基於這個的基礎上編寫相關的代碼;一個就是客戶端一個就是服務端。 TCP(client) 客戶端編寫 因為是在Python2.7的版本所以使用該socket包 import socket 定義地址和埠 t ...
  • 本文講解了 Python 的 property 特性,即一種符合 Python 哲學地設置 getter 和 setter 的方式。 ...
  • 一 Servlet的API的訪問 1 完全解耦和的方式 完全看不到request、response、session。 (1)創建jsp界面 (2)編寫action (3)struts_xml 運行效果圖 調用put方法相當於往request域中存數據註意:這種方式只能獲得像 request sess ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...