concurrent.futures模塊簡單使用(線程池,進程池) ...
一、基類Executor
Executor類是ThreadPoolExecutor 和ProcessPoolExecutor 的基類。它為我們提供瞭如下方法:
submit(fn, *args, **kwargs):提交任務。以 fn(*args **kwargs) 方式執行並返回 Future 對像。
fn:函數地址。
*args:位置參數。
**kwargs:關鍵字參數。
map(func, *iterables, timeout=None, chunksize=1):
func:函數地址。
iterables:一個可迭代對象,以迭代的方式將參數傳遞給函數。
timeout:這個參數沒弄明白,如果是None等待所有進程結束。
chunksize:使用 ProcessPoolExecutor 時,這個方法會將 iterables 分割任務塊,並作為獨立的任務提交到執行池中。這些塊的數量可以由 chunksize 指定設置。 對很長的迭代器來說,設置chunksize 值比預設值 1 能顯著地提高性能。 chunksize 對 ThreadPoolExecutor 沒有效果。
shutdown(wait=True):如果為True會等待線程池或進程池執行完成後釋放正在使用的資源。如果 wait 為 False,將立即返回,所有待執行的期程完成執行後會釋放已分配的資源。 不管 wait 的值是什麼,整個 Python 程式將等到所有待執行的期程完成執行後才退出。
二、線程池對象
ThreadPoolExecutor 是 Executor 的子類,下麵介紹ThreadPoolExecutor 的參數。
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=()):
max_workers:線程池的數量。
thread_name_prefix:線程名首碼。預設線程名ThreadPoolExecutor-線程數
initializer:一個函數或方法,在啟用線程前會調用這個函數(給線程池添加額外任務)。
initargs :以元祖的方式給initializer中的函數傳遞參數。
這裡需要說明的是除了max_workers這個參數外其它三個參數基本很少用。max_workers很好理解就是線程池的數量。
下麵來說initializer和initargs 這兩個奇怪的家伙。
示例一:
from concurrent.futures import ThreadPoolExecutor def work(): print('工作線程') def test(num): print('test:',num) executor = ThreadPoolExecutor(max_workers=2,initializer=test(7)) # 開啟2個線程 initializer指定參數test(7) executor.submit(work) executor.submit(work) # 列印內容如下 test: 7 工作線程 工作線程
示例二:
from concurrent.futures import ThreadPoolExecutor def work(): print('工作線程') def test(num): print('test:',num) executor = ThreadPoolExecutor(max_workers=2,initializer=test,initargs=(7,)) # 這裡我們使用initargs=(7,)的方式給test傳遞參數。 executor.submit(work) executor.submit(work) # 列印內容如下 test: 7 工作線程 工作線程 test: 7
通過示例一和示例二我們可以發現initializer=test(7)時,test函數只被調用了1次,當initializer=test,initargs=(7,)時,test被調用了2次。具體原因沒有去分析。感覺沒什麼用。以後有時間看看源碼在補上。
三、進程池對象
ProcessPoolExecutor 也是 Executor 的子類,下麵是ProcessPoolExecutor 參數介紹:
class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
max_workers:工作進程數。如果 max_workers 為 None 或未給出,它將預設為機器的處理器個數。 如果 max_workers 小於等於 0,則將引發 ValueError。 在 Windows 上,max_workers 必須小於等於 61,否則將引發 ValueError。 如果 max_workers 為 None,則所選擇的預設最多為 61,即使存在更多處理器。
mp_context :可以是一個多進程上下文或是 None。 它將被用來啟動工作進程。 如果 mp_context 為 None 或未給出,將使用預設的多進程上下文。
initializer:一個函數或方法,在啟用線程前會調用這個函數。
initargs :以元祖的方式給initializer中的函數傳遞參數。
關於說initializer和initargs 與ThreadPoolExecutor 類似這裡不多說了。
四、創建線程池
from concurrent.futures import ThreadPoolExecutor import time def work(num): time.sleep(1) print('工作線程:',num) if __name__ == '__main__': executor = ThreadPoolExecutor(max_workers=5) # 創建線程池,數量為5 for i in range(5): executor.submit(work, i) print('主線程') # 列印內容如下 主線程 工作線程: 0 工作線程: 1 工作線程: 2 工作線程: 3 工作線程: 4
# 使用shutdown等待所有線程結束後在列印主線程 from concurrent.futures import ThreadPoolExecutor import time def work(num): time.sleep(1) print('工作線程:',num) if __name__ == '__main__': executor = ThreadPoolExecutor(max_workers=5) # 創建線程池,數量為5 for i in range(5): executor.submit(work, i) executor.shutdown(wait=True) # 等待線程池結束 print('主線程') # 列印內容如下 工作線程: 0 工作線程: 1 工作線程: 2 工作線程: 3 工作線程: 4 主線程
如果想要線上程執行的過程中添加額外的功能,可以使用initializer參數,如下:
from concurrent.futures import ThreadPoolExecutor def work(num): print('工作線程:',num) def test(num): print('額外任務:',num) if __name__ == '__main__': executor = ThreadPoolExecutor(max_workers=5,initializer=test,initargs=(7,)) # 添加額外任務 for i in range(5): executor.submit(work, i) executor.shutdown(wait=True) print('主線程') # 列印內容如下 額外任務: 7 工作線程: 0 額外任務: 7 工作線程: 1 額外任務: 7 工作線程: 2 額外任務: 7 工作線程: 3 額外任務: 7 工作線程: 4 主線程
五、進程池
進程池與線程池用法基本一致,只是名字和實現不一樣而已。
from concurrent.futures import ProcessPoolExecutor import time def work(num): time.sleep(1) print('工作進程:',num) if __name__ == '__main__': executor = ProcessPoolExecutor(max_workers=5) # 創建進程池,數量為5 for i in range(5): executor.submit(work, i) print('主線程') # 列印內容如下 主線程 工作進程: 0 工作進程: 1 工作進程: 2 工作進程: 3 工作進程: 4 # 使用shutdown等待所有線程結束後在列印主線程 from concurrent.futures import ProcessPoolExecutor import time def work(num): time.sleep(1) print('工作進程:',num) if __name__ == '__main__': executor = ProcessPoolExecutor(max_workers=5) # 創建進程池,數量為5 for i in range(5): executor.submit(work, i) executor.shutdown(wait=True) # 等待進程池結束 print('主線程') # 列印內容如下 工作進程: 0 工作進程: 1 工作進程: 2 工作進程: 3 工作進程: 4 主線程
如果想要線上程執行的過程中添加額外的功能,可以使用initializer參數,如下:
from concurrent.futures import ProcessPoolExecutor def work(num): print('工作進程:',num) def test(num): print('額外任務:',num) if __name__ == '__main__': executor = ProcessPoolExecutor(max_workers=5,initializer=test,initargs=(7,)) # 添加額外任務 for i in range(5): executor.submit(work, i) executor.shutdown(wait=True) print('主線程') # 列印內容如下 額外任務: 7 工作進程: 0 工作進程: 1 工作進程: 2 工作進程: 3 工作進程: 4 額外任務: 7 額外任務: 7 額外任務: 7 額外任務: 7 主線程
未完待續