The multiprocessing module is an excellent package for handling concurrency, similar in spirit to the multithreading (threading) module.
I had touched on this library a little before but never studied it in depth; this time, with some free time on my hands, I dug in, so consider this me clearing up my own doubts.
Today let's look at apply_async and map. These two methods are said to be what hands out the processes in the pool to the target function, and I wanted to verify that.
Here is what the official documentation says about the two methods:
apply_async(func[, args[, kwds[, callback[, error_callback]]]])
A variant of the apply() method which returns a result object.
If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it, that is unless the call failed, in which case the error_callback is applied instead.
If error_callback is specified then it should be a callable which accepts a single argument. If the target function fails, then the error_callback is called with the exception instance.
Callbacks should complete immediately since otherwise the thread which handles the results will get blocked.
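Before moving on, here is a minimal sketch of the callback and error_callback parameters described above. The function names square, on_success and on_error are my own illustrative choices, not from the original post; the Pool API calls themselves are the documented ones.

```python
from multiprocessing import Pool

def square(x):
    return x * x

def on_success(result):
    # Runs in a result-handler thread of the parent process,
    # so it should return quickly (as the docs warn above).
    print('got:', result)

def on_error(exc):
    # Called with the exception instance if square() raises.
    print('failed:', exc)

if __name__ == '__main__':
    with Pool(2) as pool:
        res = pool.apply_async(square, (3,),
                               callback=on_success,
                               error_callback=on_error)
        print(res.get())  # blocks until the worker returns 9
```

Note that apply_async itself returns immediately; res.get() is what waits for the result.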
map(func, iterable[, chunksize])
A parallel equivalent of the map() built-in function (it supports only one iterable argument though). It blocks until the result is ready.
This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.
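The chunksize behaviour described above can be sketched as follows; the helper name tag is hypothetical, but map and chunksize are the documented API.

```python
from multiprocessing import Pool
import os

def tag(x):
    # Return the input together with the pid of the worker that handled it.
    return (x, os.getpid())

if __name__ == '__main__':
    with Pool(2) as pool:
        # chunksize=2: range(6) is chopped into chunks [0,1], [2,3], [4,5],
        # each submitted to the pool as a single task.
        results = pool.map(tag, range(6), chunksize=2)
        values = [v for v, pid in results]
        print(values)  # map preserves input order: [0, 1, 2, 3, 4, 5]
```

A larger chunksize reduces inter-process communication overhead at the cost of coarser load balancing.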
A Pool maintains a specified number of worker processes for the user. When a new task is submitted to the pool, if an idle worker is available it executes the task right away; but if all the workers in the pool are busy, the task waits in the queue until one of the pool's processes finishes its current work and becomes free to run it.
Let's look at the program:
from multiprocessing import Pool
import time
import os

def func(msg):
    print('msg: %s %s' % (msg, os.getpid()))
    time.sleep(3)
    print("end")

if __name__ == '__main__':
    pool = Pool(4)
    for i in range(4):
        msg = 'hello %d' % (i)
        pool.apply_async(func, (msg, ))
    # pool.map(func, range(4))
    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()  # close() must be called before join(), otherwise an error is raised.
                 # After close(), no new tasks can be submitted to the pool;
                 # join() waits for all the worker processes to finish.
    print("Sub-process(es) done.")
Run result:
Now uncomment the map call and comment out the apply_async call:
Next, let's look at the program and its result when the pool does not have enough processes:
from multiprocessing import Pool
import time
import os

def func(msg):
    print('msg: %s %s' % (msg, os.getpid()))
    time.sleep(3)
    print("end")

if __name__ == '__main__':
    pool = Pool(3)
    '''for i in range(4):
        msg = 'hello %d' % (i)
        pool.apply_async(func, (msg, ))'''
    pool.map(func, range(4))
    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()  # close() must be called before join(), otherwise an error is raised.
                 # After close(), no new tasks can be submitted to the pool;
                 # join() waits for all the worker processes to finish.
    print("Sub-process(es) done.")
Program result:
As you can see, if the number of processes in the pool is greater than or equal to the number of times the function needs to run, everything goes smoothly and the result looks exactly as you'd expect; but if the pool has fewer processes than there are calls to run, some tasks have to wait, i.e. two or more calls end up sharing the same worker process.
Also, apply_async takes a single argument tuple as its second parameter, and each call submits one task to the pool, asynchronously mind you, so distributing work across multiple processes requires a for or while loop. map, on the other hand, accepts an iterable as its second argument, so no explicit loop is needed. The key takeaway: both methods distribute the submitted tasks across the processes in the pool, one by one.
As an aside, reading the all-English multiprocessing documentation was confusing and painful, but also kind of fun; I have to admit, it's good for my English...