Python 標準類庫-併發執行之multiprocessing-基於進程的並行

来源:https://www.cnblogs.com/shouke/archive/2023/06/19/17472025.html
-Advertisement-
Play Games

### 實踐環境 Python3.6 ### 介紹 `multiprocessing`是一個支持使用類似於線程模塊的API派生進程的包。該包同時提供本地和遠程併發,通過使用子進程而不是線程,有效地避開了全局解釋器鎖。因此,`multiprocessing`模塊允許程式員充分利用給定機器上的多個處理器 ...


實踐環境

Python3.6

介紹

multiprocessing是一個支持使用類似於線程模塊的API派生進程的包。該包同時提供本地和遠程併發,通過使用子進程而不是線程,有效地避開了全局解釋器鎖。因此,multiprocessing模塊允許程式員充分利用給定機器上的多個處理器。它同時在Unix和Windows上運行。

該模塊還引入了線上程模塊中沒有類似程式的API。這方面的一個主要例子是Pool對象,它提供了一種方便的方法,可以在多個輸入值的情況下,為進程之間分配輸入數據(數據並行),實現並行執行函數。以下示例演示了在模塊中定義此類函數,以便子進程能夠成功導入該模塊的常見做法。這個使用Pool實現數據並行的基本示例

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

控制台輸出:

[1, 4, 9]

Process類

multiprocessing中,進程是通過創建一個Process類並調用其start()方法來派生的。Process遵循threading.Thread的API。multiprocess程式的一個微小的例子:

from multiprocessing import Process

def f(name):
    print('hello', name) # 輸出:hello shouke

if __name__ == '__main__':
    p = Process(target=f, args=('shouke',))
    p.start()
    p.join()

下麵是一個擴展示例,顯示所涉及的各個進程ID:

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('shouke',))
    p.start()
    p.join()

控制台輸出:

main line
module name: __main__
parent process: 13080
process id: 20044
function f
module name: __mp_main__
parent process: 20044
process id: 28952
hello shouke

上下文和啟動方法

根據平臺的不同,multiprocessing支持三種啟動進程的方式。這些啟動方法是

  • spawn

    父進程啟動一個新的python解釋器進程。子進程將只繼承那些運行進程對象run()方法所需的資源。特別是,來自父進程的不必要的文件描述符和句柄將不會被繼承。與使用forkforkserver相比,使用此方法啟動進程相當慢。可在Unix和Windows上使用。Windows上預設使用該啟動方法。

  • fork

    父進程使用os.fork()來fork Python解釋器。子進程在開始時實際上與父進程相同。父進程的所有資源都由子進程繼承。請註意,安全地fork多線程進程是有問題的。僅在Unix上可用。Unix上預設會用該方法。

  • forkserver

    當程式啟動並選擇forkserver啟動方法時,伺服器進程就會啟動。從那時起,每當需要新進程時,父進程都會連接到伺服器,並請求它fork一個新進程。fork伺服器進程是單線程的,因此使用os.fork()是安全的。不會繼承不必要的資源。在支持通過Unix管道傳遞文件描述符的Unix平臺上可用。

To select a start method you use the set_start_method() in the if __name__ == '__main__' clause of the main module. For example

在3.4版本中進行了更改:在所有unix平臺上添加了spawn,併為一些unix平臺添加了forkserver。子進程不再繼承Windows上的所有父級可繼承句柄。

在Unix上,使用spawnforkserver啟動方法還將啟動一個信號量跟蹤器進程,該進程跟蹤程式進程創建的未鏈接的命名信號量。當所有進程都退出時,信號量跟蹤器將取消任何剩餘信號量的鏈接。通常應該沒有剩餘信號量,但如果一個進程被信號殺死,可能會有一些“泄露”的信號量。(取消命名信號量的鏈接是一個嚴重的問題,因為系統只允許有限的數量,並且在下次重新啟動之前不會自動取消鏈接。)

要選擇啟動方法,請在主模塊的 if __name__ == '__main__'子句中使用set_start_method()。例如

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get()) # 輸出 hello
    p.join()

set_start_method() 在一個程式中只能用一次

或者,也可以使用get_context()來獲取上下文對象。上下文對象與multiprocessing模塊具有相同的API,並允許在同一程式中使用多個啟動方法。

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

請註意,與一個上下文相關的對象可能與不同上下文的進程不相容。特別是,使用fork上下文創建的鎖不能傳遞給使用spawnforkserver啟動方法啟動的進程。

想要使用特定啟動方法的庫可能應該使用get_context()來避免干擾庫用戶的選擇

在進程之間交換對象

multiprocessing支持進程之間的兩種通信通道

  • 隊列

    multiprocessing.Queue 類近乎是queue.Queue的克隆. 例如:

    from multiprocessing import Process, Queue
    
    def f(q):
        q.put([42, None, 'hello'])
    
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=f, args=(q,))
        p.start()
        print(q.get())    # prints "[42, None, 'hello']"
        p.join()
    

    隊列是線程和進程安全的。

    錯誤用法示例如下:

    from multiprocessing import Process, Queue
    
    q = Queue()
    
    def f():
        global q
        q.put([42, None, 'hello'])
    
    if __name__ == '__main__':
        p = Process(target=f)
        p.start()
        print(q.get())    # 取不到值
        p.join()
    

    涉及到類的時候咋處理呢?示例如下

    from multiprocessing import Process, Queue
    
    class TestClass:
        def __init__(self, q):
            self.q = q
    
        def f(self):
            self.q.put([42, None, 'hello'])
    
    if __name__ == '__main__':
        q = Queue()
        obj = TestClass(q)
        p = Process(target=obj.f)
        p.start()
        print(q.get())    # prints "[42, None, 'hello']"
        p.join()
    

    或者

    from multiprocessing import Process, Queue
    
    q = Queue()
    
    class TestClass:
        def f(self, q):
            q.put([42, None, 'hello'])
    
    
    if __name__ == '__main__':
        q = Queue()
        obj = TestClass()
        p = Process(target=obj.f, args=(q,))
        p.start()
        print(q.get())    # prints "[42, None, 'hello']"
        p.join()
    

    特別需要註意的是,由進程調用的target類函數中的其它普通屬性,和其它類函數中的同名屬性並不是共用的,除非也使用隊列或者其它共用方式,錯誤用法示例如下:

    import threading
    import time
    
    from multiprocessing import Process, Queue
    
    class TestClass:
        def __init__(self, q):
            self.q = q
            self.task_done = False
    
        def f1(self):
            i = 0
            while i < 5:
                self.q.put('hello')
                time.sleep(0.3)
                i += 1
            self.task_done = True
    
        def f2(self):
            
            # while死迴圈了
            while not self.q.empty() or not self.task_done: # self.task_done永遠為True
                try:
                    print(self.q.get_nowait())
                except Exception:
                    pass
    
        def run(self):
            thread = threading.Thread(target=self.f1,
                                      name="f1")
            thread.start()
    
            p = Process(target=self.f2)
            p.start()
    
    
    if __name__ == '__main__':
        q = Queue()
        obj = TestClass(q)
        obj.run()
    

    正確做法如下:

    import threading
    import time
    
    from multiprocessing import Process, Queue, active_children, Value
    
    class TestClass:
        def __init__(self, q, task_done):
            self.q = q
            self.task_done = task_done
    
        def f1(self):
            i = 0
            while i < 5:
                self.q.put('hello')
                time.sleep(0.3)
                i += 1
            self.task_done.value = 1
    
        def f2(self):
            item = ''
            while not self.q.empty() or self.task_done.value == 0:
                try:
                    item = self.q.get_nowait()
                    print(item)
                except Exception:
                    pass
    
        def run(self):
            thread = threading.Thread(target=self.f1,
                                      name="f1")
            thread.start()
    
            p = Process(target=self.f2)
            p.start()
    
    
    if __name__ == '__main__':
        q = Queue()
        task_done = Value('h', 0)
        obj = TestClass(q, task_done)
        obj.run()
    

    或者

  • 管道

    multiprocessing.Pipe函數返回一對由管道連接的連接對象,預設情況下管道是雙向的。例如:

    from multiprocessing import Process, Pipe
    
    def f(conn):
        conn.send([42, None, 'hello'])
        conn.close()
    
    if __name__ == '__main__':
        parent_conn, child_conn = Pipe()
        p = Process(target=f, args=(child_conn,))
        p.start()
        print(parent_conn.recv())   # prints "[42, None, 'hello']"
        p.join()
    

    multiprocessing.Pipe返回的兩個連接對象表示管道的兩端。每個連接對象都有multiprocessing.connection.sendmultiprocessing.connection.recv() 方法(以及其他方法)。請註意,如果兩個進程(或線程)試圖同時讀取或寫入管道的同一端,則管道中的數據可能會被破壞。當然,同時使用不同管道末端的進程不會有破壞數據的風險。

進程同步

multiprocessing包含來自threading中所有同步原語的等效項。例如,可以使用鎖來確保一次只有一個進程列印到標準輸出:

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

進程之間共用狀態

如上所述,在進行併發編程時,通常最好儘可能避免使用共用狀態。當使用多個進程時尤其如此。

但是,如果您確實需要使用一些共用數據,那麼multiprocessing提供了幾種方法

共用記憶體

可以使用multiprocessing.Valuemultiprocessing.Array將數據存儲在共用記憶體映射中。例如,以下代碼

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value) # 輸出:3.1415927
    print(arr[:]) # 輸出:[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

創建numarr時使用的'd''i'參數是數組模塊使用的類型代碼:'d'表示雙精度浮點,'i'表示有符號整數。這些共用對象將是進程和線程安全的。

為了在使用共用記憶體時獲得更大的靈活性,可以使用multiprocessing.sharedtypes模塊,該模塊支持創建從共用記憶體分配的任意ctypes對象。

伺服器進程(Server Process)

Manager()返回的管理器對象控制一個伺服器進程,該進程可保存Python對象,並允許其他進程使用代理操作它們。

管理器對象返回的管理器支持類型 list, dict, multiprocessing.managers.Namespace, multiprocessing.Lock, multiprocessing.RLock, multiprocessing.Semaphore, multiprocessing.BoundedSemaphore, multiprocessing.Condition, multiprocessing.Event, multiprocessing.Barrier, multiprocessing.Queue, multiprocessing.Valuemultiprocessing.Array。例如

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d) # 輸出:{1: '1', '2': 2, 0.25: None}
        print(l) # 輸出:[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

伺服器進程管理器比使用共用記憶體對象更靈活,因為它們可以支持任意對象類型。此外,單個管理器可以由不同電腦上的進程通過網路共用。然而,它們比使用共用記憶體要慢。

使用進程池

Pool類代表一個工作進程池。它具有允許以幾種不同方式將任務轉移給工作進程的方法。

例如:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import os
from multiprocessing import Pool, TimeoutError

def f(x):
    return x*x

if __name__ == '__main__':
    # 啟動 4 個工作進程
    with Pool(processes=4) as pool:
        # 輸出 "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10))) # 輸出:[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
        # 註意,此時採用的同步行,雖然是多進程,也要代碼全部執行完成才會繼續往下執行

        # 按任意順序列印相同數字
        print('列印相同數字')
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # 非同步計算“f(20)”
        print('非同步計算“f(20)”')
        res = pool.apply_async(f, (20,))      # 僅在一個進程中運行
        print(res.get(timeout=1))             # 列印 "400"

        # 非同步計算 "os.getpid()"
        print('非同步計算 "os.getpid()"')
        res = pool.apply_async(os.getpid, ()) # 僅在一個進程中運行
        print(res.get(timeout=1))             # 列印進程ID

        # 非同步啟動多個計算,可能使用更多進程
        print('非同步啟動多個計算')
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # 讓單個worker進程休眠10秒
        print('讓單個worker進程休眠10秒')
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("遇到multiprocessing.TimeoutError")

        print("此時,pool仍可用於更多的工作")

    # 退出 with 代碼塊,pool就停用了
    print("現在,pool已關閉,並且不再可用")

輸出:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
列印相同數字
0
1
4
9
16
25
36
49
64
81
非同步計算“f(20)”
400
非同步計算 "os.getpid()"
13556
非同步啟動多個計算
[13556, 13556, 13556, 13556]
讓單個worker進程休眠10秒
遇到multiprocessing.TimeoutError
此時,pool仍可用於更多的工作
現在,pool已關閉,並且不再可用

請註意,池的方法只能由創建池的進程使用。

此程式包中的功能要求 __main__模塊可由子級導入。這意味著一些示例,如multiprocessing.pool.pool示例將無法在互動式解釋器中工作。例如

>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
...
>>> p.map(f, [1,2,3])
Process SpawnPoolWorker-6:
Process SpawnPoolWorker-7:
Process SpawnPoolWorker-5:
Traceback (most recent call last):
...
AttributeError: Can't get attribute 'f' on <module '__main__' (built-in)>
AttributeError: Can't get attribute 'f' on <module '__main__' (built-in)>

(如果你嘗試這樣做,它實際上會以半隨機的方式輸出三個交錯的完整traceback,然後你可能不得不以某種方式停止主進程。)

API參考

multiprocessing包大部分複製線程模塊的API。

multiprocessing.Processexception

Process
class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None) 

Process對象表示在立進程中運行的活動。Process類具有threading.Thread的所有方法的等價項。

構造函數應始終使用關鍵字參數調用。

  • group 應始終為None,它的存在只是為了與threading.Thread.target相容。

  • targetrun()方法調用的可調用對象。預設為None,表示不調用任何內容。

  • name 是進程名稱。

  • argstarget調用的參數元組。

  • kwargstarget調用的關鍵字參數字典。

  • daemon 用於設置將進程是否為守護進程,True - 是 或False - 否。如果為None(預設值),則將從創建進程中繼承。

預設情況下,不會向target傳遞任何參數。

如果子類重寫構造函數,則必須確保在對進程執行其他操作之前調用基類構造函數(Process.__init__())。

在版本3.3中更改:添加daemon參數

  • run()

    表示進程活動的方法。

    可以在子類中重寫此方法。標準run()方法調用作為target參數傳遞給對象構造函數的可調用對象(如果有的話),其中順序參數和關鍵字參數分別取自argskwargs參數

  • start()
    啟動進程活動。

    沒改進程對下最多只能調用一次。 它安排在單獨的進程中調用對象的run()方法。

  • join([timeout])

    如果可選參數timeoutNone(預設值),則該方法將阻塞,直到調用其join()方法的進程終止為止。如果timeout是一個正數,則表示最多阻塞timeout參數指定的秒數。請註意,如果該方法的進程終止或方法超時,則該方法將返回None。檢查進程的退出碼以確定它是否已終止。

    一個進程可以被join多次。

    註意:阻塞表示不繼續往下執行,如果阻塞超時,程式繼續往下還行,如果此時target未運行完成,主程式會等待其運行完成後才終止。

    進程不能join自身,因為這會導致死鎖。在進程啟動之前嘗試join進程是錯誤的。

  • name
    進程的名稱。一個字元串,僅用於識別目的。它沒有語義。多個進程可能被賦予相同的名稱。

    初始名稱由構造函數設置。如果沒有向構造函數提供顯式名稱,則進程名被構造為形如Process-N1:N2:…:Nk字元串,其中每個Nk是其父進程的第N個子節點。

  • is_alive()
    返回進程是否還存活

    大致上,進程對象從start()方法返回的那一刻起一直處於活動狀態,直到子進程終止。

  • daemon

    進程的守護進程標誌,一個布爾值。這必須在調用start()之前設置。

    初始值是從創建進程時繼承的。

    當進程退出時,它會嘗試終止其所有守護進程子進程。

    請註意,守護進程不允許創建子進程。否則,如果守護進程在其父進程退出時被終止,它的子進程將成為孤兒進程。此外,這些不是Unix守護進程或服務,它們是正常進程,如果非守護進程退出,它們將被終止(而不是被join)。

除了threading.Thread API之外,Process對象還支持以下屬性和方法:

  • pid
    返回進程ID。進程派生之前,其值為None

  • exitcode
    子進程的退出碼。如果進程尚未終止,則其值為None。負值-N表示子進程被信號N終止。

  • terminate()
    終止進程。在Unix上,這是使用SIGTERM信號完成的;在Windows上使用TerminateProcess()。請註意,退出handler和和finally子句等將不會被執行。

    請註意,進程的子進程不會被終止,它們只會成為孤兒進程

  • ..略,更多參考請查閱官方文檔

示例

Process的一些方法的示例用法

import multiprocessing, time, signal

p = multiprocessing.Process(target=time.sleep, args=(1000,))
print(p, p.is_alive()) # 輸出:<Process(Process-1, initial)> False
p.start()
print(p, p.is_alive()) # 輸出:<Process(Process-1, started)> True
p.terminate()
time.sleep(0.1)
print(p, p.is_alive()) # 輸出:<Process(Process-1, stopped[SIGTERM])> False
print(p.exitcode == -signal.SIGTERM) # 輸出:True
異常
  • exception multiprocessing.ProcessError

    所有multiprocessing異常的基類

  • exception multiprocessing.BufferTooShort

    當提供的緩衝區對象太小而無法讀取消息時引發的異常。

  • exception multiprocessing.AuthenticationError

    發生身份驗證錯誤時引發的異常

  • exception multiprocessing.TimeoutError

    具有timeout的方法超時引發的異常。

管道和隊列

  • class multiprocessing.Pipe([duplex])

    返回一對錶示管道終端的multiprocessing.Connection對象(conn1,conn2)。如果duplexTrue(預設值),則管道為雙向管道。如果duplexFalse,則管道是單向的:conn1只能用於接收消息,conn2只能用於發送消息

  • class multiprocessing.Queue([maxsize])

    返回使用管道和一些鎖/信號量實現的進程共用隊列。當進程第一次將項目放入隊列時,會啟動一個feeder線程,該線程將對象從緩衝區傳輸到管道中。來自標準庫的queue模塊的常見queue.Emptyqueue.Full異常被引發以發出超時信號。multiprocessing.Queue實現了Queue.Queue的所有方法,除了task_done()join()

  • qsize()

    返回隊列的大致大小。由於多線程/多進程的語義,這是不可靠的。

    請註意,這可能會在Unix平臺(如Mac OS X)上觸發NotImplementedError,因為其未實現sem_getvalue()

  • empty()

    如果隊列為空,則返回True,否則返回False。由於多線程/多處理語義的原因,這是不可靠的。

  • full()

    如果隊列已滿,則返回True,否則返回False。由於多線程/多處理語義的原因,這是不可靠的。

  • put(obj[, block[, timeout]])

    將obj放入隊列。如果可選參數blockTrue(預設值),並且timeoutNone(預設值),則必要時阻塞,直到有可用空閑slot。如果timeout是一個正數,最多會阻塞timeout指定秒數,並拋出queue.Full異常,如果在該時間內沒有可用slot的話。如果blockFalse,如果有可用空閑slot,則將項目放入隊列中,否則拋出queue.Full異常(在這種情況下會忽略timeout)。

  • put_nowait(obj)
    等價於put(obj, False)

  • get([block[, timeout]])

    從隊列中刪除並返回被刪除項目。如果參數blockTrue(預設值),並且timeoutNone(預設值),則獲取不到項目時阻塞,直到有可獲取項。如果timeout是一個正數,最多會阻塞timeout指定秒數,並拋出queue.Empty異常,如果在超時時間內沒有可用項目的話。如果blockFalse,如果有可獲取項,則立即返回項目,否則拋出queue.Empty異常(在這種情況下會忽略timeout)。

  • get_nowait()
    等價於get(False)

  • ..略,更多參考請查閱官方文檔

...略,更多參考請查閱官方文檔

雜項

  • multiprocessing.active_children()

    返回當前進程的所有活動子進程的列表。調用該方法的副作用是“阻塞”任何已經完成的進程(原文:Calling this has the side effect of “joining” any processes which have already finished。)

  • multiprocessing.cpu_count()

    返回系統的CPU數量。該數量並不等於當前進程可以使用的CPU數量。可用cpu的數量可以通過len(os.sched_getaffinity(0))獲取,不過可能會拋NotImplementedError異常。

  • multiprocessing.``current_process()

    返回當前進程對應的multiprocessing.Process對下。類似threading.current_thread()

  • multiprocessing.get_all_start_methods()

    返回支持的啟動方法的列表,其中第一個是預設方法。可能的啟動方法有'fork', 'spawn''forkserver'。在Windows上,僅 'spawn'可用。在Unix上,始終支持'fork''spawn',預設值為“'fork'

    3.4版新增

  • multiprocessing.get_start_method(allow_none=False)

    返回用於啟動進程的啟動方法的名稱。如果尚未設置啟動方法,且allow_noneFalse,則返回預設方法名詞,如果尚未設置啟動方法,並且allow_noneTrue,則返回None。返回值可以是'fork', 'spawn', 'forkserver'None. 'fork'為Unix上的預設值,而'spawn'則是Windows上的預設值。

    3.4版新增。

  • multiprocessing.``set_start_method(method)

    設置應用於啟動子進程的方法。method可以是 'fork', 'spawn''forkserver'。請註意,最多只能調用一次,並且應該在主模塊的if__name__=='__main__'子句中使用。

    3.4版新增。

  • ..略,更多參考請查閱官方文檔

..略,更多參考請查閱官方文檔

Process工具

可以創建一個進程池,用於執行使用multiprocessing.pool.Pool類提交給它的任務。

Pool類
  • class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

    一個進程池對象,用於控制可以向其提交作業的工作進程池。它支持帶有超時和回調的非同步結果,並具有並行map實現。

    • processes 是要使用的工作進程的數量。如果processesNone,則預設使用os.cpu_count()返回的數字。
    • initializer 如果值不為None,那麼每個工作進程在啟動時都會調用initializer(*initargs)
    • maxtasksperchild 是工作進程在退出並替換為新的工作進程之前可以完成的任務數,以便釋放未使用的資源。預設的maxtasksperchildNone,這意味著工作進程存活時間將與進程池一樣長。
    • context 用於指定用於啟動工作進程的上下文。通常,進程池是使用上下文對象的函數multiprocessing.Pool()Pool()方法創建的。在這兩種情況下,上下文都設置得適當。

    請註意,池對象的方法只能由創建池的進程調用。

    3.2版新增:maxtasksperchild

    3.4版新增:context

    註意:

    池中的工作進程通常在工作隊列的整個持續時間內保持存活。在其他系統(如Apache、mod_wsgi等)中發現的一種釋放工作進程所持有資源的常見模式是,允許池中的工作進程在退出、清理和生成新進程以取代舊進程之前只完成一定數量的工作。池的maxtasksperchild參數向最終用戶暴露了這一能力。

    apply(func[, args[, kwds]])

    使用參數args和關鍵字參數kwds調用func。它會阻塞,直到可獲取結果為止。考慮到阻塞問題,apply_async()更適合併行執行工作。此外,func只在池的一個工作進程中執行。

    apply_async(func[, args[, kwds[, callback[, error_callback]]]])

    apply()方法的變體,返回結果對象。

    如果指定了callback,那麼它應該是一個接受單個參數的可調用函數。當可獲取結果時,將對其應用callback,除非調用失敗,在這種情況下,將對其應用error_callback

    如果指定了error_callback,那麼它應該是一個接受單個參數的可調用函數。如果目標函數失敗,則會使用異常實例調用error_callback

    回調應該立即完成,否則處理結果的線程將被阻塞。

    map(func, iterable[, chunksize])

    內置函數map()的並行等價物(不過它只支持一個iterable參數)。它會阻塞,直到可獲取結果。

    該方法將iterable分割為多個塊,並將這些塊作為單獨的任務提交給進程池。可以通過將chunksize設置為正整數來指定這些塊的(近似)大小。

    map_async(func, iterable[, chunksize[, callback[, error_callback]]])

    map()方法的一個變體,它返回一個結果對象。

    如果指定了callback ,那麼它應該是一個接受單個參數的可調用函數。當可獲取結果時,將對其應用callback,除非調用失敗,在這種情況下,將應用error_callback

    如果指定了error_callback,那麼它應該是一個接受單個參數的可調用函數。如果目標函數失敗,則會使用異常實例調用error_callback

    回調應該立即完成,否則處理結果的線程將被阻塞。

    imap(func, iterable[, chunksize])

    map()的一個更惰性版本。

    chunksize參數與map()方法使用的參數相同。對於非常長的迭代,使用較大的chunksize值可以使作業比使用預設值1更快地完成。

    此外,如果chunksize為1,則imap()方法返回的迭代器的next()方法有一個可選的timeout參數:如果無法在timeout秒內返回結果,next(timeout)將引發multiprocessing.TimeoutError

    imap_unordered(func, iterable[, chunksize])
    imap()相同,只是返回迭代器的結果的順序是任意的。(只有當只有一個工作進程時,才能保證順序“正確”)

    starmap(func, iterable[, chunksize])

    類似於map(),只是iterable的元素被當做參數,不拆解。

    因此,[(1,2), (3,4)]的迭代結果是[func(1,2),func(3,4)]。

    3.3版新增。

    starmap_async(func, iterable[, chunksize[, callback[, error_back]]])

    starma()map_async()的組合,對可迭代項中的可迭代項進行迭代,併在未拆解可迭代項的情況下調用func。返回一個結果對象。

    3.3版新增。

    close()
    阻止將更多任務提交到進程池中。完成所有任務後,工作進程將退出。

    terminate()

    在未完成未完成的工作的情況下立即停止工作進程。當進程池對象被垃圾回收時,將立即調用terminate()

    join()

    等待工作進程退出。在使用join()之前,必須調用close()terminate()

    3.3版新增:進程池對象現在支持上下文管理協議——請參閱上下文管理器類型__enter__()返回池對象,__exit_()調用terminate()

AsyncResult
  • class multiprocessing.pool.AsyncResult

    Pool.apply_async()和Pool.map_async()返回的結果類。

get([timeout])
當結果已準備好時返回結果。如果timeout不是None,並且沒有在timeout秒內獲取到結果,則會引發multiprocessing.TimeoutError。如果遠程調用引發了異常,則該異常將由get()重新拋出。

wait([timeout])
等待,直到結果可獲取,或者直到超過timeout秒。

ready()
返回調用是否完成

successful()

返回調用是否已完成,不引發異常。如果結果還未準備好,將引發AssertionError

進程池使用示例
from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=4) as pool:         # start 4 worker processes
        result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
        print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow

        print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"

        it = pool.imap(f, range(10))
        print(next(it))                     # prints "0"
        print(next(it))                     # prints "1"
        print(it.next(timeout=1))           # prints "4" unless your computer is *very* slow

        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))        # raises multiprocessing.TimeoutError

...略

作者:授客
微信/QQ:1033553122
全國軟體測試QQ交流群:7156436

Git地址:https://gitee.com/ishouke
友情提示:限於時間倉促,文中可能存在錯誤,歡迎指正、評論!
作者五行缺錢,如果覺得文章對您有幫助,請掃描下邊的二維碼打賞作者,金額隨意,您的支持將是我繼續創作的源動力,打賞後如有任何疑問,請聯繫我!!!
           微信打賞                        支付寶打賞                  全國軟體測試交流QQ群  
              


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 在微服務工程的技術選型中,會涉及到很多組件的集成,最常用包括:緩存、消息隊列、搜索、定時任務、存儲等幾個方面;隨著系統的服務數量上升,統一管理各種組件的複雜度也會提高; ...
  • 初識Java 1.Java背景知識 java是美國sun公司(Stanford University Network)在1995年推出的一門電腦高級編程語言。 Java早期稱為Oak(橡樹),後期改名為Java。 Java之父:詹姆斯·高斯林(James Gosling)。 2009年sun公司被 ...
  • 使用 QCustomPlot 繪圖庫輔助開發時整理的學習筆記。本篇介紹如何使用 QCustomPlot 繪製 x-y 曲線圖,需要 x 軸數據與 y 軸數據都已知,示例中使用的 QCustomPlot 版本為 Version 2.1.1,QT 版本為 5.9.2。 ...
  • 某日二師兄參加XXX科技公司的C++工程師開發崗位第19面: > 面試官:什麼是智能指針? > > 二師兄:智能指針是C++11引入的類模板,用於管理資源,行為類似於指針,但不需要手動申請、釋放資源,所以稱為智能指針。 > > 面試官:C++11引入了哪些智能指針? > > 二師兄:三種,分別是`s ...
  • # 高階函數 ## 函數可以作為參數進行傳遞和返回值進行返回 ```Scala //傳一個a乘b 就返回一個函數,邏輯是實現兩數相乘 //傳一個a*b 返回一個函數,邏輯是實現兩數相乘 //傳一個axb 返回一個函數,邏輯是實現兩數相乘 def funTest6(str:String,fun:(St ...
  • typora-copy-images-to: upload # 頁面預覽 ## 訂單詳情 ![image-20230227071834134](https://s2.loli.net/2023/06/19/8rXsPWOn3MdlRNx.png) ![image-20230227071900964] ...
  • 緩衝池是主存儲器中的一個區域,在訪問 table 和索引數據時 InnoDB 會對其進行緩存。緩衝池允許直接從記憶體中訪問頻繁使用的數據,從而加快處理速度。在專用伺服器上,通常將高達 80% 的物理記憶體分配給緩衝池。 ...
  • > 在[上一章](https://www.yuque.com/docs/share/adb5b1e4-f3c6-46fd-ba4b-4dabce9b4f2a?# 《現代C++學習指南-類型系統》)我們探討了C++的類型系統,並提出了從低到高,又從高到低的學習思路,本文就是一篇從高到低的學習指南,希望 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...