個人筆記,如有疏漏,還請指正。 使用多線程(threading)和多進程(multiprocessing)完成常規的併發需求,在啟動的時候 start、join 等步驟不能省,複雜的需要還要用 1 2 個隊列。 隨著需求越來越複雜,如果沒有良好的設計和抽象這部分的功能層次,代碼量越多調試的難度就越大 ...
個人筆記,如有疏漏,還請指正。
使用多線程(threading)和多進程(multiprocessing)完成常規的併發需求,在啟動的時候 start、join 等步驟不能省,複雜的需要還要用 1-2 個隊列。
隨著需求越來越複雜,如果沒有良好的設計和抽象這部分的功能層次,代碼量越多調試的難度就越大。
對於需要併發執行、但是對實時性要求不高的任務,我們可以使用 concurrent.futures 包中的 PoolExecutor 類來實現。
這個包提供了兩個執行器:線程池執行器 ThreadPoolExecutor 和進程池執行器 ProcessPoolExecutor,兩個執行器提供同樣的 API。
池的概念主要目的是為了重用:讓線程或進程在生命周期內可以多次使用。它減少了創建創建線程和進程的開銷,提高了程式性能。重用不是必須的規則,但它是程式員在應用中使用池的主要原因。
池,只有固定個數的線程/進程,通過 max_workers 指定。
- 任務通過 executor.submit 提交到 executor 的任務隊列,返回一個 future 對象。
- Future 是常見的一種併發設計模式。一個Future對象代表了一些尚未就緒(完成)的結果,在「將來」的某個時間就緒了之後就可以獲取到這個結果。
- 任務被調度到各個 workers 中執行。但是要註意,一個任務一旦被執行,在執行完畢前,會一直占用該 worker!
- 如果 workers 不夠用,其他的任務會一直等待!因此 PoolExecutor 不適合實時任務。
import concurrent.futures
import time
from itertools import count
number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
def evaluate_item(x):
for i in count(x): # count 是無限迭代器,會一直遞增。
print(f"{x} - {i}")
time.sleep(0.01)
if __name__ == "__main__":
# 進程池
start_time_2 = time.time()
# 使用 with 在離開此代碼塊時,自動調用 executor.shutdown(wait=true) 釋放 executor 資源
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# 將 10 個任務提交給 executor,並收集 futures
futures = [executor.submit(evaluate_item, item) for item in number_list]
# as_completed 方法等待 futures 中的 future 完成
# 一旦某個 future 完成,as_completed 就立即返回該 future
# 這個方法,使每次返回的 future,總是最先完成的 future
# 而不是先等待任務 1,再等待任務 2...
for future in concurrent.futures.as_completed(futures):
print(future.result())
print ("Thread pool execution in " + str(time.time() - start_time_2), "seconds")
上面的代碼中,item 為 1 2 3 4 5 的五個任務會一直占用所有的 workers,而 6 7 8 9 10 這五個任務會永遠等待!!!
API 詳細說明
concurrent.futures 包含三個部分的 API:
- PoolExecutor:也就是兩個執行器的 API
- 構造器:主要的參數是 max_workers,用於指定線程池大小(或者說 workers 個數)
submit(fn, *args, **kwargs)
:將任務函數 fn 提交到執行器,args 和 kwargs 就是 fn 需要的參數。- 返回一個 future,用於獲取結果
map(func, *iterables, timeout=None, chunksize=1)
:當任務是同一個,只有參數不同時,可以用這個方法代替 submit。iterables 的每個元素對應 func 的一組參數。- 返回一個 futures 的迭代器
shutdown(wait=True)
:關閉執行器,一般都使用 with 管理器自動關閉。
- Future:任務被提交給執行器後,會返回一個 future
future.result(timout=None)
:最常用的方法,返回任務的結果。如果任務尚未結束,這個方法會一直等待!- timeout 指定超時時間,為 None 時沒有超時限制。
exception(timeout=None)
:給出任務拋出的異常。和 result() 一樣,也會等待任務結束。cancel()
:取消此任務add_done_callback(fn)
:future 完成後,會執行fn(future)
。running()
:是否正在運行done()
:future 是否已經結束了,boolean- ...詳見官方文檔
- 模塊帶有的實用函數
concurrent.futures.as_completed(fs, timeout=None)
:等待 fs (futures iterable)中的 future 完成- 一旦 fs 中的某 future 完成了,這個函數就立即返回該 future。
- 這個方法,使每次返回的 future,總是最先完成的 future。而不是先等待任務 1,再等待任務 2...
- 常通過
for future in as_completed(fs):
使用此函數。
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
:一直等待,直到 return_when 所指定的事發生,或者 timeout- return_when 有三個選項:ALL_COMPLETED(fs 中的 futures 全部完成),FIRST__COMPLETED(fs 中任意一個 future 完成)還有 FIRST_EXCEPTION(某任務拋出異常)
Future 設計模式
這裡的 PoolExecutor 的特點,在於它使用了 Future 設計模式,使任務的執行,與結果的獲取,變成一個非同步的流程。
我們先通過 submit/map 將任務放入任務隊列,這時任務就已經開始執行了!然後我們在需要的時候,通過 future 獲取結果,或者直接 add_done_callback(fn)
。
這裡任務的執行是在新的 workers 中的,主進程/線程不會阻塞,因此主線程可以乾其他的事。這種方式被稱作非同步編程。
畫外
concurrent.futures 基於 multiprocessing.pool 實現,因此實際上它比直接使用 線程/進程 的 Pool 要慢一點。但是它提供了更方便簡潔的 API。
參考
- 使用Python進行併發編程-PoolExecutor篇
- Python Parallel Programming Cookbook
- concurrent.futures — Launching parallel tasks
- 進程線程協程與併發並行
- 並行設計模式(一)-- Future模式