導航: 1、創建進程的兩種方式2、Process的方法3、進程間的通訊1,進程隊列Queue--先進先出4、進程間的通訊2,管道通訊 Pipe5、進程間的數據共用,Manager6、多進程同步問題7、進程池Pool python中多進程可以解決cpython解釋器多線程中GIL存在的問題,可以利用C ...
導航:
1、創建進程的兩種方式
2、Process的方法
3、進程間的通訊1,進程隊列Queue--先進先出
4、進程間的通訊2,管道通訊 Pipe
5、進程間的數據共用,Manager
6、多進程同步問題
7、進程池Pool
python中多進程可以解決cpython解釋器多線程中GIL存在的問題,可以利用CPU的多核資源,實現真的併發效果。操作系統中每個線程有自己的記憶體空間,數據並不共用。
python中使用multiprocessing包提供的介面給我們創建多進程,multiprocessing與threading的使用方法相似。
1、創建進程的兩種方式
1)通過multiprocessing.Process創建
Process(group=None, target=None, name=None, args=(), kwargs={})
- group 線程組,沒什麼用,預設為空就好
- target 要執行的方法
- name 進程的名字
- args/kwargs 執行target方法要傳入的參數
1 def fn(word): 2 # ------子進程的邏輯----- 3 print("父進程", os.getppid()) 4 print("子進程", os.getpid()) 5 print("傳入的參數:", word) 6 7 8 if __name__ == "__main__": 9 p = Process(target=fn, args=("haha",)) # 創建子進程 10 p.start() # 開啟子進程 11 print("主進程", os.getpid()) 12 13 14 # 輸出結果 15 主進程 5916 16 父進程 5916 17 子進程 2936 18 傳入的參數: haha通過Process創建一個子進程
很簡單這樣就可以創建一個子進程了,可以看第3 4 11行列印的進程id可以知道這是不同的進程。
這裡需要說明的是:在所有的進程當中子進程都是由父進程創建出來的,在這個例子中,子進程的的父進程就是主進程,可以看到第3 11行列印的進程id。
在windows下,創建進程一定要放在__main__中,不然會報錯
2)通過繼承Process類創建子進程
1 from multiprocessing import Process 2 import os 3 4 class MyProcess(Process): 5 def __init__(self, name): 6 super().__init__() # 如果重寫了初始化方法,在初始化方法中一定要調用父類的初始化方法 7 self.name = name 8 9 def run(self): 10 # ------子進程的邏輯----- 11 print("父進程", os.getppid()) 12 print("子進程", os.getpid()) 13 print("子進程的名字", self.name) 14 15 16 if __name__ == "__main__": 17 p = MyProcess("haha") # 創建子進程 18 p.start() # 開啟子進程 19 print("主進程", os.getpid()) 20 21 22 # 輸出結果 23 主進程 740 24 父進程 740 25 子進程 7052 26 子進程的名字 haha通過繼承Process創建子進程
通過繼承Process創建子進程需要重寫 run 方法,這是子進程邏輯的入口,當開啟子進程時會自動調用這個方法
上邊這兩種方式中,主進程執行完所有的邏輯後會等待子進程結束在一起結束,與fork函數創建的方式不一樣
2、Process的方法
1)is_alive()
is_alive方法判斷指定對象進程的存活狀態。進程 start 後一直到該進程結束都返回True。進程 start 前 或進程已經結束返回False
1 from multiprocessing import Process 2 import time 3 4 def fn(): 5 # ------子進程的邏輯----- 6 time.sleep(1) # 模擬子進程執行需要消耗的時間 7 8 9 if __name__ == "__main__": 10 p = Process(target=fn) # 創建子進程 11 print("p.start前-->", p.is_alive()) 12 p.start() # 開啟子進程 13 print("p.start後-->", p.is_alive()) 14 time.sleep(2) # 模擬主進程執行需要消耗的時間,為了確保子進程先結束 15 print("子進程結束後-->", p.is_alive()) 16 17 18 # 輸出結果 19 p.start前--> False 20 p.start後--> True 21 子進程結束後--> Falseis_alive方法
2)join()
join(timeout=None)方法,堵塞當前環境的進程,直到調此方法的進程結束後再繼續往下執行。可設置timeout值,最多堵塞timeout時間(秒)。註意:join方法只能在start()後才可以使用
1 from multiprocessing import Process 2 import time 3 4 def fn(): 5 # ------子進程的邏輯----- 6 time.sleep(3) # 模擬子進程執行需要消耗的時間 7 print("子進程中-->", time.ctime()) 8 9 10 if __name__ == "__main__": 11 p = Process(target=fn) # 創建子進程 12 p.start() # 開啟子進程 13 print("p.join前-->", time.ctime()) 14 p.join() # 堵塞當前環境的進程,直到調此方法的進程結束後再繼續往下執行 15 print("p.join後-->", time.ctime()) 16 17 18 # 輸出內容 19 p.join前--> Sat Sep 29 16:03:40 2018 20 子進程中--> Sat Sep 29 16:03:43 2018 21 p.join後--> Sat Sep 29 16:03:43 2018join方法
這裡可以看到第15行語句一直等到第7行執行完才輸出
1 from multiprocessing import Process 2 import time 3 4 def fn(): 5 # ------子進程的邏輯----- 6 time.sleep(2) # 模擬子進程執行需要消耗的時間 7 print("子進程中-->", time.ctime()) 8 9 10 if __name__ == "__main__": 11 p = Process(target=fn) # 創建子進程 12 p.start() # 開啟子進程 13 print("p.join前-->", time.ctime()) 14 p.join(1) # 設置超時1秒 15 print("p.join後-->", time.ctime()) 16 17 18 # 輸出內容 19 p.join前--> Sat Sep 29 16:04:02 2018 20 p.join後--> Sat Sep 29 16:04:03 2018 21 子進程中--> Sat Sep 29 16:04:04 2018join設置timeout值
這裡可以看到第15行語句只堵塞了1秒的時間
3)start(),進程準備就緒,等待cpu的執行(調度)
4)run(),繼承Process類的子類,需要重寫的方法,當進程對象調用 start 方法時自動執行 run 方法,也是進程的入口
5)terminate(),不管進程是否執行完,直接終止進程
6)daemon屬性True/False
與線程的setDeamon()一樣。將該進程對象設置為守護進程,效果:父進程將不再等待子進程,父進程結束時,子進程一起結束。註意:daemon屬性只能在 start() 前設置
1 from multiprocessing import Process 2 import time 3 4 def fn(): 5 # ------子進程的邏輯----- 6 time.sleep(2) # 模擬子進程執行需要消耗的時間 7 print("----------") 8 9 10 if __name__ == "__main__": 11 p = Process(target=fn) # 創建子進程 12 p.daemon = True 13 p.start() # 開啟子進程 14 print("++++++++++") 15 16 17 # 輸出內容 18 ++++++++++daemon屬性
子進程中的第7行語句並沒有執行,即:子進程在父進程結束時也跟著結束了
3、進程間的通訊1,進程隊列Queue--先進先出
Queue(maxsize=-1),maxsize=-1預設隊列長度沒有最大值,maxsize=5表示隊列長度最大值為5
1)put(obj, block=True, timeout=None)
- obj 添加進隊列的值,可以添加任意類型的值
- block 預設為True,當隊列滿時,繼續添加則發生堵塞,直到隊列get()值出去;block=False,隊列滿時繼續添加不堵塞,但會拋出queue.Full異常
- timeout 堵塞超時,當隊列滿時,繼續添加發生堵塞,堵塞超時timeout秒,超時則會拋出queue.Full異常
1 >>> from multiprocessing import Process, Queue 2 >>> q = Queue(3) # 創建進程隊列,隊列最大長度為3 3 >>> q.put("haha") # 往隊列添加字元串 4 >>> q.put([]) # 往隊列添加列表 5 >>> q.put({}) # 往隊列添加字典 6 >>> q.put(5) # 往隊列添加數字 7 _ # 滿時繼續添加,發生堵塞(游標會一直卡在這) 8 9 >>> q.put(5,block=False) # 往隊列添加數字,不堵塞添加 10 Traceback (most recent call last): 11 File "<stdin>", line 1, in <module> 12 File "C:\D_Program\Python\Python37\lib\multiprocessing\queues.py", line 83, in put 13 raise Full 14 queue.Full 15 16 >>> q.put(5, timeout=3) # 往隊列添加數字,並設置超時3秒 17 Traceback (most recent call last): 18 File "<stdin>", line 1, in <module> 19 File "C:\D_Program\Python\Python37\lib\multiprocessing\queues.py", line 83, in put 20 raise Full 21 queue.Fullput方法簡單使用,這裡還不涉及進程間的通訊
2)get(block=True, timeout=None) 使用方法與put一樣的用法,返回先添加的值
1 >>> q.get() 2 'haha' 3 >>> q.get() 4 [] 5 >>> q.get() 6 {} 7 >>> q.get() 8 _ # 為空時繼續取值,發生堵塞(游標會一直卡在這) 9 10 >>> q.get(timeout=3) 11 Traceback (most recent call last): 12 File "<stdin>", line 1, in <module> 13 File "C:\D_Program\Python\Python37\lib\multiprocessing\queues.py", line 105, in get 14 raise Empty 15 _queue.Emptyget方法簡單使用,這裡還不涉及進程間的通訊
3)qsize() 返回當前隊列的長度
1 >>> from multiprocessing import Queue 2 >>> q = Queue(3) 3 >>> q.put("haha") 4 >>> q.qsize() 5 1 6 >>> q.put(100) 7 >>> q.qsize() 8 2 9 >>> q.get() 10 'haha' 11 >>> q.qsize() 12 1qsize返回當前隊列長度,這裡還不涉及進程間的通訊
4)put_nowait(obj) 相當於put(obj, block=False)
5)get_nowait() 相當於get(block=False)
6)empty() 判斷隊列是否為空,True:空,False:不為空
7)full() 判斷隊列是否已滿,True:已滿,False:未滿
8)close() 關閉隊列,關閉後,將不能添加或取出值
9)通過隊列實現多個進程間的通訊
1 from multiprocessing import Process, Queue 2 import time 3 4 def write_queue(q): 5 for i in range(5): 6 q.put(i) # 往隊列添加值 7 print("put %d" % i) 8 9 10 def read_queue(q): 11 time.sleep(1) # 確保write_queue中隊列已經有值 12 while not q.empty(): 13 s = q.get() # 取出隊列的值 14 print("get %s" % s) 15 16 17 if __name__ == "__main__": 18 q = Queue(5) 19 p1 = Process(target=write_queue, args=(q,)) # 創建子進程 20 p2 = Process(target=read_queue, args=(q,)) # 創建子進程 21 p1.start() # 開啟子進程 22 p2.start() # 開啟子進程 23 24 25 # 輸出結果 26 put 0 27 put 1 28 put 2 29 put 3 30 put 4 31 get 0 32 get 1 33 get 2 34 get 3 35 get 4通過隊列的多進程間通信,target
1 from multiprocessing import Process, Queue 2 import time 3 4 class WriteProcess(Process): 5 def __init__(self, q): 6 super().__init__() 7 self.q = q 8 9 def run(self): 10 print("子進程WriteProcess-->") 11 for i in range(5): 12 self.q.put(i) 13 print("put %d" % i) 14 15 16 class ReadProcess(Process): 17 def __init__(self, q): 18 super().__init__() 19 self.q = q 20 21 def run(self): 22 time.sleep(1) # 確保write_queue中隊列已經有值 23 print("子進程ReadProcess-->") 24 while not self.q.empty(): 25 s = self.q.get() 26 print("get %s" % s) 27 28 29 if __name__ == "__main__": 30 q = Queue(5) 31 p1 = WriteProcess(q) # 創建子進程 32 p2 = ReadProcess(q) # 創建子進程 33 p1.start() # 開啟子進程 34 p2.start() # 開啟子進程 35 36 37 # 輸出結果 38 子進程WriteProcess--> 39 put 0 40 put 1 41 put 2 42 put 3 43 put 4 44 子進程ReadProcess--> 45 get 0 46 get 1 47 get 2 48 get 3 49 get 4通過隊列的多進程間通訊--run方法
4、進程間的通訊2,管道通訊 Pipe
1)Pipe(duplex=True)
Pipe是一個函數,返回元組(Connection(), Connection()). 即返回管道的兩端。預設duplex=True為全雙工模式,duplex=Fasle中第一個Connection只能接收信息,第二個Connection只能發送消息
2)Connection常用方法
- send(obj) 將對象obj發送到管道另一端,發送的數據必須是可序列化的對象。
- recv() 從管道的另一端接收send()發送的數據。沒有數據可接收,將發生堵塞。
- send_bytes(buffer, offset=-1, size=-1) 發送位元組緩衝區,buffer是支持支持位元組緩衝的任意對象,offset為buffer的位元組偏移量(可以當初下標),size為要發送的位元組數。
- recv_bytes(maxlength=-1) 接收send_bytes()發送的一次數據,maxlength指定接收長度,超出這個長度則拋出將引發IOError異常,沒有數據可接收,將發生堵塞。
- poll([timeout]) 返回 True/False,判斷管道內是否有數據可以接收,True:數據可接收。timeout為堵塞的時間秒,timeout=None時一直堵塞,直到有數據可以接收
- close() 關閉鏈接,關閉鏈接後將不能繼續使用管道,當不再使用管道時可將其關閉
1 from multiprocessing import Process, Pipe 2 import time 3 4 class MyProcess1(Process): 5 def __init__(self, con): 6 super().__init__() 7 self.con = con 8 9 def run(self): 10 self.con.send("12345") 11 print("MyProcess1--send-->", "12345") 12 msg = self.con.recv_bytes().decode(encoding="utf-8") 13 print("MyProcess1--recv_bytes-->", msg) 14 15 16 class MyProcess2(Process): 17 def __init__(self, con): 18 super().__init__() 19 self.con = con 20 21 def run(self): 22 msg = self.con.recv() 23 print("MyProcess2--recv-->", msg) 24 self.con.send_bytes("哈哈".encode("UTF-8")) 25 print("MyProcess2--send_bytes-->", "哈哈") 26 27 28 if __name__ == "__main__": 29 con1, con2 = Pipe() 30 p1 = MyProcess1(con1) # 創建子進程 31 p2 = MyProcess2(con2) # 創建子進程 32 p1.start() # 開啟子進程 33 p2.start() # 開啟子進程 34 35 36 # 輸出結果 37 MyProcess1--send--> 12345 38 MyProcess2--recv--> 12345 39 MyProcess2--send_bytes--> 哈哈 40 MyProcess1--recv_bytes--> 哈哈多進程間的管道通訊
5、進程間的數據共用,Manager
Manager提供多進程間的數據共用,Manager內的主要方法有 dict(mapping_or_sequence), list(sequence), Value(typecode, value), Array(typecode, sequence)
1 from multiprocessing import Process, Manager 2 3 def fn(dic, li, a): 4 dic['count'] -= 1 5 print("子進程-->a", a.value) # a 為Value對象,通過a.value將值取出、修改 6 print("子進程-->li", li) 7 8 9 if __name__ == '__main__': 10 with Manager() as manager: 11 dic = manager.dict({'count': 1000}) # 字典 12 li = manager.list([1, 2, 3, 4]) # 列表 13 a = manager.Value("int", 100) # 傳值 14 p = Process(target=fn, args=(dic, li, a)) 15 p.start() 16 p.join() 17 print("主進程-->dic", dic) # 列印子進程修改後的內容 18 19 20 # 輸出結果 21 子進程-->a 100 22 子進程-->li [1, 2, 3, 4] 23 主進程-->dic {'count': 0}多進程數據共用Manager
6、多進程同步問題
當多個進程同時訪問同一個資源時就得涉及到同步的問題,如:向控制台列印數據,必須要確保同一時刻只能有一個進程在列印數據,否則將會出現亂序的效果;多進程同時對一個文件進行讀寫等
1 from multiprocessing import Process, Lock 2 import time 3 4 def fn(lock): 5 # ------子進程的邏輯----- 6 # lock.acquire() # 未加鎖 7 for i in range(3): 8 print("12", end="") 9 time.sleep(0.005) # 為了看到效果,添加的延遲 10 print("34") 11 # lock.release() 12 13 14 if __name__ == "__main__": 15 lock = Lock() 16 for i in range(3): 17 p = Process(target=fn, args=(lock,)) # 創建子進程 18 p.start() # 開啟子進程 19 20 21 # 輸出結果 22 121234 23 1234 24 1234 25 121234 26 1234 27 34 28 1234 29 34 30 1234未加鎖
1 from multiprocessing import Process, Lock 2 import time 3 4 def fn(lock): 5 # ------子進程的邏輯----- 6 lock.acquire() # 加鎖 7 for i in range(3): 8