1.管道 進程間通信(IPC)方式二:管道(不推薦使用,瞭解即可),埠易導致數據不安全的情況出現。 2.共用數據 進程之間數據共用的模塊之一Manager模塊(少用): 進程間數據是獨立的,可以藉助於隊列或管道實現通信,二者都是基於消息傳遞的雖然進程間數據獨立,但可以通過Manager實現數據共用 ...
1.管道
進程間通信(IPC)方式二:管道(不推薦使用,瞭解即可),埠易導致數據不安全的情況出現。
1 from multiprocessing import Pipe,Process 2 3 4 def func(conn1,conn2): 5 msg = conn1.recv() # 接收了conn2傳遞的 6 # msg1 = conn2.recv() # 接收了conn1傳遞的 7 print('>>>',msg) 8 # print('>>>',msg1) 9 10 11 if __name__ == '__main__': 12 # 拿到管道的兩端,雙工通信方式,兩端都可以收發消息 13 conn1,conn2 = Pipe() # 必須在Process之前產生管道 14 p = Process(target=func,args=(conn1,conn2,)) # 管道給子進程 15 p.start() 16 conn1.send('hello') 17 conn1.close() 18 conn2.send('小子') 19 conn2.close() 20 21 print('進程結束') 22 23 # 註意管道不用了就關閉防止異常
2.共用數據
進程之間數據共用的模塊之一Manager模塊(少用):
進程間數據是獨立的,可以藉助於隊列或管道實現通信,二者都是基於消息傳遞的雖然進程間數據獨立,但可以通過Manager實現數據共用:
1 from multiprocessing import Manager,Process,Lock 2 3 4 def func1(dic,loc): 5 # loc.acquire() # 不加鎖易出錯 6 dic['num'] -= 1 7 # loc.release() 8 9 10 if __name__ == '__main__': 11 m = Manager() 12 loc = Lock() 13 dic = m.dict({'num':100}) 14 p_list = [] 15 for i in range(100): 16 p = Process(target=func1, args=(dic,loc)) 17 p_list.append(p) 18 p.start() 19 20 [pp.join() for pp in p_list] 21 22 print('>>>>>',dic['num']) 23 # 共用時不加鎖,很可能導致同一個數據被多個子進程取用,數據是不安全的,且超多進程消耗大量資源易導致卡死.基於Manager的數據共用
多進程共同去處理共用數據的時候,就和我們多進程同時去操作一個文件中的數據是一樣的,不加鎖就會出現錯誤的結果,進程不安全的,所以也需要加鎖。
總結:進程間應該儘量避免通信,即便需要通信,也應該選擇進程安全的工具來避免加鎖帶來的問題。
3.進程池 Pool
創建進程需要消耗時間,銷毀進程(空間,變數,文件信息等等的內容)也需要消耗時間。開啟成千上萬的進程,操作系統無法讓他們同時執行,維護一個很大的進程列表的同時,調度的時候,還需要進行切換並且記錄每個進程的執行節點,也就是記錄上下文(各種變數等等亂七八糟的東西,雖然你看不到,但是操作系統都要做),這樣反而會影響程式的效率。因此我們不能無限制的根據任務開啟或者結束進程,這就需要用到進程池:
定義一個池子,在裡面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等到處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。如果有很多任務需要執行,池中的進程數量不夠,任務就要等待之前的進程執行任務完畢歸來,拿到空閑進程才能繼續執行。也就是說,池中進程的數量是固定的,那麼同一時間最多有固定數量的進程在運行。這樣不會增加操作系統的調度難度,還節省了開閉進程的時間,也一定程度上能夠實現併發效果。
創建方法:
Pool([numprocess [,initializer [, initargs]]]):創建進程池
參數介紹:
1 numprocess:要創建的進程數,如果省略,將預設使用cpu_count()的值 2 initializer:是每個工作進程啟動時要執行的可調用對象,預設為None 3 initargs:是要傳給initializer的參數組
常用方法:
p.apply(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然後返回結果。 '''需要強調的是:此操作並不會在所有池工作進程中並執行func函數。如果要通過不同參數併發地執行func函數,必須從不同線程調用p.apply()函數或者使用p.apply_async()''' p.apply_async(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然後返回結果。 '''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他非同步操作中的結果。''' p.close():關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成 P.jion():等待所有工作進程退出。此方法只能在close()或teminate()之後調用 主要方法介紹
1 import time 2 from multiprocessing import Process,Pool 3 4 5 def func1(i): 6 num = 0 7 for j in range(5): 8 num += i 9 10 11 if __name__ == '__main__': 12 pool = Pool(6) # 創建進程池 13 14 p_list = [] 15 start_time = time.time() 16 for i in range(500): 17 p = Process(target=func1,args=(i,)) 18 p_list.append(p) 19 p.start() 20 21 [pp.join() for pp in p_list] 22 end_time = time.time() 23 print('耗時:',end_time-start_time) 24 25 s_time = time.time() 26 pool.map(func1,range(500)) # map 27 e_time = time.time() 28 print('耗時:',e_time - s_time) # 耗時遠遠小於直接開500進程進程池 簡單應用
apply同步方法:
1 from multiprocessing import Process,Pool 2 import time 3 4 5 def func1(i): 6 num = 0 7 for j in range(3): 8 num += i 9 time.sleep(1) 10 print(num) 11 return num 12 13 14 if __name__ == '__main__': 15 pool = Pool(6) 16 17 for i in range(10): 18 res = pool.apply(func1,args=(i,)) # apply 進程同步/串列方法 效率低,不常用 19 # print(res)apply 進程同步/串列方法
apply_async非同步方法:
1 from multiprocessing import Process,Pool 2 import time 3 4 5 def func1(i): 6 num = 0 7 for j in range(5): 8 num += i 9 time.sleep(1) 10 # print('>>>>>',num) 11 return num 12 13 14 if __name__ == '__main__': 15 pool = Pool(6) 16 17 red_list = [] 18 for i in range(10): 19 res = pool.apply_async(func1,args=(i,)) 20 red_list.append(res) 21 22 pool.close() # 不是關閉,只是鎖定進程池,告訴主進程不會再添加數據進去 23 pool.join() # 等待子程式執行完 24 25 for ress in red_list: 26 print(ress.get()) # get方法取出返回值num 按添加順序取出已保存在緩存區的結果 所以是順序列印出的View Code
回調函數:運用時註意一點,回調函數的形參執行有一個,如果你的執行函數有多個返回值,那麼也可以被回調函數的這一個形參接收,接收的是一個元祖,包含著你執行函數的所有返回值。
1 from multiprocessing import Pool,Process 2 import time,os 3 4 5 def func1(n): 6 # print('子進程的pid:',os.getpid()) 7 return n*n 8 9 10 def func2(i): 11 res = i**2 12 # print('callback的pid:',os.getpid()) 13 print(res) 14 return res 15 16 17 if __name__ == '__main__': 18 pool = Pool(4) 19 pool.apply_async(func1,args=(3,),callback=func2) # callback把前面的返回值作參數傳給後面 20 # print('主進程的pid:',os.getpid()) # 主進程執行了callback 21 pool.close() 22 pool.join()回調函數 callback
4.總結
進程之間的通信:隊列、管道、數據共用也算
信號量和事件也相當於鎖,也是全局的,所有進程都能拿到這些鎖的狀態,進程之間這些鎖啊信號量啊事件啊等等的通信,其實底層還是socekt,只不過是基於文件的socket通信,而不是跟上面的數據共用啊空間共用啊之類的機制,我們之前學的是基於網路的socket通信,socket的兩個家族,一個文件的一個網路的,所以如果說這些鎖之類的報錯,可能你看到的就是類似於socket的錯誤。工作中常用的是鎖,信號量和事件不常用,但是信號量和事件面試的時候會問到(做瞭解)