Python併發編程之多進程 一、什麼是進程 進程:正在進行的一個過程或者說一個任務。而負責執行任務則是cpu。 進程是資源分配的基本單位 進程有:代碼段,數據段,進程式控制制塊(PCB)組成 二、進程與程式的區別 程式僅僅只是一堆代碼而已,而進程指的是程式的運行過程。 舉例: 想象一位有一手好廚藝的計 ...
Python併發編程之多進程
一、什麼是進程
進程:正在進行的一個過程或者說一個任務。而負責執行任務則是cpu。
進程是資源分配的基本單位
進程有:代碼段,數據段,進程式控制制塊(PCB)組成
二、進程與程式的區別
程式僅僅只是一堆代碼而已,而進程指的是程式的運行過程。
舉例:
想象一位有一手好廚藝的電腦科學家正在為他的女兒烘製生日蛋糕。
他有做生日蛋糕的食譜,
廚房裡有所需的原料:麵粉、雞蛋、韭菜,蒜泥等。
在這個比喻中:
做蛋糕的食譜就是程式(即用適當形式描述的演算法)
電腦科學家就是處理器(cpu)
而做蛋糕的各種原料就是輸入數據。
進程就是廚師閱讀食譜、取來各種原料以及烘製蛋糕等一系列動作的總和。
需要強調的是:同一個程式執行兩次,那也是兩個進程,比如打開暴風影音,雖然都是同一個軟體,但是一個可以播放蒼井空,一個可以播放飯島愛。
三、併發與並行
無論是並行還是併發,在用戶看來都是'同時'運行的,不管是進程還是線程,都只是一個任務而已,真是幹活的是cpu,cpu來做這些任務,而一個cpu同一時刻只能執行一個任務
併發:在同一個時間段內多個任務同時進行,偽並行,即看起來是同時運行。單個cpu+多道技術就可以實現併發(並行也屬於併發)
舉例:
你是一個cpu,你同時談了三個女朋友,每一個都可以是一個戀愛任務,你被這三個任務共用,要玩出併發戀愛的效果,應該是你先跟女友1去看電影,看了一會說:不好,我要拉肚子,然後跑去跟第二個女友吃飯,吃了一會說:那啥,我去趟洗手間,然後跑去跟女友3開了個房
並行:在同一個時間點上多個任務同時進行,同時運行,只有具備多個cpu才能實現並行
單核下,可以利用多道技術,多個核,每個核也都可以利用多道技術(多道技術是針對單核而言的)
舉例:
有四個核,六個任務,這樣同一時間有四個任務被執行,假設分別被分配給了cpu1,cpu2,cpu3,cpu4, 一旦任務1遇到I/O就被迫中斷執行,此時任務5就拿到cpu1的時間片去執行,這就是單核下的多道技術,而一旦任務1的I/O結束了,操作系統會重新調用它(需知進程的調度、分配給哪個cpu運行,由操作系統說了算),可能被分配給四個cpu中的任意一個去執行
四、同步、非同步、阻塞、非阻塞
同步
同步:某一個任務的執行必須依賴於另一個任務的返回結果
所謂同步,就是在發出一個功能調用時,在沒有得到結果之前,該調用就不會返回。按照這個定義,其實絕大多數函數都是同步調用。但是一般而言,我們在說同步、非同步的時候,特指那些需要其他部件協作或者需要一定時間完成的任務
非同步
非同步:某一個任務的執行,不需要依賴於另一個任務的返回,只需要告訴另一個任務一聲
非同步的概念和同步相對。當一個非同步功能調用發出後,調用者不能立刻得到結果。當該非同步功能完成後,通過狀態、通知或回調來通知調用者。
阻塞
阻塞:程式因為類似於IO等待、等待事件等導致無法繼續執行。
阻塞調用是指調用結果返回之前,當前線程會被掛起(如遇到io操作)。函數只有在得到結果之後才會將阻塞的線程激活。有人也許會把阻塞調用和同步調用等同起來,實際上他是不同的。對於同步調用來說,很多時候當前線程還是激活的,只是從邏輯上當前函數沒有返回而已。
非阻塞
程式遇到類似於IO操作時,不再阻塞等待,如果沒有及時的處理IO,就報錯或者跳過等其他操作
非阻塞和阻塞的概念相對應,指在不能立刻得到結果之前也會立刻返回,同時該函數不會阻塞當前線程。
五、進程的基本狀態
進程的三大基本狀態:
- 就緒狀態:所有進程需要的資源都獲取到了,等待著CPU的調用
- 執行狀態:獲取到了所有資源包括CPU,進程處於運行狀態
- 阻塞狀態:程停滯不再運行,放棄了CPU,進程此時處於記憶體里
六、multiprocessing模塊介紹
Python中的多線程無法利用多核優勢,如果想要充分地使用多核CPU的資源(os.cpu_count()查看),在Python中大部分情況需要使用多進程。Python提供了multiprocessing。
multiprocessing模塊用來開啟子進程,併在子進程中執行我們定製的任務(比如函數),該模塊與多線程模塊threading的編程介面類似。
multiprocessing模塊的功能眾多:支持子進程、通信和共用數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。
需要再次強調的一點是:與線程不同,進程沒有任何共用狀態,進程修改的數據,改動僅限於該進程內。
七、Process類的介紹
創建進程的類
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化得到的對象,表示一個子進程中的任務(尚未啟動) 強調: 1. 需要使用關鍵字的方式來指定參數 2. args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號
參數介紹
group參數未使用,值始終為None target表示調用對象,即子進程要執行的任務 args表示調用對象的位置參數元組,args=(1,2,'egon',) kwargs表示調用對象的字典,kwargs={'name':'egon','age':18} name為子進程的名稱
方法介紹
p.start():啟動進程,並調用該子進程中的p.run() p.run():進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法 p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵屍進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那麼也將不會被釋放,進而導致死鎖 p.is_alive():如果p仍然運行,返回True p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能join住run開啟的進程
屬性介紹
p.daemon:預設值為False,如果設為True,代表p為後臺運行的守護進程,當p的父進程終止時,p也隨之終止,並且設定為True後,p不能創建自己的新進程,必須在p.start()之前設置 p.name:進程的名稱 p.pid:進程的pid p.exitcode:進程在運行時為None、如果為–N,表示被信號N結束 p.authkey:進程的身份驗證鍵,預設是由os.urandom()隨機生成的32字元的字元串。這個鍵的用途是為涉及網路連接的底層進程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功
八、Process類的使用
註意:在windows中Process()必須放到# if name == 'main':下
創建並開啟子進程的兩種方式
from multiprocessing import Process import os def child_process(): print("這是子進程{0},父進程是{1}".format(os.getpid(), os.getppid())) if __name__ == '__main__': child_p = Process(target=child_process) child_p.start() # child_p.join() print("這是父進程{0}".format(os.getpid()))
from multiprocessing import Process import os class ChildProcess(Process): def __init__(self): super(ChildProcess, self).__init__() def run(self): print("這是子進程{0},父進程是{1}".format(os.getpid(), os.getppid())) if __name__ == '__main__': child_p = ChildProcess() child_p.start() # child_p.join() print("這是父進程{0}".format(os.getpid()))
進程之間的記憶體空間是隔離的
from multiprocessing import Process import os num = 100 def chile_process(): global num num = 0 print("子進程中:{0}".format(num)) if __name__ == '__main__': p = Process(target=chile_process) p.start() print("父進程中:{0}".format(num)) # 父進程中:100 # 子進程中:0
Process中的join()方法
join():主進程等待,等待子進程結束
from multiprocessing import Process import os import time def child_process(): time.sleep(3) print("這是子進程") if __name__ == '__main__': p = Process(target=child_process) p.start() # p.join() print("這是主進程") # 這是主進程 # 這是子進程 # 分析:如果不加join那麼則是先列印主進程中的“這是主進程”,然後等待三秒在列印“這是子進程” from multiprocessing import Process import os import time def child_process(): time.sleep(3) print("這是子進程") if __name__ == '__main__': p = Process(target=child_process) p.start() p.join() print("這是主進程") # 這是子進程 # 這是主進程 # 分析:如果加了join那麼主進程會等待子進程執行完之後再執行主進程,也就是說會先等待三秒然後同時列印出“這是子進程”和“這是主進程”
九、守護進程
守護進程的特點:
守護進程會在主進程執行完成後終止
設置了守護進程後,守護進程不能再開啟子進程,否則會報異常
from multiprocessing import Process import time def func(name): time.sleep(1) print("我是{0}".format(name)) def foo(name): time.sleep(3) print("{0}是誰".format(name)) if __name__ == '__main__': p1 = Process(target=func, args=("oldwang",)) p2 = Process(target=foo, args=("oldwang",)) p1.daemon = True # 一定要在p.start()前設置,設置p為守護進程,禁止p創建子進程,並且父進程代碼執行結束,p即終止運行 p1.start() p2.start() print("這是主進程...") # 執行結果: # 這是主進程... # oldwang是誰
十、進程同步
進程之間數據不共用,但是共用同一套文件系統,所以訪問同一個文件,或同一個列印終端,是沒有問題的,
而共用帶來的是競爭,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理
# 模擬搶票,購票行為由並行變成了串列,犧牲了效率,提高了數據安全性
from multiprocessing import Process, Lock
import json
import time
import os
def search_ticket():
with open("file/ticket", mode="r", encoding="utf-8") as f:
ticket_num = int(f.read())
print("剩餘票數:{0}".format(ticket_num))
def get_ticket():
with open("file/ticket", mode="r", encoding="utf-8") as f:
ticket = int(f.read())
time.sleep(0.1) # 模擬搶票延時
if ticket:
ticket -= 1
print("{0}搶到了一張票,還剩{1}張票".format(os.getpid(), ticket))
else:
print("{0}沒有搶到票".format(os.getpid()))
f = open("file/ticket", mode="w", encoding="utf-8")
f.write(str(ticket))
def task(lock):
search_ticket()
lock.acquire()
get_ticket()
lock.release()
if __name__ == '__main__':
lock = Lock()
for i in range(100):
p = Process(target=task, args=(lock,))
p.start()
總結:加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串列的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
雖然可以用文件共用數據實現進程間通信,但問題是:
- 效率低(共用數據基於文件,而文件是硬碟上的數據)
- 需要自己加鎖處理
十一、隊列(推薦使用)
進程彼此之間互相隔離,要實現進程間通信(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的
Queue(底層就是以管道和鎖的方式實現)
方法介紹
maxsize是隊列中允許最大項數,省略則無大小限制。 q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。如果blocked為True(預設值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。 q.get方法可以從隊列讀取並且刪除一個元素。同樣,get方法有兩個可選參數:blocked和timeout。如果blocked為True(預設值),並且timeout為正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常. q.get_nowait():同q.get(False) q.put_nowait():同q.put(False) q.empty():調用此方法時q為空則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中又加入了項目。 q.full():調用此方法時q已滿則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中的項目被取走。 q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣 q.cancel_join_thread():不會在進程退出時自動連接後臺線程。可以防止join_thread()方法阻塞 q.close():關閉隊列,防止隊列中加入更多數據。調用此方法,後臺線程將繼續寫入那些已經入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將調用此方法。關閉隊列不會在隊列使用者中產生任何類型的數據結束信號或異常。例如,如果某個使用者正在被阻塞在get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。 q.join_thread():連接隊列的後臺線程。此方法用於在調用q.close()方法之後,等待所有隊列項被消耗。預設情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread方法可以禁止這種行為
from multiprocessing import Queue q = Queue(maxsize=3) q.put(1) q.put({"name":"dogfa"}) q.put([1,2,3]) print(q.full()) # True print(q.get()) # 1 print(q.get()) # {'name': 'dogfa'} print(q.get()) # [1, 2, 3] print(q.empty()) # True
生產者消費者模型
在併發編程中使用生產者和消費者模式能夠解決絕大多數併發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程式的整體處理數據的速度。
為什麼使用生產者消費者模式
線上程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為瞭解決這個問題於是引入了生產者和消費者模式。
什麼是生產者消費者模式
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。
JoinableQueue()
#JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列允許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共用的信號和條件變數來實現的。 #參數介紹: maxsize是隊列中允許最大項數,省略則無大小限制。 #方法介紹: JoinableQueue的實例p除了與Queue對象相同的方法之外還具有: q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。如果調用此方法的次數大於從隊列中刪除項目的數量,將引發ValueError異常 q.join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止
生產者消費者模型的實現
from multiprocessing import JoinableQueue, Process import os import random import time def customer(q): while 1: time.sleep(0.5) print("{0}號顧客吃了{1}".format(os.getpid(), q.get())) q.task_done() def producter(food, q): for i in range(10): time.sleep(random.randint(1, 2)) q.put(food) print("{0}號廚師完成了{1}的製作".format(os.getpid(), food)) q.join() if __name__ == '__main__': q = JoinableQueue() pro1 = Process(target=producter, args=("包子", q)) pro2 = Process(target=producter, args=("油條", q)) pro3 = Process(target=producter, args=("花卷", q)) cus1 = Process(target=customer, args=(q,)) cus2 = Process(target=customer, args=(q,)) cus1.daemon = True cus2.daemon = True lst = [pro1, pro2, pro3, cus1, cus2] [i.start() for i in lst] pro1.join() pro2.join() pro3.join() print("ending...") # 主進程等待pro1,Pro2,pro3執行完成,當pro執行完成意味著cus必定執行完成,所以可以將cus設置成守護進程
生產者消費者模式總結
#程式中有兩類角色 一類負責生產數據(生產者) 一類負責處理數據(消費者) #引入生產者消費者模型為瞭解決的問題是: 平衡生產者與消費者之間的工作能力,從而提高程式整體處理數據的速度 #如何實現: 生產者<-->隊列<——>消費者 #生產者消費者模型實現類程式的解耦和
十二、管道(不推薦使用)
管道
#創建管道的類: Pipe([duplex]):在進程之間創建一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連接對象,強調一點:必須在產生Process對象之前產生管道 #參數介紹: dumplex:預設管道是全雙工的,如果將duplex射成False,conn1只能用於接收,conn2只能用於發送。 #主要方法: conn1.recv():接收conn2.send(obj)發送的對象。如果沒有消息可接收,recv方法會一直阻塞。如果連接的另外一端已經關閉,那麼recv方法會拋出EOFError。 conn1.send(obj):通過連接發送對象。obj是與序列化相容的任意對象 #其他方法: conn1.close():關閉連接。如果conn1被垃圾回收,將自動調用此方法 conn1.fileno():返回連接使用的整數文件描述符 conn1.poll([timeout]):如果連接上的數據可用,返回True。timeout指定等待的最長時限。如果省略此參數,方法將立即返回結果。如果將timeout射成None,操作將無限期地等待數據到達。 conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的位元組消息。maxlength指定要接收的最大位元組數。如果進入的消息,超過了這個最大值,將引發IOError異常,並且在連接上無法進行進一步讀取。如果連接的另外一端已經關閉,再也不存在任何數據,將引發EOFError異常。 conn.send_bytes(buffer [, offset [, size]]):通過連接發送位元組數據緩衝區,buffer是支持緩衝區介面的任意對象,offset是緩衝區中的位元組偏移量,而size是要發送位元組數。結果數據以單條消息的形式發出,然後調用c.recv_bytes()函數進行接收 conn1.recv_bytes_into(buffer [, offset]):接收一條完整的位元組消息,並把它保存在buffer對象中,該對象支持可寫入的緩衝區介面(即bytearray對象或類似的對象)。offset指定緩衝區中放置消息處的位元組位移。返回值是收到的位元組數。如果消息長度大於可用的緩衝區空間,將引發BufferTooShort異常。
利用管道實現進程間的通信
from multiprocessing import Process,Pipe import time,os def consumer(p,name): left,right=p left.close() while True: try: baozi=right.recv() print('%s 收到包子:%s' %(name,baozi)) except EOFError: right.close() break def producer(seq,p): left,right=p right.close() for i in seq: left.send(i) # time.sleep(1) else: left.close() if __name__ == '__main__': left,right=Pipe() c1=Process(target=consumer,args=((left,right),'c1')) c1.start() seq=(i for i in range(10)) producer(seq,(left,right)) right.close() left.close() c1.join() print('主進程')
十三、數據共用
進程間數據是獨立的,可以藉助於隊列或管道實現通信,二者都是基於消息傳遞的,雖然進程間數據獨立,但可以通過Manager實現數據共用
from multiprocessing import Process, Manager, Lock
import os
import random
import time
def func(dic, lock):
lock.acquire() # 不加鎖肯定會造成數據混亂
time.sleep(random.randrange(2))
dic["count"] -= 1
lock.release()
if __name__ == '__main__':
lock = Lock()
with Manager() as m:
dic = m.dict({"count": 100})
lst = []
for i in range(100):
p = Process(target=func, args=(dic, lock))
lst.append(p)
p.start()
[i.join() for i in lst]
print(dic["count"])
進程間通信應該儘量避免使用上述共用數據的方式
十三、進程池
在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程式控制制多台主機,並行操作可以節約大量的時間。多進程是實現併發的手段之一,需要註意的問題是:
- 很明顯需要併發執行的任務通常要遠大於核數
- 一個操作系統不可能無限開啟進程,通常有幾個核就開幾個進程
- 進程開啟過多,效率反而會下降(開啟進程是需要占用系統資源的,而且開啟多餘核數目的進程也無法做到並行)
如果當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但如果是上百個,上千個。。。手動的去限制進程數量卻又太過繁瑣,此時可以發揮進程池的功效。我們就可以通過維護一個進程池來控制進程數目。
ps:對於遠程過程調用的高級應用程式而言,應該使用進程池,Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那麼就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束,就重用進程池中的進程。
創建進程池的類:如果指定numprocess為3,則進程池會從無到有創建三個進程,然後自始至終使用這三個進程去執行所有任務,不會開啟其他進程
創建進程池
Pool([numprocess [,initializer [, initargs]]]):創建進程池
參數介紹
numprocess:要創建的進程數,如果省略,將預設使用cpu_count()的值 initializer:是每個工作進程啟動時要執行的可調用對象,預設為None initargs:是要傳給initializer的參數組
方法介紹
p.apply(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然後返回結果。需要強調的是:此操作並不會在所有池工作進程中並執行func函數。如果要通過不同參數併發地執行func函數,必須從不同線程調用p.apply()函數或者使用p.apply_async() p.apply_async(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然後返回結果。此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他非同步操作中的結果。 p.close():關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成 P.jion():等待所有工作進程退出。此方法只能在close()或teminate()之後調用方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具有以下方法: obj.get():返回結果,如果有必要則等待結果到達。 obj.ready():如果調用完成,返回True obj.successful():如果調用完成且沒有引發異常,返回True,如果在結果就緒之前調用此方法,引發異常 obj.wait([timeout]):等待結果變為可用。 obj.terminate():立即終止所有工作進程,同時不執行任何清理或結束任何掛起工作。如果p被垃圾回收,將自動調用此函數
進程池的使用
使用進程池(非同步調用,apply_async)
from multiprocessing import Process, Pool import os import time import socket def func(i): print(i) time.sleep(1) return i ** 2 if __name__ == '__main__': pool = Pool(os.cpu_count() + 1) ret_lst = [] for i in range(100): # 維持執行的進程總數為processes,當一個進程執行完畢後會添加新的進程進去 ret = pool.apply_async(func, args=(i,)) ret_lst.append(ret) # 沒有後面的join,或get,則程式整體結束,進程池中的任務還沒來得及全部執行完也都跟著主進程一起結束了 print("=======================") # 關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成 pool.close() # 調用join之前,先調用close函數,否則會出錯。執行完close後不會有新的進程加入到pool,join函數等待所有子進程結束 pool.join() # 看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>對象組成的列表,而非最終的結果,但這一步是在join後執行的,證明結果已經計算完畢,剩下的事情就是調用每個對象下的get方法去獲取結果 print(ret_lst) # 使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get [print(i.get()) for i in ret_lst]
使用進程池(同步調用,apply)
from multiprocessing import Process, Pool import os import time import socket def func(i): print(i) time.sleep(1) return i ** 2 if __name__ == '__main__': # 維持執行的進程總數為processes,當一個進程執行完畢後會添加新的進程進去 pool = Pool(os.cpu_count() + 1) # 同步執行,即執行完一個拿到結果,再去執行另外一個 ret_lst = [] for i in range(100): ret = pool.apply(func, args=(i,)) ret_lst.append(ret) # 看到的就是最終的結果組成的列表,apply是同步的,所以直接得到結果,沒有get()方法 print(ret_lst)
進程池實現基於TCP協議的socket併發效果
# 服務端 from multiprocessing import Process, Pool import os import time import socket def func(conn, client_addr): print("進程:{0}".format(os.getpid())) while 1: try: c_msg = conn.recv(1024).decode("utf-8") if not c_msg: break print(c_msg) conn.send(c_msg.upper().encode("utf-8")) except Exception: break if __name__ == '__main__': sk = socket.socket() sk.bind(("127.0.0.1", 8080)) sk.listen(5) pool = Pool(os.cpu_count() + 1) while 1: conn, addr = sk.accept() pool.apply_async(func, args=(conn, addr)) # 服務端 import socket sk = socket.socket() sk.connect(("127.0.0.1", 8080)) while 1: c_msg = input(">>") if not c_msg: continue sk.send(c_msg.encode("utf-8")) s_msg = sk.recv(1024).decode("utf-8") print(s_msg)
當連接數達到開啟的進程池中的最大進程數量時,再有其它客戶端進行連接,將會阻塞等待,當另外的客戶端結束連接時才會建立起會話連接。
回調函數(callback())
需要回調函數的場景:進程池中任何一個任務一旦處理完了,就立即告知主進程:我好了額,你可以處理我的結果了。主進程則調用一個函數去處理該結果,該函數即回調函數
我們可以把耗時間(阻塞)的任務放到進程池中,然後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。
from multiprocessing import Pool import requests import json import os def get_page(url): print('<進程%s> get %s' %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {'url':url,'text':respone.text} def pasrse_page(res): print('<進程%s> parse %s' %(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text'])) with open('db.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] p=Pool(3) res_l=[] for url in urls: res=p.apply_async(get_page,args=(url,),callback=pasrse_page) res_l.append(res) p.close() p.join() print([res.get() for res in res_l]) # 拿到的是get_page的結果,其實完全沒必要拿該結果,該結果已經傳給回調函數處理了 ''' 列印結果: <進程3388> get https://www.baidu.com <進程3389> get https://www.python.org <進程3390> get https://www.openstack.org <進程3388> get https://help.github.com/ <進程3387> parse https://www.baidu.com <進程3389> get http://www.sina.com.cn/ <進程3387> parse https://www.python.org <進程3387> parse https://help.github.com/ <進程3387> parse http://www.sina.com.cn/ <進程3387> parse https://www.openstack.org [{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}] '''
如果在主進程中等待進程池中所有任務都執行完畢後,再統一處理結果,則無需回調函數
十四、信號量
互斥鎖 同時只允許一個線程更改數據,而Semaphore是同時允許一定數量的線程更改數據 ,比如廁所有3個坑,那最多只允許3個人上廁所,後面的人只能等裡面有人出來了才能再進去,如果指定信號量為3,那麼來一個人獲得一把鎖,計數加1,當計數等於3時,後面的人均需要等待。一旦釋放,就有人可以獲得一把鎖
信號量與進程池的概念很像,但是要區分開,信號量涉及到加鎖的概念
from multiprocessing import Process,Semaphore
import time,random
def go_wc(sem,user):
sem.acquire()
print('%s 占到一個茅坑' %user)
time.sleep(random.randint(0,3)) #模擬每個人拉屎速度不一樣,0代表有的人蹲下就起來了
sem.release()
if __name__ == '__main__':
sem=Semaphore(5)
p_l=[]
for i in range(13):
p=Process(target=go_wc,args=(sem,'user%s' %i,))
p.start()
p_l.append(p)
for i in p_l:
i.join()
print('============》')
十五、事件
Python線程的事件用於主線程式控制制其他線程的執行,事件主要提供了三個方法 set、wait、clear。
事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那麼當程式執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那麼event.wait 方法時便不再阻塞。
clear:將“Flag”設置為False
set:將“Flag”設置為True
from multiprocessing import Process,Event
import time,random
def car(e,n):
while True:
if not e.is_set(): #Flase
print('\033[31m紅燈亮\033[0m,car%s等著' %n)
e.wait()
print('\033[32m車%s 看見綠燈亮了\033[0m' %n)
time.sleep(random.randint(3,6))
if not e.is_set():
continue
print('走你,car', n)
break
def police_car(e,n):
while True:
if not e.is_set():
print('\033[31m紅燈亮\033[0m,car%s等著' % n)
e.wait(1)
print('燈的是%s,警車走了,car %s' %(e.is_set(),n))
break
def traffic_lights(e,inverval):
while True:
time.sleep(inverval)
if e.is_set():
e.clear() #e.is_set() ---->False
else:
e.set()
if __name__ == '__main__':
e=Event()
# for i in range(10):
# p=Process(target=car,args=(e,i,))
# p.start()
for i in range(5):
p = Process(target=police_car, args=(e, i,))
p.start()
t=Process(target=traffic_lights,args=(e,10))
t.start()
print('============》')
..............