1. 基礎介紹 ThreadPoolExecutor是Python標準庫concurrent.futures模塊中的一個類,用於實現線程池的功能。 ThreadPoolExecutor模塊相比於threading等模塊,通過submit方法返回的是一個Future對象,它代表了一個未來可期的結果。通 ...
1. 基礎介紹
ThreadPoolExecutor
是Python標準庫concurrent.futures
模塊中的一個類,用於實現線程池的功能。
ThreadPoolExecutor
模塊相比於threading
等模塊,通過submit
方法返回的是一個Future
對象,它代表了一個未來可期的結果。通過Future
對象,我們可以在主線程(或主進程)中獲取某個線程(或任務)的狀態以及返回值,實現了多線程和多進程編碼介面的一致性。
具體來說,Future
對象具有以下特點:
-
獲取狀態和返回值:通過
result()
方法可以獲取一個任務的執行結果。如果任務尚未完成,調用result()
方法會阻塞主線程,直到任務完成並返回結果。 -
非同步通知:當一個線程完成時,主線程可以立即得到通知。可以通過
done()
方法判斷任務是否已完成,或使用add_done_callback()
方法註冊一個回調函數,在任務完成時自動調用該函數。 -
異常處理:如果任務拋出異常,
Future
對象會將異常拋出到主線程。可以使用exception()
方法獲取異常對象。
通過返回Future
對象,我們可以更方便地管理和控制線程池中的任務。可以在主線程中獲取任務的狀態、返回值和異常信息,避免了線程之間的顯式同步和等待。
總的來說,ThreadPoolExecutor
模塊提供了一種高級的多線程編程介面,使得多線程編程更加簡潔和易用。它實現了多線程和多進程的編碼介面一致性,使得我們可以使用類似的方式處理多線程和多進程編程任務。
2. 基礎使用
創建線程池對象
可以使用ThreadPoolExecutor
類創建一個線程池對象。可以指定線程池的大小(即可同時運行的線程數量),也可以使用預設值(大小為系統預設的處理器數量)。
from concurrent.futures import ThreadPoolExecutor import time def get_html(times): time.sleep(times) print("get page {} success".format(times)) return times executor = ThreadPoolExecutor(max_workers=2) # 表示在這個線程池中同時運行的線程有3個線程
提交任務
使用submit()
方法向線程池提交任務,該方法接受一個可調用對象(函數、方法等)作為參數,並返回一個Future
對象,表示非同步執行的結果。
def my_task(arg): # 執行任務的代碼 return result # 提交任務到線程池 future = executor.submit(my_task, arg)
獲取任務結果
可以使用Future
對象的result()
方法來獲取任務的結果。如果任務尚未完成,result()
方法會阻塞當前線程,直到任務完成並返回結果。
# 獲取任務的結果 result = future.result()
獲取一組任務結果
使用submit()
方法向線程池提交一組任務,並獲取返回的Future
對象列表,使用as_completed()
函數迭代處理Future
對象列表,它會在任務完成時產生結果。可以使用next()
函數或直接使用for
迴圈來獲取結果
def my_task(arg): # 執行任務的代碼 return result args = [arg1, arg2, arg3, ...] futures = [executor.submit(my_task, arg) for arg in args] # 使用next()函數獲取每個任務的結果 for future in as_completed(futures): result = future.result() # 處理任務結果 # 或者使用for迴圈獲取每個任務的結果 for future in as_completed(futures): result = future.result() # 處理任務結果
批量提交任務
除了逐個提交任務,還可以使用map()
方法批量提交任務。map()
方法接受一個可調用對象和一個可迭代的參數列表,然後並行地對參數列表中的每個參數調用可調用對象,並返回一個迭代器,用於獲取每個任務的結果。
def my_task(arg): # 執行任務的代碼 return result args = [arg1, arg2, arg3, ...] # 批量提交任務並獲取結果 results = executor.map(my_task, args)
等待任務完成
使用wait()
方法等待所有已提交的任務完成。可以指定超時時間,如果超時時間到達而還有任務未完成,則不再等待並返回結果。
from concurrent.futures import ALL_COMPLETED, FIRST_COMPLETED # 等待所有任務完成 executor.wait(futures) # 等待任意任務完成 executor.wait(futures, return_when=FIRST_COMPLETED) # 等待所有任務完成或達到超時時間(單位為秒) executor.wait(futures, timeout=10)
wait()
方法接受三個參數:
fs
:要等待的Future
對象列表。timeout
:可選參數,指定等待的超時時間(單位為秒)。如果超時時間到達而還有任務未完成,則不再等待並返回結果。return_when
:可選參數,指定返回結果的條件。預設為ALL_COMPLETED
,表示等待所有任務完成;也可以指定為FIRST_COMPLETED
,表示等待任意一個任務完成。
關閉線程池
在不再需要線程池時,應該調用shutdown()
方法關閉線程池。關閉線程池後,將不再接受新的任務提交,但會等待已提交的任務完成。
# 關閉線程池 executor.shutdown()