Python併發編程之學習非同步IO框架:asyncio 中篇(十)

来源:https://www.cnblogs.com/wongbingming/archive/2018/05/30/9114171.html
-Advertisement-
Play Games

大家好,併發編程 進入第十章。好了,今天的內容其實還挺多的,我準備了三天,到今天才整理完畢。希望大家看完,有所收穫的,能給小明一個贊。這就是對小明最大的鼓勵了。為了更好地銜接這一節,我們先來回顧一下上一節的內容。 上一節「」,我們首先介紹了,如何創建一個協程對象.主要有兩種方法 通過async關鍵字 ...


大家好,併發編程 進入第十章。
好了,今天的內容其實還挺多的,我準備了三天,到今天才整理完畢。希望大家看完,有所收穫的,能給小明一個贊。這就是對小明最大的鼓勵了。
為了更好地銜接這一節,我們先來回顧一下上一節的內容。

上一節「」,我們首先介紹了,如何創建一個協程對象.
主要有兩種方法

  • 通過async關鍵字,
  • 通過@asyncio.coroutine 裝飾函數。

然後有了協程對象,就需要一個事件迴圈容器來運行我們的協程。其主要的步驟有如下幾點:

  • 將協程對象轉為task任務對象
  • 定義一個事件迴圈對象容器用來存放task
  • 將task任務扔進事件迴圈對象中並觸發

為了讓大家,對生成器和協程有一個更加清晰的認識,我還介紹了yieldasync/await的區別。

最後,我們還講了,如何給一個協程添加回調函數。

好了,用個形象的比喻,上一節,其實就只是講了協程中的單任務。哈哈,是不是還挺難的?希望大家一定要多看幾遍,多敲代碼,不要光看哦。

那麼這一節,我們就來看下,協程中的多任務

. 本文目錄

  • 協程中的併發
  • 協程中的嵌套
  • 協程中的狀態
  • gather與wait

. 協程中的併發

協程的併發,和線程一樣。舉個例子來說,就好像 一個人同時吃三個饅頭,咬了第一個饅頭一口,就得等這口咽下去,才能去啃第其他兩個饅頭。就這樣交替換著吃。

asyncio實現併發,就需要多個協程來完成任務,每當有任務阻塞的時候就await,然後其他協程繼續工作。

第一步,當然是創建多個協程的列表。

# 協程函數
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)

# 協程對象
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

# 將協程轉成task,並組成list
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

第二步,如何將這些協程註冊到事件迴圈中呢。

有兩種方法,至於這兩種方法什麼區別,稍後會介紹。

  • 使用asyncio.wait()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
  • 使用asyncio.gather()
# 千萬註意,這裡的 「*」 不能省略
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))

最後,return的結果,可以用task.result()查看。

for task in tasks:
print('Task ret: ', task.result())

完整代碼如下

import asyncio

# 協程函數
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)

# 協程對象
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

# 將協程轉成task,並組成list
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
print('Task ret: ', task.result())

輸出結果

Waiting:  1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s

. 協程中的嵌套

使用async可以定義協程,協程用於耗時的io操作,我們也可以封裝更多的io操作過程,這樣就實現了嵌套的協程,即一個協程中await了另外一個協程,如此連接起來。

來看個例子。

import asyncio

# 用於內部的協程函數
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)

# 外部的協程函數
async def main():
# 創建三個協程對象
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

# 將協程轉為task,並組成list
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

# 【重點】:await 一個task列表(協程)
# dones:表示已經完成的任務
# pendings:表示未完成的任務
dones, pendings = await asyncio.wait(tasks)

for task in dones:
print('Task ret: ', task.result())

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

如果這邊,使用的是asyncio.gather(),是這麼用的

# 註意這邊返回結果,與await不一樣

results = await asyncio.gather(*tasks)
for result in results:
print('Task ret: ', result)

輸出還是一樣的。

Waiting:  1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s

仔細查看,可以發現這個例子完全是由 上面「協程中的併發」例子改編而來。結果完全一樣。只是把創建協程對象,轉換task任務,封裝成在一個協程函數里而已。外部的協程,嵌套了一個內部的協程。

其實你如果去看下asyncio.await()的源碼的話,你會發現下麵這種寫法

loop.run_until_complete(asyncio.wait(tasks))

看似沒有嵌套,實際上內部也是嵌套的。

這裡也把源碼,貼出來,有興趣可以看下,沒興趣,可以直接跳過。

# 內部協程函數
async def _wait(fs, timeout, return_when, loop):
assert fs, 'Set of Futures is empty.'
waiter = loop.create_future()
timeout_handle = None
if timeout is not None:
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
counter = len(fs)

def _on_completion(f):
nonlocal counter
counter -= 1
if (counter <= 0 or
return_when == FIRST_COMPLETED or
return_when == FIRST_EXCEPTION and (not f.cancelled() and
f.exception() is not None)):
if timeout_handle is not None:
timeout_handle.cancel()
if not waiter.done():
waiter.set_result(None)

for f in fs:
f.add_done_callback(_on_completion)

try:
await waiter
finally:
if timeout_handle is not None:
timeout_handle.cancel()

done, pending = set(), set()
for f in fs:
f.remove_done_callback(_on_completion)
if f.done():
done.add(f)
else:
pending.add(f)
return done, pending

# 外部協程函數
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
if futures.isfuture(fs) or coroutines.iscoroutine(fs):
raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
if not fs:
raise ValueError('Set of coroutines/Futures is empty.')
if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
raise ValueError(f'Invalid return_when value: {return_when}')

if loop is None:
loop = events.get_event_loop()

fs = {ensure_future(f, loop=loop) for f in set(fs)}
# 【重點】:await一個內部協程
return await _wait(fs, timeout, return_when, loop)

. 協程中的狀態

還記得我們在講生成器的時候,有提及過生成器的狀態。同樣,在協程這裡,我們也瞭解一下協程(準確的說,應該是Future對象,或者Task任務)有哪些狀態。

Pending:創建future,還未執行
Running:事件迴圈正在調用執行任務
Done:任務執行完畢
Cancelled:Task被取消後的狀態

可手工 python3 xx.py 執行這段代碼,

import asyncio
import threading
import time

async def hello():
print("Running in the loop...")
flag = 0
while flag < 1000:
with open("F:\\test.txt", "a") as f:
f.write("------")
flag += 1
print("Stop the loop")

if __name__ == '__main__':
coroutine = hello()
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)

# Pending:未執行狀態
print(task)
try:
t1 = threading.Thread(target=loop.run_until_complete, args=(task,))
# t1.daemon = True
t1.start()

# Running:運行中狀態
time.sleep(1)
print(task)
t1.join()
except KeyboardInterrupt as e:
# 取消任務
task.cancel()
# Cacelled:取消任務
print(task)
finally:
print(task)

順利執行的話,將會列印 Pending -> Pending:Runing -> Finished 的狀態變化

假如,執行後 立馬按下 Ctrl+C,則會觸發task取消,就會列印 Pending -> Cancelling -> Cancelling 的狀態變化。

. gather與wait

還記得上面我說,把多個協程註冊進一個事件迴圈中有兩種方法嗎?

  • 使用asyncio.wait()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
  • 使用asyncio.gather()
# 千萬註意,這裡的 「*」 不能省略
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))

asyncio.gatherasyncio.wait 在asyncio中用得的比較廣泛,這裡有必要好好研究下這兩貨。

還是照例用例子來說明,先定義一個協程函數

import asyncio

async def factorial(name, number):
f = 1
for i in range(2, number+1):
print("Task %s: Compute factorial(%s)..." % (name, i))
await asyncio.sleep(1)
f *= i
print("Task %s: factorial(%s) = %s" % (name, number, f))

接收參數方式

asyncio.wait

接收的tasks,必須是一個list對象,這個list對象里,存放多個的task。

它可以這樣,用asyncio.ensure_future轉為task對象

tasks=[
asyncio.ensure_future(factorial("A", 2)),
asyncio.ensure_future(factorial("B", 3)),
asyncio.ensure_future(factorial("C", 4))
]

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.wait(tasks))

也可以這樣,不轉為task對象。

loop = asyncio.get_event_loop()

tasks=[
factorial("A", 2),
factorial("B", 3),
factorial("C", 4)
]

loop.run_until_complete(asyncio.wait(tasks))

asyncio.gather

接收的就比較廣泛了,他可以接收list對象,但是 * 不能省略

tasks=[
asyncio.ensure_future(factorial("A", 2)),
asyncio.ensure_future(factorial("B",
您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 前言 在單體架構的秒殺活動中,為了減輕DB層的壓力,這裡我們採用了Lock鎖來實現秒殺用戶排隊搶購。然而很不幸的是儘管使用了鎖,但是測試過程中仍然會超賣,執行了N多次發現依然有問題。輸出一下代碼吧,可能大家看的比較真切: 代碼寫在service層,bean預設是單例的,也就是說lock肯定是一個對象 ...
  • Redis 概述 在我們日常的Java Web開發中,無不都是使用資料庫來進行數據的存儲,由於一般的系統任務中通常不會存在高併發的情況,所以這樣看起來並沒有什麼問題,可是一旦涉及大數據量的需求,比如一些商品搶購的情景,或者是主頁訪問量瞬間較大的時候,單一使用資料庫來保存數據的系統會因為面向磁碟,磁碟 ...
  • https://www.cnblogs.com/taoweiji/archive/2012/12/14/2818787.html GridBagLayout是java裡面最重要的佈局管理器之一,可以做出很複雜的佈局,可以說GridBagLayout是必須要學好的的, GridBagLayout 類是 ...
  • 對於volatile型變數的特殊規則 關鍵字volatile可以說是Java虛擬機提供的最輕量級的同步機制。 在處理多線程數據競爭問題時,不僅僅是可以使用synchronized關鍵字來實現,使用volatile也可以實現。 Java記憶體模型對volatitle專門定義了一些特殊的訪問規則,當一個變 ...
  • 承接上篇文章Django Rest Framework源碼剖析(二) 許可權,當服務的介面被頻繁調用,導致資源緊張怎麼辦呢?當然或許有很多解決辦法,比如:負載均衡、提高伺服器配置、通過代理限制訪問頻率等,但是django rest framework自身就提供了訪問頻率的控制,可以從代碼本身做控制。 ...
  • Long Parameter List(過長參數列) Divergent Change(發散式變化) Shotgun Surgery(散彈式修改) Feature Envy(依戀情結) Data Clumps(數據泥團) Primitive Obsession(基本型別偏執) Switch Stat... ...
  • 在Flask中鉤子函數是使用特定的裝飾器的函數。為什麼叫做鉤子函數呢,是因為鉤子函數可以在正常執行的代碼中,插入一段自己想要執行的代碼,那麼這種函數就叫做鉤子函數。 before_first_request:Flask項目第一次部署後會執行的鉤子函數。 before_request:請求已經到達了F ...
  • 博客園是面向開發者的知識分享社區,不允許發佈任何推廣、廣告、政治方面的內容。 博客園首頁(即網站首頁)只能發佈原創的、高質量的、能讓讀者從中學到東西的內容。 如果博文質量不符合首頁要求,會被工作人員移出首頁,望理解。如有疑問,請聯繫[email protected]。 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...