1. multiprocess模塊 仔細說來,multiprocess不是一個模塊而是python中一個操作、管理進程的包。 之所以叫multi是取自multiple的多功能的意思,在這個包中幾乎包含了和進程有關的所有子模塊。由於提供的子模塊非常多,為了方便大家歸類記憶,我將這部分大致分為四個部分: ...
1. multiprocess模塊
仔細說來,multiprocess不是一個模塊而是python中一個操作、管理進程的包。 之所以叫multi是取自multiple的多功能的意思,在這個包中幾乎包含了和進程有關的所有子模塊。由於提供的子模塊非常多,為了方便大家歸類記憶,我將這部分大致分為四個部分:創建進程部分,進程同步部分,進程池部分,進程之間數據共用。
1.1 multiprocess.process模塊
1.1.1 process模塊介紹
process模塊是一個創建進程的模塊,藉助這個模塊,就可以完成進程的創建。
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化得到的對象,表示一個子進程中的任務(尚未啟動)
強調:
1. 需要使用關鍵字的方式來指定參數
2. args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號參數介紹:
group參數未使用,值始終為None
target表示調用對象,即子進程要執行的任務
args表示調用對象的位置參數元組,args=(1,2,'egon',)
kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}
name為子進程的名稱
方法介紹:
p.start():啟動進程,並調用該子進程中的p.run()
p.run():進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法
p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵屍進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那麼也將不會被釋放,進而導致死鎖
p.is_alive():如果p仍然運行,返回True
p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能join住run開啟的進程
屬性介紹:
p.daemon:預設值為False,如果設為True,代表p為後臺運行的守護進程,當p的父進程終止時,p也隨之終止,並且設定為True後,p不能創建自己的新進程,必須在p.start()之前設置
p.name:進程的名稱
p.pid:進程的pid
p.exitcode:進程在運行時為None、如果為–N,表示被信號N結束(瞭解即可)
p.authkey:進程的身份驗證鍵,預設是由os.urandom()隨機生成的32字元的字元串。這個鍵的用途是為涉及網路連接的底層進程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(瞭解即可)
在Windows操作系統中由於沒有fork(linux操作系統中創建進程的機制),在創建子進程的時候會自動 import 啟動它的這個文件,而在 import 的時候又執行了整個文件。因此如果將process()直接寫在文件中就會無限遞歸創建子進程報錯。所以必須把創建子進程的部分使用if __name__ ==‘__main__’ 判斷保護起來,import 的時候 ,就不會遞歸運行了。
1.1.2 使用process模塊創建進程
在一個python進程中開啟子進程,start方法和併發效果。
import time from multiprocessing import Process def f(name): print('hello', name) print('我是子進程') if __name__ == '__main__': p = Process(target=f, args=('bob',)) p.start() time.sleep(1) print('執行主進程的內容了')
join方法:
import time from multiprocessing import Process def f(name): print('hello', name) time.sleep(1) print('我是子進程') if __name__ == '__main__': p = Process(target=f, args=('bob',)) p.start() #p.join() print('我是父進程')
查看主進程和子進程的進程號:
import os from multiprocessing import Process def f(x): print('子進程id :',os.getpid(),'父進程id :',os.getppid()) return x*x if __name__ == '__main__': print('主進程id :', os.getpid()) p_lst = [] for i in range(5): p = Process(target=f, args=(i,)) p.start()
進階,多個進程同時運行(註意,子進程的執行順序不是根據啟動順序決定的)
多個進程同時運行:
import time from multiprocessing import Process def f(name): print('hello', name) time.sleep(1) if __name__ == '__main__': p_lst = [] for i in range(5): p = Process(target=f, args=('bob',)) p.start() p_lst.append(p)
多個進程同時運行,再談join方法(1):
import time from multiprocessing import Process def f(name): print('hello', name) time.sleep(1) if __name__ == '__main__': p_lst = [] for i in range(5): p = Process(target=f, args=('bob',)) p.start() p_lst.append(p) p.join() # [p.join() for p in p_lst] print('父進程在執行')
多個進程同時運行,再談join方法(2):
import time from multiprocessing import Process def f(name): print('hello', name) time.sleep(1) if __name__ == '__main__': p_lst = [] for i in range(5): p = Process(target=f, args=('bob',)) p.start() p_lst.append(p) # [p.join() for p in p_lst] print('父進程在執行')
除了上面這些開啟進程的方法,還有一種以繼承Process類的形式開啟進程的方式。
import os from multiprocessing import Process class MyProcess(Process): def __init__(self,name): super().__init__() self.name=name def run(self): print(os.getpid()) print('%s 正在和女主播聊天' %self.name) p1=MyProcess('wupeiqi') p2=MyProcess('yuanhao') p3=MyProcess('nezha') p1.start() #start會自動調用run p2.start() # p2.run() p3.start() p1.join() p2.join() p3.join() print('主線程')
進程之間的數據隔離問題:
from multiprocessing import Process def work(): global n n=0 print('子進程內: ',n) if __name__ == '__main__': n = 100 p=Process(target=work) p.start() print('主進程內: ',n)
1.1.3 守護進程
會隨著主進程的結束而結束。
主進程創建守護進程:
1.守護進程會在主進程代碼執行結束後就終止
2.守護進程內無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children
註意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止
守護進程的啟動:
import os import time from multiprocessing import Process class Myprocess(Process): def __init__(self,person): super().__init__() self.person = person def run(self): print(os.getpid(),self.name) print('%s正在和女主播聊天' %self.person) p=Myprocess('哪吒') p.daemon=True #一定要在p.start()前設置,設置p為守護進程,禁止p創建子進程,並且父進程代碼執行結束,p即終止運行 p.start() time.sleep(10) # 在sleep時查看進程id對應的進程ps -ef|grep id print('主')
主進程代碼執行結束守護進程立即結束:
from multiprocessing import Process def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") p1=Process(target=foo) p2=Process(target=bar) p1.daemon=True p1.start() p2.start() time.sleep(0.1) print("main-------")#列印該行則主進程代碼結束,則守護進程p1應該被終止.#可能會有p1任務執行的列印信息123,因為主進程列印main----時,p1也執行了,但是隨即被終止.
1.1.4 socket聊天併發實例
server:
from socket import * from multiprocessing import Process server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn,client_addr): while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': #windows下start進程一定要寫到這下麵 while True: conn,client_addr=server.accept() p=Process(target=talk,args=(conn,client_addr)) p.start()
client:
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))
1.1.4 多進程中的其他方法
進程對象的其他方法:terminate,is_alive:
from multiprocessing import Process import time import random class Myprocess(Process): def __init__(self,person): self.name=person super().__init__() def run(self): print('%s正在和網紅臉聊天' %self.name) time.sleep(random.randrange(1,5)) print('%s還在和網紅臉聊天' %self.name) p1=Myprocess('哪吒') p1.start() p1.terminate()#關閉進程,不會立即關閉,所以is_alive立刻查看的結果可能還是存活 print(p1.is_alive()) #結果為True print('開始') print(p1.is_alive()) #結果為False
進程對象的其他屬性:pid和name:
class Myprocess(Process): def __init__(self,person): self.name=person # name屬性是Process中的屬性,標示進程的名字 super().__init__() # 執行父類的初始化方法會覆蓋name屬性 #self.name = person # 在這裡設置就可以修改進程名字了 #self.person = person #如果不想覆蓋進程名,就修改屬性名稱就可以了 def run(self): print('%s正在和網紅臉聊天' %self.name) # print('%s正在和網紅臉聊天' %self.person) time.sleep(random.randrange(1,5)) print('%s正在和網紅臉聊天' %self.name) # print('%s正在和網紅臉聊天' %self.person) p1=Myprocess('哪吒') p1.start() print(p1.pid) #可以查看子進程的進程id
1.2 進程同步(multiprocess.Lock)
1.2.1 鎖--multiprocess.Lock
通過剛剛的學習,我們千方百計實現了程式的非同步,讓多個任務可以同時在幾個進程中併發處理,他們之間的運行沒有順序,一旦開啟也不受我們控制。儘管併發編程讓我們能更加充分的利用IO資源,但是也給我們帶來了新的問題。
當多個進程使用同一份數據資源的時候,就會引發數據安全或順序混亂問題。
多進程搶占輸出資源:
import os import time import random from multiprocessing import Process def work(n): print('%s: %s is running' %(n,os.getpid())) time.sleep(random.random()) print('%s:%s is done' %(n,os.getpid())) if __name__ == '__main__': for i in range(3): p=Process(target=work,args=(i,)) p.start()
使用鎖維護執行順序:
# 由併發變成了串列,犧牲了運行效率,但避免了競爭 import os import time import random from multiprocessing import Process,Lock def work(lock,n): lock.acquire() print('%s: %s is running' % (n, os.getpid())) time.sleep(random.random()) print('%s: %s is done' % (n, os.getpid())) lock.release() if __name__ == '__main__': lock=Lock() for i in range(3): p=Process(target=work,args=(lock,i)) p.start()
上面這種情況雖然使用加鎖的形式實現了順序的執行,但是程式又重新變成串列了,這樣確實會浪費了時間,卻保證了數據的安全。
接下來,我們以模擬搶票為例,來看看數據安全的重要性。
多進程同時搶購餘票:
#文件db的內容為:{"count":1} #註意一定要用雙引號,不然json無法識別 #併發運行,效率高,但競爭寫同一文件,數據寫入錯亂 from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open('db')) print('\033[43m剩餘票數%s\033[0m' %dic['count']) def get(): dic=json.load(open('db')) time.sleep(0.1) #模擬讀數據的網路延遲 if dic['count'] >0: dic['count']-=1 time.sleep(0.2) #模擬寫數據的網路延遲 json.dump(dic,open('db','w')) print('\033[43m購票成功\033[0m') def task(): search() get() if __name__ == '__main__': for i in range(100): #模擬併發100個客戶端搶票 p=Process(target=task) p.start()
使用鎖來保證數據安全:
#文件db的內容為:{"count":5} #註意一定要用雙引號,不然json無法識別 #併發運行,效率高,但競爭寫同一文件,數據寫入錯亂 from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open('db')) print('\033[43m剩餘票數%s\033[0m' %dic['count']) def get(): dic=json.load(open('db')) time.sleep(random.random()) #模擬讀數據的網路延遲 if dic['count'] >0: dic['count']-=1 time.sleep(random.random()) #模擬寫數據的網路延遲 json.dump(dic,open('db','w')) print('\033[32m購票成功\033[0m') else: print('\033[31m購票失敗\033[0m') def task(lock): search() lock.acquire() get() lock.release() if __name__ == '__main__': lock = Lock() for i in range(100): #模擬併發100個客戶端搶票 p=Process(target=task,args=(lock,)) p.start()
#加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串列的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
雖然可以用文件共用數據實現進程間通信,但問題是:
1.效率低(共用數據基於文件,而文件是硬碟上的數據)
2.需要自己加鎖處理
因此我們最好找尋一種解決方案能夠兼顧:
1.效率高(多個進程共用一塊記憶體的數據)
2.幫我們處理好鎖問題
這就是mutiprocessing模塊為我們提供的基於消息的IPC通信機制:隊列和管道。
隊列和管道都是將數據存放於記憶體中
隊列又是基於(管道+鎖)實現的,可以讓我們從複雜的鎖問題中解脫出來,
我們應該儘量避免使用共用數據,儘可能使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可獲展性。
1.3 隊列(multiprocess.Queue)
1.3.1 隊列
(1) 概念介紹
創建共用的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。
Queue([maxsize])
創建共用的進程隊列。
參數 :maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。
底層隊列使用管道和鎖定實現。
方法介紹:
Queue([maxsize])
創建共用的進程隊列。maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還需要運行支持線程以便隊列中的數據傳輸到底層管道中。 Queue的實例q具有以下方法:
q.get( [ block [ ,timeout ] ] )
返回q中的一個項目。如果q為空,此方法將阻塞,直到隊列中有項目可用為止。block用於控制阻塞行為,預設為True. 如果設置為False,將引發Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有項目變為可用,將引發Queue.Empty異常。
q.get_nowait( )
同q.get(False)方法。
q.put(item [, block [,timeout ] ] )
將item放入隊列。如果隊列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,預設為True。如果設置為False,將引發Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引發Queue.Full異常。
q.qsize()
返回隊列中目前項目的正確數量。此函數的結果並不可靠,因為在返回結果和在稍後程式中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引發NotImplementedError異常。
q.empty()
如果調用此方法時 q為空,返回True。如果其他進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。
q.full()
如果q已滿,返回為True. 由於線程的存在,結果也可能是不可靠的(參考q.empty()方法)。
其他方法(瞭解):
q.close()
關閉隊列,防止隊列中加入更多數據。調用此方法時,後臺線程將繼續寫入那些已入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,如果某個使用者正被阻塞在get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。
q.cancel_join_thread()
不會再進程退出時自動連接後臺線程。這可以防止join_thread()方法阻塞。
q.join_thread()
連接隊列的後臺線程。此方法用於在調用q.close()方法後,等待所有隊列項被消耗。預設情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread()方法可以禁止這種行為。
(2) 代碼實例
''' multiprocessing模塊支持進程間通信的兩種主要形式:管道和隊列 都是基於消息傳遞實現的,但是隊列介面 ''' from multiprocessing import Queue q=Queue(3) #put ,get ,put_nowait,get_nowait,full,empty q.put(3) q.put(3) q.put(3) # q.put(3) # 如果隊列已經滿了,程式就會停在這裡,等待數據被別人取走,再將數據放入隊列。 # 如果隊列中的數據一直不被取走,程式就會永遠停在這裡。 try: q.put_nowait(3) # 可以使用put_nowait,如果隊列滿了不會阻塞,但是會因為隊列滿了而報錯。 except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程式不會一直阻塞下去,但是會丟掉這個消息。 print('隊列已經滿了') # 因此,我們再放入數據之前,可以先看一下隊列的狀態,如果已經滿了,就不繼續put了。 print(q.full()) #滿了 print(q.get()) print(q.get()) print(q.get()) # print(q.get()) # 同put方法一樣,如果隊列已經空了,那麼繼續取就會出現阻塞。 try: q.get_nowait(3) # 可以使用get_nowait,如果隊列滿了不會阻塞,但是會因為沒取到值而報錯。 except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程式不會一直阻塞下去。 print('隊列已經空了') print(q.empty()) #空了
上面這個例子還沒有加入進程通信,只是先來看看隊列為我們提供的方法,以及這些方法的使用和現象。
子進程發送數據給父進程:
import time from multiprocessing import Process, Queue def f(q): q.put([time.asctime(), 'from Eva', 'hello']) #調用主函數中p進程傳遞過來的進程參數 put函數為向隊列中添加一條數據。 if __name__ == '__main__': q = Queue() #創建一個Queue對象 p = Process(target=f, args=(q,)) #創建一個進程 p.start() print(q.get()) p.join()
上面是一個queue的簡單應用,使用隊列q對象調用get函數來取得隊列中最先進入的數據。 接下來看一個稍微複雜一些的例子:
批量生產數據放入隊列再批量獲取結果:
import os import time import multiprocessing # 向queue中輸入數據的函數 def inputQ(queue): info = str(os.getpid()) + '(put):' + str(time.asctime()) queue.put(info) # 向queue中輸出數據的函數 def outputQ(queue): info = queue.get() print ('%s%s\033[32m%s\033[0m'%(str(os.getpid()), '(get):',info)) # Main if __name__ == '__main__': multiprocessing.freeze_support() record1 = [] # store input processes record2 = [] # store output processes queue = multiprocessing.Queue(3) # 輸入進程 for i in range(10): process = multiprocessing.Process(target=inputQ,args=(queue,)) process.start() record1.append(process) # 輸出進程 for i in range(10): process = multiprocessing.Process(target=outputQ,args=(queue,)) process.start() record2.append(process) for p in record1: p.join() for p in record2: p.join()
1.4 進程之間的數據共用
展望未來,基於消息傳遞的併發編程是大勢所趨
即便是使用線程,推薦做法也是將程式設計為大量獨立的線程集合,通過消息隊列交換數據。
這樣極大地減少了對使用鎖定和其他同步手段的需求,還可以擴展到分散式系統中。
但進程間應該儘量避免通信,即便需要通信,也應該選擇進程安全的工具來避免加鎖帶來的問題。
以後我們會嘗試使用資料庫來解決現在進程之間的數據共用問題。
Manager模塊介紹:
進程間數據是獨立的,可以藉助於隊列或管道實現通信,二者都是基於消息傳遞的
雖然進程間數據獨立,但可以通過Manager實現數據共用,事實上Manager的功能遠不止於此A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
Manager例子:
from multiprocessing import Manager,Process,Lock def work(d,lock): with lock: #不加鎖而操作共用的數據,肯定會出現數據錯亂 d['count']-=1 if __name__ == '__main__': lock=Lock() with Manager() as m: dic=m.dict({'count':100}) p_l=[] for i in range(100): p=Process(target=work,args=(dic,lock)) p_l.append(p) p.start() for p in p_l: p.join() print(dic)
1.5 進程池和multiprocess.Pool模塊
1.5.1 進程池
為什麼要有進程池,進程池的概念?
在程式實際處理問題過程中,忙時會有成千上萬的任務需要被執行,閑時可能只有零星任務。那麼在成千上萬個任務需要被執行的時候,我們就需要去創建成千上萬個進程麽?首先,創建進程需要消耗時間,銷毀進程也需要消耗時間。第二即便開啟了成千上萬的進程,操作系統也不能讓他們同時執行,這樣反而會影響程式的效率。因此我們不能無限制的根據任務開啟或者結束進程。那麼我們要怎麼做呢?
在這裡,要給大家介紹一個進程池的概念,定義一個池子,在裡面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等到處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。如果有很多任務需要執行,池中的進程數量不夠,任務就要等待之前的進程執行任務完畢歸來,拿到空閑進程才能繼續執行。也就是說,池中進程的數量是固定的,那麼同一時間最多有固定數量的進程在運行。這樣不會增加操作系統的調度難度,還節省了開閉進程的時間,也一定程度上能夠實現併發效果。
1.5.2 multiprocess.Pool模塊
(1) 概念介紹
Pool([numprocess [,initializer [, initargs]]]):創建進程池
參數介紹:
numprocess:要創建的進程數,如果省略,將預設使用cpu_count()的值
initializer:是每個工作進程啟動時要執行的可調用對象,預設為None
initargs:是要傳給initializer的參數組
主要方法:
p.apply(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然後返回結果。 '''需要強調的是:此操作並不會在所有池工作進程中並執行func函數。如果要通過不同參數併發地執行func函數,必須從不同線程調用p.apply()函數或者使用p.apply_async()''' p.apply_async(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然後返回結果。 '''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他非同步操作中的結果。''' p.close():關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成 P.jion():等待所有工作進程退出。此方法只能在close()或teminate()之後調用
其他方法(瞭解):
方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具有以下方法
obj.get():返回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間內還沒有到達,將引發一場。如果遠程操作中引發了異常,它將在調用此方法時再次被引發。
obj.ready():如果調用完成,返回True
obj.successful():如果調用完成且沒有引發異常,返回True,如果在結果就緒之前調用此方法,引發異常
obj.wait([timeout]):等待結果變為可用。
obj.terminate():立即終止所有工作進程,同時不執行任何清理或結束任何掛起工作。如果p被垃圾回收,將自動調用此函數