### 實踐環境 Python3.6 ### 介紹 `multiprocessing`是一個支持使用類似於線程模塊的API派生進程的包。該包同時提供本地和遠程併發,通過使用子進程而不是線程,有效地避開了全局解釋器鎖。因此,`multiprocessing`模塊允許程式員充分利用給定機器上的多個處理器 ...
實踐環境
Python3.6
介紹
multiprocessing
是一個支持使用類似於線程模塊的API派生進程的包。該包同時提供本地和遠程併發,通過使用子進程而不是線程,有效地避開了全局解釋器鎖。因此,multiprocessing
模塊允許程式員充分利用給定機器上的多個處理器。它同時在Unix和Windows上運行。
該模塊還引入了線上程模塊中沒有類似程式的API。這方面的一個主要例子是Pool
對象,它提供了一種方便的方法,可以在多個輸入值的情況下,為進程之間分配輸入數據(數據並行),實現並行執行函數。以下示例演示了在模塊中定義此類函數,以便子進程能夠成功導入該模塊的常見做法。這個使用Pool
實現數據並行的基本示例
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
控制台輸出:
[1, 4, 9]
Process類
在multiprocessing
中,進程是通過創建一個Process
類並調用其start()
方法來派生的。Process
遵循threading.Thread
的API。multiprocess
程式的一個微小的例子:
from multiprocessing import Process
def f(name):
print('hello', name) # 輸出:hello shouke
if __name__ == '__main__':
p = Process(target=f, args=('shouke',))
p.start()
p.join()
下麵是一個擴展示例,顯示所涉及的各個進程ID:
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('shouke',))
p.start()
p.join()
控制台輸出:
main line
module name: __main__
parent process: 13080
process id: 20044
function f
module name: __mp_main__
parent process: 20044
process id: 28952
hello shouke
上下文和啟動方法
根據平臺的不同,multiprocessing
支持三種啟動進程的方式。這些啟動方法是
-
spawn
父進程啟動一個新的python解釋器進程。子進程將只繼承那些運行進程對象
run()
方法所需的資源。特別是,來自父進程的不必要的文件描述符和句柄將不會被繼承。與使用fork或forkserver相比,使用此方法啟動進程相當慢。可在Unix和Windows上使用。Windows上預設使用該啟動方法。 -
fork
父進程使用
os.fork()
來fork Python解釋器。子進程在開始時實際上與父進程相同。父進程的所有資源都由子進程繼承。請註意,安全地fork多線程進程是有問題的。僅在Unix上可用。Unix上預設會用該方法。 -
forkserver
當程式啟動並選擇forkserver啟動方法時,伺服器進程就會啟動。從那時起,每當需要新進程時,父進程都會連接到伺服器,並請求它fork一個新進程。fork伺服器進程是單線程的,因此使用
os.fork()
是安全的。不會繼承不必要的資源。在支持通過Unix管道傳遞文件描述符的Unix平臺上可用。
To select a start method you use the set_start_method()
in the if __name__ == '__main__'
clause of the main module. For example
在3.4版本中進行了更改:在所有unix平臺上添加了spawn,併為一些unix平臺添加了forkserver。子進程不再繼承Windows上的所有父級可繼承句柄。
在Unix上,使用spawn或forkserver啟動方法還將啟動一個信號量跟蹤器進程,該進程跟蹤程式進程創建的未鏈接的命名信號量。當所有進程都退出時,信號量跟蹤器將取消任何剩餘信號量的鏈接。通常應該沒有剩餘信號量,但如果一個進程被信號殺死,可能會有一些“泄露”的信號量。(取消命名信號量的鏈接是一個嚴重的問題,因為系統只允許有限的數量,並且在下次重新啟動之前不會自動取消鏈接。)
要選擇啟動方法,請在主模塊的 if __name__ == '__main__'
子句中使用set_start_method()
。例如
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
print(q.get()) # 輸出 hello
p.join()
set_start_method()
在一個程式中只能用一次
或者,也可以使用get_context()
來獲取上下文對象。上下文對象與multiprocessing
模塊具有相同的API,並允許在同一程式中使用多個啟動方法。
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
請註意,與一個上下文相關的對象可能與不同上下文的進程不相容。特別是,使用fork上下文創建的鎖不能傳遞給使用spawn或forkserver啟動方法啟動的進程。
想要使用特定啟動方法的庫可能應該使用get_context()
來避免干擾庫用戶的選擇
在進程之間交換對象
multiprocessing
支持進程之間的兩種通信通道
-
隊列
multiprocessing.Queue
類近乎是queue.Queue
的克隆. 例如:from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()
隊列是線程和進程安全的。
錯誤用法示例如下:
from multiprocessing import Process, Queue q = Queue() def f(): global q q.put([42, None, 'hello']) if __name__ == '__main__': p = Process(target=f) p.start() print(q.get()) # 取不到值 p.join()
涉及到類的時候咋處理呢?示例如下
from multiprocessing import Process, Queue class TestClass: def __init__(self, q): self.q = q def f(self): self.q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() obj = TestClass(q) p = Process(target=obj.f) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()
或者
from multiprocessing import Process, Queue q = Queue() class TestClass: def f(self, q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() obj = TestClass() p = Process(target=obj.f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()
特別需要註意的是,由進程調用的target類函數中的其它普通屬性,和其它類函數中的同名屬性並不是共用的,除非也使用隊列或者其它共用方式,錯誤用法示例如下:
import threading import time from multiprocessing import Process, Queue class TestClass: def __init__(self, q): self.q = q self.task_done = False def f1(self): i = 0 while i < 5: self.q.put('hello') time.sleep(0.3) i += 1 self.task_done = True def f2(self): # while死迴圈了 while not self.q.empty() or not self.task_done: # self.task_done永遠為True try: print(self.q.get_nowait()) except Exception: pass def run(self): thread = threading.Thread(target=self.f1, name="f1") thread.start() p = Process(target=self.f2) p.start() if __name__ == '__main__': q = Queue() obj = TestClass(q) obj.run()
正確做法如下:
import threading import time from multiprocessing import Process, Queue, active_children, Value class TestClass: def __init__(self, q, task_done): self.q = q self.task_done = task_done def f1(self): i = 0 while i < 5: self.q.put('hello') time.sleep(0.3) i += 1 self.task_done.value = 1 def f2(self): item = '' while not self.q.empty() or self.task_done.value == 0: try: item = self.q.get_nowait() print(item) except Exception: pass def run(self): thread = threading.Thread(target=self.f1, name="f1") thread.start() p = Process(target=self.f2) p.start() if __name__ == '__main__': q = Queue() task_done = Value('h', 0) obj = TestClass(q, task_done) obj.run()
或者
-
管道
multiprocessing.Pipe
函數返回一對由管道連接的連接對象,預設情況下管道是雙向的。例如:from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()
multiprocessing.Pipe
返回的兩個連接對象表示管道的兩端。每個連接對象都有multiprocessing.connection.send
和multiprocessing.connection.recv()
方法(以及其他方法)。請註意,如果兩個進程(或線程)試圖同時讀取或寫入管道的同一端,則管道中的數據可能會被破壞。當然,同時使用不同管道末端的進程不會有破壞數據的風險。
進程同步
multiprocessing
包含來自threading
中所有同步原語的等效項。例如,可以使用鎖來確保一次只有一個進程列印到標準輸出:
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
進程之間共用狀態
如上所述,在進行併發編程時,通常最好儘可能避免使用共用狀態。當使用多個進程時尤其如此。
但是,如果您確實需要使用一些共用數據,那麼multiprocessing
提供了幾種方法
共用記憶體
可以使用multiprocessing.Value
或multiprocessing.Array
將數據存儲在共用記憶體映射中。例如,以下代碼
from multiprocessing import Process, Value, Array
def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = -a[i]
if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(10))
p = Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value) # 輸出:3.1415927
print(arr[:]) # 輸出:[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
創建num
和arr
時使用的'd'
和'i'
參數是數組模塊使用的類型代碼:'d'
表示雙精度浮點,'i'
表示有符號整數。這些共用對象將是進程和線程安全的。
為了在使用共用記憶體時獲得更大的靈活性,可以使用multiprocessing.sharedtypes
模塊,該模塊支持創建從共用記憶體分配的任意ctypes
對象。
伺服器進程(Server Process)
Manager()
返回的管理器對象控制一個伺服器進程,該進程可保存Python對象,並允許其他進程使用代理操作它們。
管理器對象返回的管理器支持類型 list
, dict
, multiprocessing.managers.Namespace
, multiprocessing.Lock
, multiprocessing.RLock
, multiprocessing.Semaphore
, multiprocessing.BoundedSemaphore
, multiprocessing.Condition
, multiprocessing.Event
, multiprocessing.Barrier
, multiprocessing.Queue
, multiprocessing.Value
和multiprocessing.Array
。例如
from multiprocessing import Process, Manager
def f(d, l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.reverse()
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
l = manager.list(range(10))
p = Process(target=f, args=(d, l))
p.start()
p.join()
print(d) # 輸出:{1: '1', '2': 2, 0.25: None}
print(l) # 輸出:[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
伺服器進程管理器比使用共用記憶體對象更靈活,因為它們可以支持任意對象類型。此外,單個管理器可以由不同電腦上的進程通過網路共用。然而,它們比使用共用記憶體要慢。
使用進程池
Pool
類代表一個工作進程池。它具有允許以幾種不同方式將任務轉移給工作進程的方法。
例如:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import os
from multiprocessing import Pool, TimeoutError
def f(x):
return x*x
if __name__ == '__main__':
# 啟動 4 個工作進程
with Pool(processes=4) as pool:
# 輸出 "[0, 1, 4,..., 81]"
print(pool.map(f, range(10))) # 輸出:[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# 註意,此時採用的同步行,雖然是多進程,也要代碼全部執行完成才會繼續往下執行
# 按任意順序列印相同數字
print('列印相同數字')
for i in pool.imap_unordered(f, range(10)):
print(i)
# 非同步計算“f(20)”
print('非同步計算“f(20)”')
res = pool.apply_async(f, (20,)) # 僅在一個進程中運行
print(res.get(timeout=1)) # 列印 "400"
# 非同步計算 "os.getpid()"
print('非同步計算 "os.getpid()"')
res = pool.apply_async(os.getpid, ()) # 僅在一個進程中運行
print(res.get(timeout=1)) # 列印進程ID
# 非同步啟動多個計算,可能使用更多進程
print('非同步啟動多個計算')
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# 讓單個worker進程休眠10秒
print('讓單個worker進程休眠10秒')
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("遇到multiprocessing.TimeoutError")
print("此時,pool仍可用於更多的工作")
# 退出 with 代碼塊,pool就停用了
print("現在,pool已關閉,並且不再可用")
輸出:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
列印相同數字
0
1
4
9
16
25
36
49
64
81
非同步計算“f(20)”
400
非同步計算 "os.getpid()"
13556
非同步啟動多個計算
[13556, 13556, 13556, 13556]
讓單個worker進程休眠10秒
遇到multiprocessing.TimeoutError
此時,pool仍可用於更多的工作
現在,pool已關閉,並且不再可用
請註意,池的方法只能由創建池的進程使用。
此程式包中的功能要求
__main__
模塊可由子級導入。這意味著一些示例,如multiprocessing.pool.pool
示例將無法在互動式解釋器中工作。例如>>> from multiprocessing import Pool >>> p = Pool(5) >>> def f(x): ... return x*x ... >>> p.map(f, [1,2,3]) Process SpawnPoolWorker-6: Process SpawnPoolWorker-7: Process SpawnPoolWorker-5: Traceback (most recent call last): ... AttributeError: Can't get attribute 'f' on <module '__main__' (built-in)> AttributeError: Can't get attribute 'f' on <module '__main__' (built-in)>
(如果你嘗試這樣做,它實際上會以半隨機的方式輸出三個交錯的完整traceback,然後你可能不得不以某種方式停止主進程。)
API參考
multiprocessing
包大部分複製線程模塊的API。
multiprocessing.Process
和exception
Process
class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
Process
對象表示在立進程中運行的活動。Process
類具有threading.Thread
的所有方法的等價項。
構造函數應始終使用關鍵字參數調用。
-
group
應始終為None
,它的存在只是為了與threading.Thread.target
相容。 -
target
供run()
方法調用的可調用對象。預設為None
,表示不調用任何內容。 -
name
是進程名稱。 -
args
是target
調用的參數元組。 -
kwargs
是target
調用的關鍵字參數字典。 -
daemon
用於設置將進程是否為守護進程,True
- 是 或False
- 否。如果為None
(預設值),則將從創建進程中繼承。
預設情況下,不會向target
傳遞任何參數。
如果子類重寫構造函數,則必須確保在對進程執行其他操作之前調用基類構造函數(Process.__init__()
)。
在版本3.3中更改:添加daemon
參數
-
run()
表示進程活動的方法。
可以在子類中重寫此方法。標準run()方法調用作為target參數傳遞給對象構造函數的可調用對象(如果有的話),其中順序參數和關鍵字參數分別取自
args
和kwargs
參數 -
start()
啟動進程活動。沒改進程對下最多只能調用一次。 它安排在單獨的進程中調用對象的
run()
方法。 -
join([timeout])
如果可選參數
timeout
為None
(預設值),則該方法將阻塞,直到調用其join()
方法的進程終止為止。如果timeout
是一個正數,則表示最多阻塞timeout
參數指定的秒數。請註意,如果該方法的進程終止或方法超時,則該方法將返回None
。檢查進程的退出碼以確定它是否已終止。一個進程可以被
join
多次。註意:阻塞表示不繼續往下執行,如果阻塞超時,程式繼續往下還行,如果此時
target
未運行完成,主程式會等待其運行完成後才終止。進程不能
join
自身,因為這會導致死鎖。在進程啟動之前嘗試join
進程是錯誤的。 -
name
進程的名稱。一個字元串,僅用於識別目的。它沒有語義。多個進程可能被賦予相同的名稱。初始名稱由構造函數設置。如果沒有向構造函數提供顯式名稱,則進程名被構造為形如
Process-N1:N2:…:Nk
字元串,其中每個Nk
是其父進程的第N個子節點。 -
is_alive()
返回進程是否還存活大致上,進程對象從
start()
方法返回的那一刻起一直處於活動狀態,直到子進程終止。 -
daemon
進程的守護進程標誌,一個布爾值。這必須在調用
start()
之前設置。初始值是從創建進程時繼承的。
當進程退出時,它會嘗試終止其所有守護進程子進程。
請註意,守護進程不允許創建子進程。否則,如果守護進程在其父進程退出時被終止,它的子進程將成為孤兒進程。此外,這些不是Unix守護進程或服務,它們是正常進程,如果非守護進程退出,它們將被終止(而不是被
join
)。
除了threading.Thread
API之外,Process
對象還支持以下屬性和方法:
-
pid
返回進程ID。進程派生之前,其值為None
-
exitcode
子進程的退出碼。如果進程尚未終止,則其值為None
。負值-N
表示子進程被信號N終止。 -
terminate()
終止進程。在Unix上,這是使用SIGTERM信號完成的;在Windows上使用TerminateProcess()
。請註意,退出handler和和finally子句等將不會被執行。請註意,進程的子進程不會被終止,它們只會成為孤兒進程
-
..略,更多參考請查閱官方文檔
示例
Process
的一些方法的示例用法
import multiprocessing, time, signal
p = multiprocessing.Process(target=time.sleep, args=(1000,))
print(p, p.is_alive()) # 輸出:<Process(Process-1, initial)> False
p.start()
print(p, p.is_alive()) # 輸出:<Process(Process-1, started)> True
p.terminate()
time.sleep(0.1)
print(p, p.is_alive()) # 輸出:<Process(Process-1, stopped[SIGTERM])> False
print(p.exitcode == -signal.SIGTERM) # 輸出:True
異常
-
exception
multiprocessing.ProcessError
所有
multiprocessing
異常的基類 -
exception
multiprocessing.BufferTooShort
當提供的緩衝區對象太小而無法讀取消息時引發的異常。
-
exception
multiprocessing.AuthenticationError
發生身份驗證錯誤時引發的異常
-
exception
multiprocessing.TimeoutError
具有
timeout
的方法超時引發的異常。
管道和隊列
-
class
multiprocessing.Pipe([duplex])
返回一對錶示管道終端的
multiprocessing.Connection
對象(conn1,conn2)
。如果duplex
為True
(預設值),則管道為雙向管道。如果duplex
為False
,則管道是單向的:conn1
只能用於接收消息,conn2
只能用於發送消息 -
class
multiprocessing.Queue([maxsize])
返回使用管道和一些鎖/信號量實現的進程共用隊列。當進程第一次將項目放入隊列時,會啟動一個feeder線程,該線程將對象從緩衝區傳輸到管道中。來自標準庫的
queue
模塊的常見queue.Empty
和queue.Full
異常被引發以發出超時信號。multiprocessing.Queue
實現了Queue.Queue
的所有方法,除了task_done()
和join()
-
qsize()
返回隊列的大致大小。由於多線程/多進程的語義,這是不可靠的。
請註意,這可能會在Unix平臺(如Mac OS X)上觸發
NotImplementedError
,因為其未實現sem_getvalue()
。 -
empty()
如果隊列為空,則返回
True
,否則返回False
。由於多線程/多處理語義的原因,這是不可靠的。 -
full()
如果隊列已滿,則返回
True
,否則返回False
。由於多線程/多處理語義的原因,這是不可靠的。 -
put(obj[, block[, timeout]])
將obj放入隊列。如果可選參數
block
為True
(預設值),並且timeout
為None
(預設值),則必要時阻塞,直到有可用空閑slot。如果timeout
是一個正數,最多會阻塞timeout
指定秒數,並拋出queue.Full
異常,如果在該時間內沒有可用slot的話。如果block
為False
,如果有可用空閑slot,則將項目放入隊列中,否則拋出queue.Full
異常(在這種情況下會忽略timeout
)。 -
put_nowait(obj)
等價於put(obj, False)
-
get([block[, timeout]])
從隊列中刪除並返回被刪除項目。如果參數
block
為True
(預設值),並且timeout
為None
(預設值),則獲取不到項目時阻塞,直到有可獲取項。如果timeout
是一個正數,最多會阻塞timeout
指定秒數,並拋出queue.Empty
異常,如果在超時時間內沒有可用項目的話。如果block
為False
,如果有可獲取項,則立即返回項目,否則拋出queue.Empty
異常(在這種情況下會忽略timeout
)。 -
get_nowait()
等價於get(False)
-
..略,更多參考請查閱官方文檔
...略,更多參考請查閱官方文檔
雜項
-
multiprocessing.active_children()
返回當前進程的所有活動子進程的列表。調用該方法的副作用是“阻塞”任何已經完成的進程(原文:Calling this has the side effect of “joining” any processes which have already finished。)
-
multiprocessing.cpu_count()
返回系統的CPU數量。該數量並不等於當前進程可以使用的CPU數量。可用cpu的數量可以通過
len(os.sched_getaffinity(0))
獲取,不過可能會拋NotImplementedError
異常。 -
multiprocessing.``current_process
()返回當前進程對應的
multiprocessing.Process
對下。類似threading.current_thread()
-
multiprocessing.get_all_start_methods()
返回支持的啟動方法的列表,其中第一個是預設方法。可能的啟動方法有
'fork'
,'spawn'
和'forkserver'
。在Windows上,僅'spawn'
可用。在Unix上,始終支持'fork'
和'spawn'
,預設值為“'fork'
。3.4版新增
-
multiprocessing.get_start_method(allow_none=False)
返回用於啟動進程的啟動方法的名稱。如果尚未設置啟動方法,且allow_none為
False
,則返回預設方法名詞,如果尚未設置啟動方法,並且allow_none為True
,則返回None
。返回值可以是'fork'
,'spawn'
,'forkserver'
或None
.'fork'
為Unix上的預設值,而'spawn'
則是Windows上的預設值。3.4版新增。
-
multiprocessing.``set_start_method
(method)設置應用於啟動子進程的方法。method可以是
'fork'
,'spawn'
或'forkserver'
。請註意,最多只能調用一次,並且應該在主模塊的if__name__=='__main__'
子句中使用。3.4版新增。
-
..略,更多參考請查閱官方文檔
..略,更多參考請查閱官方文檔
Process工具
可以創建一個進程池,用於執行使用multiprocessing.pool.Pool
類提交給它的任務。
Pool類
-
class
multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
一個進程池對象,用於控制可以向其提交作業的工作進程池。它支持帶有超時和回調的非同步結果,並具有並行map實現。
processes
是要使用的工作進程的數量。如果processes
為None
,則預設使用os.cpu_count()
返回的數字。initializer
如果值不為None
,那麼每個工作進程在啟動時都會調用initializer(*initargs)
。maxtasksperchild
是工作進程在退出並替換為新的工作進程之前可以完成的任務數,以便釋放未使用的資源。預設的maxtasksperchild
為None
,這意味著工作進程存活時間將與進程池一樣長。context
用於指定用於啟動工作進程的上下文。通常,進程池是使用上下文對象的函數multiprocessing.Pool()
或Pool()
方法創建的。在這兩種情況下,上下文都設置得適當。
請註意,池對象的方法只能由創建池的進程調用。
3.2版新增:
maxtasksperchild
3.4版新增:
context
註意:
池中的工作進程通常在工作隊列的整個持續時間內保持存活。在其他系統(如Apache、mod_wsgi等)中發現的一種釋放工作進程所持有資源的常見模式是,允許池中的工作進程在退出、清理和生成新進程以取代舊進程之前只完成一定數量的工作。池的
maxtasksperchild
參數向最終用戶暴露了這一能力。apply(func[, args[, kwds]])
使用參數
args
和關鍵字參數kwds
調用func。它會阻塞,直到可獲取結果為止。考慮到阻塞問題,apply_async()
更適合併行執行工作。此外,func
只在池的一個工作進程中執行。apply_async(func[, args[, kwds[, callback[, error_callback]]]])
apply()
方法的變體,返回結果對象。如果指定了
callback
,那麼它應該是一個接受單個參數的可調用函數。當可獲取結果時,將對其應用callback
,除非調用失敗,在這種情況下,將對其應用error_callback
。如果指定了
error_callback
,那麼它應該是一個接受單個參數的可調用函數。如果目標函數失敗,則會使用異常實例調用error_callback
。回調應該立即完成,否則處理結果的線程將被阻塞。
map(func, iterable[, chunksize])
內置函數map()的並行等價物(不過它只支持一個
iterable
參數)。它會阻塞,直到可獲取結果。該方法將
iterable
分割為多個塊,並將這些塊作為單獨的任務提交給進程池。可以通過將chunksize
設置為正整數來指定這些塊的(近似)大小。map_async(func, iterable[, chunksize[, callback[, error_callback]]])
map()
方法的一個變體,它返回一個結果對象。如果指定了
callback
,那麼它應該是一個接受單個參數的可調用函數。當可獲取結果時,將對其應用callback
,除非調用失敗,在這種情況下,將應用error_callback
。如果指定了
error_callback
,那麼它應該是一個接受單個參數的可調用函數。如果目標函數失敗,則會使用異常實例調用error_callback
。回調應該立即完成,否則處理結果的線程將被阻塞。
imap(func, iterable[, chunksize])
map()
的一個更惰性版本。chunksize
參數與map()
方法使用的參數相同。對於非常長的迭代,使用較大的chunksize
值可以使作業比使用預設值1更快地完成。此外,如果
chunksize
為1,則imap()
方法返回的迭代器的next()
方法有一個可選的timeout
參數:如果無法在timeout
秒內返回結果,next(timeout)
將引發multiprocessing.TimeoutError
imap_unordered(func, iterable[, chunksize])
與imap()
相同,只是返回迭代器的結果的順序是任意的。(只有當只有一個工作進程時,才能保證順序“正確”)starmap(func, iterable[, chunksize])
類似於
map()
,只是iterable的元素被當做參數,不拆解。因此,[(1,2), (3,4)]的迭代結果是[func(1,2),func(3,4)]。
3.3版新增。
starmap_async(func, iterable[, chunksize[, callback[, error_back]]])
starma()
和map_async()
的組合,對可迭代項中的可迭代項進行迭代,併在未拆解可迭代項的情況下調用func。返回一個結果對象。3.3版新增。
close()
阻止將更多任務提交到進程池中。完成所有任務後,工作進程將退出。terminate()
在未完成未完成的工作的情況下立即停止工作進程。當進程池對象被垃圾回收時,將立即調用
terminate()
。join()
等待工作進程退出。在使用
join()
之前,必須調用close()
或terminate()
。3.3版新增:進程池對象現在支持上下文管理協議——請參閱上下文管理器類型
__enter__()
返回池對象,__exit_()
調用terminate()
AsyncResult
類
-
class
multiprocessing.pool.AsyncResult
Pool.apply_async()和Pool.map_async()
返回的結果類。
get([timeout])
當結果已準備好時返回結果。如果timeout
不是None
,並且沒有在timeout
秒內獲取到結果,則會引發multiprocessing.TimeoutError
。如果遠程調用引發了異常,則該異常將由get()
重新拋出。
wait([timeout])
等待,直到結果可獲取,或者直到超過timeout
秒。
ready()
返回調用是否完成
successful()
返回調用是否已完成,不引發異常。如果結果還未準備好,將引發AssertionError
。
進程池使用示例
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
with Pool(processes=4) as pool: # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print(next(it)) # prints "0"
print(next(it)) # prints "1"
print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print(result.get(timeout=1)) # raises multiprocessing.TimeoutError
...略
作者:授客
微信/QQ:1033553122
全國軟體測試QQ交流群:7156436
Git地址:https://gitee.com/ishouke
友情提示:限於時間倉促,文中可能存在錯誤,歡迎指正、評論!
作者五行缺錢,如果覺得文章對您有幫助,請掃描下邊的二維碼打賞作者,金額隨意,您的支持將是我繼續創作的源動力,打賞後如有任何疑問,請聯繫我!!!
微信打賞
支付寶打賞 全國軟體測試交流QQ群