本文轉至http://www.cnblogs.com/kaituorensheng/p/4465768.html,在其基礎上進行了一些小小改動。 在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程式控制制多台主機,並行操作可以節約大量的時間。當被操作對象數目不大時,可以直接利用m ...
本文轉至http://www.cnblogs.com/kaituorensheng/p/4465768.html,在其基礎上進行了一些小小改動。
在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程式控制制多台主機,並行操作可以節約大量的時間。當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但如果是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,此時可以發揮進程池的功效。
Pool可以提供指定數量的進程供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那麼就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束,才會創建新的進程來它。
例1:使用進程池
from multiprocessing import freeze_support,Pool import time def Foo(i): time.sleep(2) print('___time---',time.ctime()) return i+100 def Bar(arg): print('----exec done:',arg,time.ctime()) if __name__ == '__main__': freeze_support() pool = Pool(3) #線程池中的同時執行的進程數為3 for i in range(4): pool.apply_async(func=Foo,args=(i,),callback=Bar) #線程池中的同時執行的進程數為3,當一個進程執行完畢後,如果還有新進程等待執行,則會將其添加進去 # pool.apply(func=Foo,args=(i,)) print('end') pool.close() pool.join()#調用join之前,先調用close函數,否則會出錯。執行完close後不會有新的進程加入到pool,join函數等待所有子進程結束View Code
執行結果:
end ___time--- Thu Jun 16 15:11:45 2016 ----exec done: 100 Thu Jun 16 15:11:45 2016 ___time--- Thu Jun 16 15:11:45 2016 ----exec done: 101 Thu Jun 16 15:11:45 2016 ___time--- Thu Jun 16 15:11:45 2016 ----exec done: 102 Thu Jun 16 15:11:45 2016 ___time--- Thu Jun 16 15:11:47 2016 ----exec done: 103 Thu Jun 16 15:11:47 2016
函數解釋:
- apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的(理解區別,看例1例2結果區別)
- close() 關閉pool,使其不在接受新的任務。
- terminate() 結束工作進程,不在處理未完成的任務。
- join() 主進程阻塞,等待子進程的退出, join方法要在close或terminate之後使用。
執行說明:創建一個進程池pool,並設定進程的數量為3,xrange(4)會相繼產生四個對象[0, 1, 2, 4],四個對象被提交到pool中,因pool指定進程數為3,所以0、1、2會直接送到進程中執行,當其中一個執行完事後才空出一個進程處理對象3,所以會出現輸出“msg: hello 3”出現在"end"後。因為為非阻塞,主函數會自己執行自個的,不搭理進程的執行,所以運行完for迴圈後直接輸出“mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”,主程式在pool.join()處等待各個進程的結束。
例2:使用進程池(阻塞)
from multiprocessing import freeze_support,Pool import time def Foo(i): time.sleep(2) print('___time---',time.ctime()) return i+100 def Bar(arg): print('----exec done:',arg,time.ctime()) if __name__ == '__main__': freeze_support() pool = Pool(3) #線程池中的同時執行的進程數為3 for i in range(4): pool.apply(func=Foo,args=(i,)) print('end') pool.close() pool.join()#調用join之前,先調用close函數,否則會出錯。執行完close後不會有新的進程加入到pool,join函數等待所有子進程結束View Code
執行結果
___time--- Thu Jun 16 15:15:16 2016 ___time--- Thu Jun 16 15:15:18 2016 ___time--- Thu Jun 16 15:15:20 2016 ___time--- Thu Jun 16 15:15:22 2016 end
例3:使用進程池,並關註結果
import multiprocessing import time def func(msg): print('hello :',msg,time.ctime()) time.sleep(2) print('end',time.ctime()) return 'done' + msg if __name__=='__main__': pool = multiprocessing.Pool(2) result = [] for i in range(3): msg = 'hello %s' %i result.append(pool.apply_async(func=func,args=(msg,))) pool.close() pool.join() for res in result: print('***:',res.get()) print('AAAAAAAAll end--')View Code
執行結果
hello : hello 0 Thu Jun 16 15:26:33 2016
hello : hello 1 Thu Jun 16 15:26:33 2016
end Thu Jun 16 15:26:35 2016
hello : hello 2 Thu Jun 16 15:26:35 2016
end Thu Jun 16 15:26:35 2016
end Thu Jun 16 15:26:37 2016
***: donehello 0
***: donehello 1
***: donehello 2
AAAAAAAAll end--
註:get()函數得出每個返回結果的值
例4:使用多個進程池
import multiprocessing import time,os,random def Lee(): print('\nRun task Lee--%s******ppid:%s'%(os.getpid(),os.getppid()),'~~~~',time.ctime()) start = time.time() time.sleep(random.randrange(10)) end = time.time() print('Task Lee,runs %0.2f seconds.'%(end-start),'~~~~',time.ctime()) def Marlon(): print("\nRun task Marlon-%s******ppid:%s"%(os.getpid(),os.getppid()),'~~~~',time.ctime()) start = time.time() time.sleep(random.random() * 40) end=time.time() print( 'Task Marlon runs %0.2f seconds.' %(end - start),'~~~~',time.ctime()) def Allen(): print( "\nRun task Allen-%s******ppid:%s"%(os.getpid(),os.getppid()),'~~~~',time.ctime()) start = time.time() time.sleep(random.random() * 30) end = time.time() print( 'Task Allen runs %0.2f seconds.' %(end - start),'~~~~',time.ctime()) def Frank(): print( "\nRun task Frank-%s******ppid:%s"%(os.getpid(),os.getppid()),'~~~~',time.ctime()) start = time.time() time.sleep(random.random() * 20) end = time.time() print( 'Task Frank runs %0.2f seconds.' %(end - start),'~~~~',time.ctime()) if __name__ == '__main__': func_list = [Lee,Marlon,Allen,Frank] print('parent process id %s'%os.getpid()) pool = multiprocessing.Pool(4) for func in func_list: pool.apply_async(func) #Pool執行函數,apply執行函數,當有一個進程執行完畢後,會添加一個新的進程到pool中 print( 'Waiting for all subprocesses done...') pool.close() pool.join() #調用join之前,一定要先調用close() 函數,否則會出錯, close()執行後不會有新的進程加入到pool,join函數等待素有子進程結束 print ('All subprocesses done.')View Code
執行結果
parent process id 98552 Waiting for all subprocesses done... Run task Lee--97316******ppid:98552 ~~~~ Thu Jun 16 15:20:50 2016 Run task Marlon-95536******ppid:98552 ~~~~ Thu Jun 16 15:20:50 2016 Run task Allen-95720******ppid:98552 ~~~~ Thu Jun 16 15:20:50 2016 Run task Frank-98784******ppid:98552 ~~~~ Thu Jun 16 15:20:50 2016 Task Allen runs 0.31 seconds. ~~~~ Thu Jun 16 15:20:51 2016 Task Lee,runs 7.00 seconds. ~~~~ Thu Jun 16 15:20:57 2016 Task Frank runs 14.48 seconds. ~~~~ Thu Jun 16 15:21:05 2016 Task Marlon runs 31.72 seconds. ~~~~ Thu Jun 16 15:21:22 2016 All subprocesses done.
#coding: utf-8 import multiprocessing def m1(x): print x * x if __name__ == '__main__': pool = multiprocessing.Pool(multiprocessing.cpu_count()) i_list = range(8) pool.map(m1, i_list)
一次執行結果
0
1
4
9
16
25
36
49
參考:http://www.dotblogs.com.tw/rickyteng/archive/2012/02/20/69635.aspx
問題:http://bbs.chinaunix.net/thread-4111379-1-1.html
#coding: utf-8 import multiprocessing import logging def create_logger(i): print i class CreateLogger(object): def __init__(self, func): self.func = func if __name__ == '__main__': ilist = range(10) cl = CreateLogger(create_logger) pool = multiprocessing.Pool(multiprocessing.cpu_count()) pool.map(cl.func, ilist) print "hello------------>"
一次執行結果
0
1
2
3
4
5
6
7
8
9
hello------------>