python提供了一個跨平臺的多進程支持——multiprocessing模塊,其包含Process類來代表一個進程對象 1、Process語法結構:(註: 傳參的時候一定使用關鍵字傳參) 2、自定義進程類:需要繼承Process類 自定義類的時候必須註意的事項: 第一,必須繼承Process類的構 ...
python提供了一個跨平臺的多進程支持——multiprocessing模塊,其包含Process類來代表一個進程對象
1、Process語法結構:(註: 傳參的時候一定使用關鍵字傳參)
2、自定義進程類:需要繼承Process類 自定義類的時候必須註意的事項: 第一,必須繼承Process類的構造方法 第二,必須重寫Process類的run()方法 第三,不能用實例化對象直接調用run()方法,而是調用start()方法 第四,在進程改變實例化對象的數據時,這個數據是被隔離的,即改變數據不成功
# 自定義進程類 class ProcessClass(Process): g_num = 100 def __init__(self, interval): # 這行代碼必須添加上 super().__init__() self.interval = interval self.result = "初始化" def run(self): global g_num g_num = 120 print("子進程{},開始執行,父進程為{}".format(os.getpid(), os.getppid())) start = time.time() time.sleep(2) stop = time.time() print("{}執行結束,耗時{:0.2f}秒".format(os.getpid(), stop-start)) self.result = "運行之後的結果" if __name__ == "__main__": t_start = time.time() print("當前進程{}".format(os.getpid())) p = ProcessClass(2) p.start() p.join() t_stop = time.time() # 數據隔離 print("子進程 任務 運行結果:", p.result) # -----> 初始化 數據未改變 print(ProcessClass.g_num) # ------>100 數據未改變 print("(%s)執行結束,耗時%0.2f" % (os.getpid(), t_stop - t_start))
3、進程間的通信 像之前改變數據,就要使用到進程間的通信,可以使用multiprocessing模塊的Queue類來實現進程間的通信 Queue的常用方法: qsize(): 返回當前隊列包含的消息(數據)數量 empty():如果隊列為空,返回True,否則False full():如果隊列滿了,返回True,否則False get() 或者 put()分別時阻塞式獲取 或者 阻塞式存儲隊列的一條消息(數據),然後獲取 或者 添加這條消息,如果隊列為空 或者 隊列滿了,在運行的過程阻塞 get_nowait() 或者 put_nowait()分別時非阻塞式獲取 或者 非阻塞式存儲隊列的一條消息(數據),然後移除 和 添加這條消息,如果隊列為空 或者 隊列滿了,會拋出相應的異常
實例如下:
# 自定義進程類 class ProcessClass(Process): g_num = 100 def __init__(self, interval, q): # 這行代碼必須添加上 super().__init__() self.interval = interval self.result = "初始化" # 初始化一個隊列實例化對象 self.q = q def run(self): global g_num g_num = 120 print("子進程{},開始執行,父進程為{}".format(os.getpid(), os.getppid())) start = time.time() time.sleep(self.interval) stop = time.time() print("{}執行結束,耗時{:0.2f}秒".format(os.getpid(), stop-start)) self.result = "運行之後的結果" # 將消息(數據)添加到隊列中 self.q.put(g_num) self.q.put(self.result) def get_data(self): return self.result if __name__ == "__main__": # 初始化一個隊列實例化對象,參數為隊列的長度 queues = Queue(5) print("當前進程{}".format(os.getpid())) p = ProcessClass(2, queues) t_start = time.time() p.start() p.join() t_stop = time.time() # 數據隔離 print("子進程 任務 運行結果:", p.get_data()) # -----> 初始化 數據未改變 print(ProcessClass.g_num) # ------>100 數據未改變 print("子進程 任務 運行結果:", queues.get()) # -----> 120 數據未改變 print("子進程 任務 運行結果:", queues.get()) # -----> 運行之後的結果 數據未改變 print("(%s)執行結束,耗時%0.2f" % (os.getpid(), t_stop - t_start))4、進程池: 如果進程有成千上萬個,手動創建進程的工作量巨大,這個時候應該用到multiprocessing模塊中的Pool類 這個類下有幾個方法比較常用: apply_async(func[, args[, kwds]]) :使用非阻塞方式調用func(並行執行 ,堵塞方式必須等待上一個進程退出才能執行下一個進程),args為傳遞給 func的參數列表,kwds為傳遞給func的關鍵字參數列表; apply(func[, args[, kwds]]):使用阻塞方式調用func close():關閉Pool,使其不再接受新的任務; terminate():不管任務是否完成,立即終止; join():主進程阻塞,等待子進程的退出, 必須在close或terminate之後使用 主要實例:
# 進程池 def worker(msg): start = time.time() print("{}開始執行,進程號為{}".format(msg, os.getpid())) # random.random()生成0~1之間的隨機數 time.sleep(random.random()*3) # time.sleep(3) stop = time.time() print(msg, "執行完畢,耗時{:.2f}".format(stop-start)) if __name__ == "__main__": # 定義一個進程池,最大數為3 po = Pool(3) for i in range(10): # 非阻塞式操作Pool.apply_async(要調用的目標,(傳遞給目標的參數元組,)) # 每次迴圈將會用空閑出來的子進程去調用目標 po.apply_async(worker, (i, )) # 阻塞式操作 # po.apply(worker, (i, )) print("start".center(24, "-")) po.close() po.join() print("end".center(24, "-"))