"Python3 多進程編程(Multiprocess programming)" "為什麼使用多進程" "具體用法" "Python多線程的通信" "進程對列Queue" "生產者消費者問題" "JoinableQueue" "Queue實例" "管道Pipe" Python3 多進程編程(Mul ...
- Python3 多進程編程(Multiprocess programming)
- Python多線程的通信
- 進程對列Queue
-
Python3 多進程編程(Multiprocess programming)
為什麼使用多進程
python中的多線程其實並不是真正的多線程,不能充分地使用多核CPU的資源,此時需要使用需要使用多進程解決問題。
具體用法
Python中的多進程是通過multiprocessing
模塊來實現的,和多線程的threading.Thread
類似,利用multiprocessing.Process
來創建一個進程對象。進程對象的方法和線程對象的方法類似,也有start(), join()等。
直接啟用
代碼實例
import multiprocessing from time import sleep def clock(interval): i = 0 while i<5: i += 1 print(f"Run >>> {i}") sleep(interval) print("Ending!") if __name__ == '__main__': p = multiprocessing.Process(target = clock, args = (1,)) p.start() p.join()
運行結果
Run >>> 1 Run >>> 2 Run >>> 3 Run >>> 4 Run >>> 5 Ending!
繼承自
multiprocessing.Process
調用與多線程使用方法類似
- 直接繼承Process
- 重寫run函數
- 類實例可以直接運行
代碼實例
import multiprocessing from time import sleep class ClockProcess(multiprocessing.Process): def __init__(self, interval): super().__init__() self.interval = interval def run(self): i = 0 while i<5: i += 1 print(f"Run >>> {i}") sleep(self.interval) print("Ending!") if __name__ == '__main__': p = ClockProcess(1) p.start() p.join()
運行結果
Run >>> 1 Run >>> 2 Run >>> 3 Run >>> 4 Run >>> 5 Ending!
守護進程
- 設置該進程為守護進程,即認為此進程不重要,主進程結束後,該進程隨即結束。
- 用法
Process.daemon = Ture
未使用守護進程
import multiprocessing from time import sleep def clock(interval): i = 0 while i<5: i += 1 print(f"Run >>> {i}") sleep(interval) print("Ending!") def run(): p = multiprocessing.Process(target = clock, args = (1,)) p.start() if __name__ == '__main__': run() sleep(2) print("ENDING!")
運行結果
Run >>> 1 Run >>> 2 ENDING! Run >>> 3 Run >>> 4 Run >>> 5 Ending!
使用守護進程
import multiprocessing from time import sleep def clock(interval): i = 0 while i<5: i += 1 print(f"Run >>> {i}") sleep(interval) print("Ending!") def run(): p = multiprocessing.Process(target = clock, args = (1,)) p.daemon = True p.start() if __name__ == '__main__': run() sleep(2) print("ENDING!")
運行結果
Run >>> 1 Run >>> 2 ENDING!
Python多線程的通信
進程是系統獨立調度核分配系統資源的基本單位,進程之間是相互獨立的,進程之間的數據也不能共用,這是多進程在使用中與多線程最明顯的區別。
所以要使用多進程來彌補Python中多線程的不足,解決多進程之間的通信時關鍵。
進程對列Queue
在python多進程中,Queue其實就是進程之間的數據管道,實現進程通信。
生產者消費者問題
倉庫(固定大小的中間緩衝區)
生產者
持續生產數據傳入倉庫
消費者
持續從倉庫總提取數據
在實際運行時會產設的問題。生產者的主要作用是生成一定量的數據放到倉庫中,然後重覆此過程。
與此同時,消費者也在倉庫消耗這些數據。該問題的關鍵就是要保證生產者不會在倉庫滿時加入數據,消費者也不會在倉庫空時消耗數據。
JoinableQueue
JoinableQueue同樣通過multiprocessing使用。
JoinableQueue([maxsize]):就是一個Queue對象,但允許項目的使用者通知生成者項目已經被成功處理。
maxsize是隊列中允許最大項數,省略則無大小限制。
方法介紹:
JoinableQueue與Queue對象的方法一致,且之外還具有:
JoinableQueue.task_done()
:使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。如果調用此方法的次數大於從隊列中刪除項目的數量,將引發ValueError異常。JoinableQueue.join()
:生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用JoinableQueue.task_done()
方法為止。
Queue實例
使用JoinableQueue實現生產者消費者模型
代碼
import multiprocessing from time import ctime,sleep def consumer(input_q): print("消費開始:", ctime()) while True: # 處理項 item = input_q.get() print ("消費 >>>>>>>>>", item) # 此處替換為有用的工作 input_q.task_done() # 發出信號通知任務完成 sleep(1) print ("消費結束:", ctime()) ##此句未執行,因為q.join()收集到四個task_done()信號後,主進程啟動,未等到print此句完成,程式就結束了 def producer(sequence, output_q): print ("生產開始:", ctime()) for item in sequence: output_q.put(item) print ("生產 >>>>>>>>>", item) sleep(1) print ("生產結束:", ctime()) if __name__ == '__main__': q = multiprocessing.JoinableQueue() # 運行消費者進程 cons_p = multiprocessing.Process (target = consumer, args = (q,)) cons_p.daemon = True cons_p.start() # 生產多個項,sequence代表要發送給消費者的項序列 # 在實踐中,這可能是生成器的輸出或通過一些其他方式生產出來 sequence = [1,2,3,4] producer(sequence, q) # 等待所有項被處理 q.join()
運行結果
生產開始: Wed Oct 16 22:06:11 2019 生產 >>>>>>>>> 1 消費開始: Wed Oct 16 22:06:11 2019 消費 >>>>>>>>> 1 生產 >>>>>>>>> 2 消費 >>>>>>>>> 2 生產 >>>>>>>>> 3 消費 >>>>>>>>> 3 生產 >>>>>>>>> 4 消費 >>>>>>>>> 4 生產結束: Wed Oct 16 22:06:15 2019
管道Pipe
待續