創建進程,守護進程,進程鎖Lock,進程通信IPC,JoinableQueue模塊簡介,進程池 ...
第一種創建進程的方式:
from multiprocessing import Process def f(name): print(name,"在子進程") if __name__ == "__main__": p = Process(target=f,args=("aaa",)) p.start() print("執行主進程內容") # 列印內容如下 執行主進程內容 aaa 在子進程
從列印結果我們可以看出程式先執行了主進程的print之後才執行了子進程的print。這裡主要是因為操作系統在開闢進程時需要花費一定的時間,所以程式在這段時間里,先執行了主進程的print,然後才執行子進程print。
第二種方式創建進程:
from multiprocessing import Process class MyProcess(Process): # 這裡必須要調用Process中的init初始化參數 # 否則會因為無法傳參導致錯誤 def __init__(self,name): super().__init__() # 必須有 self.name = name def run(self): print(f"我是子進程{self.name}") if __name__ == "__main__": p = MyProcess("aaa") p.start() print("我是主進程") # 列印內容如下 我是主進程 我是子進程aaa
可以在創建子進程後使用join的方法,使程式等待子進程結束後在執行join下麵的代碼。
from multiprocessing import Process def f(name): print("我是子進程",name) if __name__ == "__main__": p = Process(target=f,args=("aaa",)) p.start() p.join() # 子進程結束後在,執行下麵的代碼 print("我是主進程") # 列印內容如下 我是子進程 aaa 我是主進程
使用os模塊查看進程PID號。
from multiprocessing import Process import os def f(): print(f"父進程PID:{os.getppid()},子進程PID:{os.getpid()}") if __name__ == "__main__": p = Process(target=f,args=()) p.start() p.join() # 子進程結束後在,執行下麵的代碼 print("主進程內容") # 列印內容如下 父進程PID:1588,子進程PID:3292 主進程內容
執行多個進程:
from multiprocessing import Process def f(process_name): print(f"{process_name}") if __name__ == "__main__": for i in range(3): p = Process(target=f, args=("子進程-"+str(i),)) p.start() print("主進程") # 列印內容如下 主進程 子進程-0 子進程-1 子進程-2
我們會發現主進程比所有子進程都優先執行了,如果我們想要在執行完所有子進程在執行父進程該怎麼辦呢?沒錯是使用join
示例一:
from multiprocessing import Process import time def f(process_name): print(f"{process_name}") if __name__ == "__main__": start_time = time.time() for i in range(3): p = Process(target=f, args=("子進程-"+str(i),)) p.start() p.join() end_time = time.time() print(f"執行了{end_time - start_time}") # 列印內容如下 子進程-0 子進程-1 子進程-2 執行了0.4480257034301758
從列印結果我們可以看出是所有子進程運行後才執行了主進程。但是發現會很慢,這是因為我們的join把原本應多進程同時運行的程式(非同步),變成了同步,必須等待一個子進程結束後才會執行下一個子進程。這樣就違背了我們多進程同時執行的初衷,所以我們的join不能放在那個位置。
下麵請看示例二:
from multiprocessing import Process import time def f(process_name): print(f"{process_name}") if __name__ == "__main__": start_time = time.time() pro_list = [] for i in range(3): p = Process(target=f, args=("子進程-"+str(i),)) p.start() pro_list.append(p) # 將進程對象添加到一個列表中 for i in pro_list: # 迴圈等待所有進程結束 i.join() end_time = time.time() print(f"執行了{end_time - start_time}") # 列印內容如下 子進程-1 子進程-2 子進程-0 執行了0.18201017379760742
對比示例一和示例二我們可以明顯發現示例二真正實現了多個進程的併發效果。
進程的創建(第二種方式創建,很少有人用)
import os from multiprocessing import Process class MyProcess(Process): def __init__(self,name): super().__init__() self.name = name def run(self): print(f"子進程-{self.name},PID:{os.getpid()}") if __name__ == "__main__": p1 = MyProcess("aaa") p2 = MyProcess("bbb") p3 = MyProcess("ccc") p1.start() p2.start() p3.start() print("主線程") # 列印內容如下 子進程-aaa,PID:7360 子進程-bbb,PID:6956 子進程-ccc,PID:4912
雖然不常用,但是最好知道有這種方式可以創建線程。
守護進程:
有兩個特性:
1、守護進程會在主進程代碼執行結束後就終止。
2、守護進程內無法再開啟子進程,否則拋出異常。
創建守護進程比較簡單如下:
from multiprocessing import Process def f(): print("守護進程") if __name__ == "__main__": p = Process(target=f,args=()) p.daemon=True # 一定要在start前執行daemom=True p.start() print("主進程") # 列印內容如下 主進程
我們發現守護進程並沒有被執行,或者說還沒來得及執行就結束了,我們知道操作系統在開啟進程時要花費一定時間,在這個時間內主進程代碼執行完了,所以守護進程還沒來得及執行就結束了。可以使用join來等待守護進程執行完畢後在結束主進程。
from multiprocessing import Process def f(): print("守護進程") if __name__ == "__main__": p = Process(target=f,args=()) p.daemon=True # 一定要在start前執行daemom=True p.start() p.join() # 等待守護進程結束 print("主進程") # 列印內容如下 守護進程 主進程
進程鎖:
為保證數據的安全性,在有些場合要使用進程鎖,進程鎖會使由原來的並行變成串列,程式效率會下降,但是卻保證了數據的安全性,在數據安全性和程式效率面前,數據的安全性是大於程式的效率的。
下麵以搶票為例,現在票數還有一張:
from multiprocessing import Process import time,json def search(name): # 查票 di = json.load(open("db")) print(f"{name}查票,剩餘票數{di['count']}") def get(name): # 購票 di = json.load(open("db")) time.sleep(0.1) if di["count"] > 0: di["count"] -= 1 time.sleep(0.2) json.dump(di,open("db","w")) print(f"{name}購票成功") def task(name): search(name) get(name) if __name__ == "__main__": for i in range(5): # 只模擬5個人搶一張票 p = Process(target=task,args=("游客-"+str(i),)) p.start() # 列印結果如下 游客-2查票,剩餘票數1 游客-1查票,剩餘票數1 游客-0查票,剩餘票數1 游客-4查票,剩餘票數1 游客-3查票,剩餘票數1 游客-2購票成功 游客-1購票成功 游客-0購票成功 游客-4購票成功 游客-3購票成功
所有人全部購票成功,這就對數據的安全性提出了挑戰。本來只有一張票,但是5個人都顯示購票成功,這當然不是我們想要的結果,問題的原因在於,所有的游客在差不多同一時間都進行了購票,大家看到的票數都是1張,第一個用戶購票後,將票數減1等於0還沒來得及將結果寫入文件,其它用戶也進行了購票的操作,在餘票0被寫入文件的過程中,其它用戶也購票成功,並將結果寫入文件,造成了數據的混亂。
這裡我們使用進程鎖Lock也叫互斥鎖,來解決問題。
from multiprocessing import Process,Lock import time,json def search(name): # 查票 di = json.load(open("db")) print(f"{name}查票,剩餘票數{di['count']}") def get(name): # 購票 di = json.load(open("db")) time.sleep(0.1) if di["count"] > 0: di["count"] -= 1 time.sleep(0.2) json.dump(di,open("db","w")) print(f"{name}購票成功") def task(name,lock): search(name) # 查票 lock.acquire() # 加鎖 get(name) # 購票 lock.release() # 解鎖 if __name__ == "__main__": lock = Lock() # 獲取鎖 for i in range(5): # 只模擬5個人搶一張票 p = Process(target=task,args=("游客-"+str(i),lock)) p.start() # 列印內容如下 游客-0查票,剩餘票數1 游客-1查票,剩餘票數1 游客-2查票,剩餘票數1 游客-3查票,剩餘票數1 游客-4查票,剩餘票數1 游客-0購票成功
在購票時加一個互斥鎖,這樣一個進程在購票時,其它的進程只能查看就不能進行購票的操作了,保證了數據的安全性,最終結果是正確的,這就是為什麼我們明明看到有票,但是點擊購買後卻說沒票的原因,雖然加鎖後使原本並行的程式,變成了串列。但我們要知道在不能保證數據安全的情況下一切效率都是空談。
進程間的通信IPC(Inter-Process Communication)隊列
隊列:Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。
隊列的常用方法:
Queue([maxsize])
創建共用的進程隊列。maxsize是隊列中允許的最大數值。如果省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還需要運行支持線程以便隊列中的數據傳輸到底層管道中。
Queue的實例q具有以下方法:
q.get( [ block [ ,timeout ] ] )
返回q中的一個項目。如果隊列為空,此方法將阻塞,直到隊列中有項目可用為止。block用於控制阻塞行為,預設為True. 如果設置為False,將引發Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有項目變為可用,將引發Queue.Empty異常。
q.get_nowait( ) 同q.get(False)方法。
q.put(item [, block [,timeout ] ] )
將item放入隊列。如果隊列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,預設為True。如果設置為False,將引發Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引發Queue.Full異常。
q.qsize()
返回隊列中目前項目的正確數量。此函數的結果並不可靠,因為在返回結果和在稍後程式中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引發NotImplementedError異常。
q.empty()
如果調用此方法時 q為空,返回True。如果其他進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。
q.full()
如果q已滿,返回為True. 由於線程的存在,結果也可能是不可靠的(參考q.empty()方法)。。
q.close()
關閉隊列,防止隊列中加入更多數據。調用此方法時,後臺線程將繼續寫入那些已入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,如果某個使用者正被阻塞在get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。
q.cancel_join_thread()
不會再進程退出時自動連接後臺線程。這可以防止join_thread()方法阻塞。
q.join_thread()
連接隊列的後臺線程。此方法用於在調用q.close()方法後,等待所有隊列項被消耗。預設情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread()方法可以禁止這種行為。
下麵我們已生產者消費者模型來進行演示:
from multiprocessing import Process,Queue import time,random def consumer(name,q): # 消費者 while True: task = q.get() # 從隊列中取出數據 if task == None:break print(f"{name}獲取數據{task}") time.sleep(random.random()) # 消費者效率比生產者效率高 def producer(name,q): # 生產者 for i in range(3): q.put(i) # 向對列中添加數據 print(f"{name}生產數據{i}") time.sleep(random.uniform(1,2)) # 模擬生產者的效率沒有消費者效率高 if __name__ == "__main__": q = Queue() # 獲取一個隊列 pro = [] for i in range(3): # 開啟生產者進程 p = Process(target=producer,args=("生產者"+str(i),q)) p.start() pro.append(p) # 開啟消費者進程 p1 = Process(target=consumer,args=("aaa",q)) p2 = Process(target=consumer,args=("bbb",q)) p1.start() p2.start() for i in pro: # 等待生產者結束 i.join() q.put(None) # 有幾個消費者進程,就put幾次None q.put(None)
JoinableQueue([maxsize]) 模塊
創建可連接的共用進程隊列。這就像是一個Queue對象,但隊列允許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共用的信號和條件變數來實現的。
JoinableQueue的實例p除了與Queue對象相同的方法之外,還具有以下方法:
q.task_done()
使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。如果調用此方法的次數大於從隊列中刪除的項目數量,將引發ValueError異常。
q.join()
生產者將使用此方法進行阻塞,直到隊列中所有項目均被處理。阻塞將持續到為隊列中的每個項目均調用q.task_done()方法為止。
下麵的例子說明如何建立永遠運行的進程,使用和處理隊列上的項目。生產者將項目放入隊列,並等待它們被處理。
我們在來實現上述的生產者消費者模型。
from multiprocessing import Process,JoinableQueue import time,random def consumer(name,q): # 消費者 while True: task = q.get() # 從隊列中取出數據 q.task_done() # 通知生產者,我已經取完所有數據了 print(f"{name}獲取數據{task}") time.sleep(random.random()) # 消費者效率比生產者效率高 def producer(name,q): # 生產者 for i in range(1): q.put(i) # 向對列中添加數據 print(f"{name}生產數據{i}") time.sleep(random.uniform(1,2)) # 模擬生產者的效率沒有消費者效率高 q.join() # 生產完畢,等待消費者通知數據已經獲取完了 if __name__ == "__main__": q = JoinableQueue() # 獲取一個隊列 pro = [] for i in range(1): # 開啟生產者進程 p = Process(target=producer,args=("生產者"+str(i),q)) p.start() pro.append(p) # 開啟消費者進程 p1 = Process(target=consumer,args=("aaa",q)) p2 = Process(target=consumer,args=("bbb",q)) p1.daemon=True # 如果不設置守護進程,這兩個進程就不會結束。 p2.daemon=True # 因為他們只是通知生產者我接收到所有數據了,並沒有終止迴圈。 p1.start() p2.start() for i in pro: # 等待生產者結束 i.join()
這裡再次說明將消費者設置成守護進程的原因,q.task_done它只是通知生產者,我把數據已經都取完了,僅此而已,所以while迴圈並不會退出。如果不設置守護進程,程式會卡在while迴圈里。
進程池
進程池就是預先創建一個進程組,然後有任務時從池中分配一個進程去執行任務。當任務數量超過進程池的數量時,就必須等待進程池中有空閑的進程時,才能利用空閑的進程去執行任務。
進程池的優點:
1、充分利用CPU資源。
2、多個進程在同一時刻可以同時執行,達到了並行的效果。
進程池的缺點:進程的創建、銷毀需要耗費CPU的時間。多進程適用於需要複雜計算少I/O阻塞的情況。如果程式不涉及複雜運算,最好是使用線程池。
關於進程池multiprocessing.Pool的一些方法
apply(func [, args [, kwargs]]):
func(*args,**kwargs),然後返回結果。
需要註意的是:apply屬於進程同步的操作,即必須等待一個進程結束後才能執行下一個進程。
apply_async(func [, args [, kwargs]]):
func(*args,**kwargs),然後返回結果。
apply_async屬於進程的非同步操作,所有進程可以同時執行,此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變為可用時,將立即傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他非同步操作中的結果。
p.close():關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成
P.jion():等待所有工作進程退出。此方法只能在close()或teminate()之後調用
同步進程池的示例:
import os,time from multiprocessing import Pool def work(n): print("PID:%s run" %os.getpid()) time.sleep(1) return n ** 2 if __name__ == "__main__": p = Pool(3) # 開啟進程池 res = [] for i in range(3): res.append(p.apply(work,args=(i,))) # 進程同步模式 print(res) # 列印返回結果 # 列印內容如下 PID:6180 run PID:9728 run [0, 1, 4]
因為是進程池的同步,所以進程時的執行順序是有序的,並且必須一個進程執行後才執行下一個進程。
進程池的非同步示例:
import os,time from multiprocessing import Pool def work(n): print("PID:%s run" %os.getpid()) time.sleep(1) return n ** 2 if __name__ == "__main__": p = Pool(3) # 開啟進程池 res = [] for i in range(5): # 進程非同步模式 res.append(p.apply_async(work,args=(i,))) # 因為是非同步,所以開進程會很快 # 所以我們所有進程結束後在列印結果 p.close() # 關閉進程池 p.join() # 等待進程池結束, for i in res: print(i.get(),end=" ") # 列印內容如下 PID:7512 run PID:10176 run PID:7240 run PID:10176 run PID:7512 run 0 1 4 9 16
關於進程池有個問題需要註意,在子進程中不能使用input函數()