1.1 基於UDP協議實現簡單的套接字通信 udp是無鏈接的,先啟動哪一端都不會報錯 udp套接字簡單示例 1.1.1.1 客戶端: from socket import * client=socket(AF_INET,SOCK_DGRAM) #數據報協議,創建一個客戶的套接字 while True ...
1.1 基於UDP協議實現簡單的套接字通信
udp是無鏈接的,先啟動哪一端都不會報錯
udp套接字簡單示例
1.1.1.1 客戶端:
from socket import * client=socket(AF_INET,SOCK_DGRAM) #數據報協議,創建一個客戶的套接字 while True: # 通訊迴圈 msg=input('>>: ').strip() client.sendto(msg.encode('utf-8'),('127.0.0.1',8080)) #對話(發送) data,server_addr=client.recvfrom(1024) print(data.decode('utf-8'))
1.1.1.2 服務端:
from socket import * server=socket(AF_INET,SOCK_DGRAM) #數據報協議,創建一個伺服器的套接字 server.bind(('127.0.0.1',8080)) #綁定伺服器套接字 data,client_addr=server.recvfrom(3) #對話(接收) print('第一次:',data)
1.1.2 數據報協議
1.1.2.1 客戶端:
from socket import * client=socket(AF_INET,SOCK_DGRAM) #數據報協議 client.sendto('hello'.encode('utf-8'),('127.0.0.1',8082)) client.sendto('world'.encode('utf-8'),('127.0.0.1',8082))
1.1.2.2 服務端:
from socket import * server=socket(AF_INET,SOCK_DGRAM) #數據報協議 server.bind(('127.0.0.1',8080)) data,client_addr=server.recvfrom(3) print('第一次:',data) data,client_addr=server.recvfrom(3) print('第二次: ',data)
1.2 多進程
顧名思義,進程即正在執行的一個過程。進程是對正在運行程式的一個抽象。
進程的概念起源於操作系統,是操作系統最核心的概念,也是操作系統提供的最古老也是最重要的抽象概念之一。操作系統的其他所有內容都是圍繞進程的概念展開的。
即使可以利用的cpu只有一個(早期的電腦確實如此),也能保證支持(偽)併發的能力。將一個單獨的cpu變成多個虛擬的cpu(多道技術:時間多路復用和空間多路復用+硬體上支持隔離),沒有進程的抽象,現代電腦將不復存在。
操作系統的作用:
1:隱藏醜陋複雜的硬體介面,提供良好的抽象介面
2:管理、調度進程,並且將多個進程對硬體的競爭變得有序
多道技術:
1.產生背景:針對單核,實現併發
ps:
現在的主機一般是多核,那麼每個核都會利用多道技術
有4個cpu,運行於cpu1的某個程式遇到io阻塞,會等到io結束再重新調度,會被調度到4個
cpu中的任意一個,具體由操作系統調度演算法決定。
2.空間上的復用:如記憶體中同時有多道程式
3.時間上的復用:復用一個cpu的時間片
強調:遇到io切,占用cpu時間過長也切,核心在於切之前將進程的狀態保存下來,這樣才能保證下次切換回來時,能基於上次切走的位置繼續運行
1.2.1 開啟進程的兩種方式:
1.2.1.1 multiprocessing模塊介紹
python中的多線程無法利用多核優勢,如果想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分情況需要使用多進程。Python提供了multiprocessing。
multiprocessing模塊用來開啟子進程,併在子進程中執行我們定製的任務(比如函數),該模塊與多線程模塊threading的編程介面類似。
multiprocessing模塊的功能眾多:支持子進程、通信和共用數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。
需要再次強調的一點是:與線程不同,進程沒有任何共用狀態,進程修改的數據,改動僅限於該進程內。
1.2.1.2 方式一:
註意:在windows中Process()必須放到# if __name__ == '__main__':下
from multiprocessing import Process import time def task(name): print('%s is running' %name) time.sleep(5) print('%s is done' %name) if __name__ == '__main__': p=Process(target=task,args=('alex',)) p.start() print('主')
1.2.1.3 方式二:
from multiprocessing import Process import time class MyProcess(Process): def __init__(self,name): super(MyProcess,self).__init__() self.name=name def run(self): print('%s is running' %self.name) time.sleep(3) print('%s is done' %self.name) if __name__ == '__main__': p=MyProcess('進程1') p.start() #p.run() print('主')
1.2.2 進程之間記憶體空間是隔離的
from multiprocessing import Process import time n=100 def task(): global n time.sleep(5) n=0 if __name__ == '__main__': p=Process(target=task) p.start() # time.sleep(5) print(p.is_alive()) p.join() print(p.is_alive()) print('主',n)
1.2.3 join方法
主進程等,等待子進程結束
from multiprocessing import Process import time import os def task(n): print('%s is runing' %os.getpid()) time.sleep(n) print('%s is done' %os.getpid()) if __name__ == '__main__': start_time=time.time() p1=Process(target=task,args=(1,)) p2=Process(target=task,args=(2,)) p3=Process(target=task,args=(3,)) p_l=[p1,p2,p3] # p1.start() # p2.start() # p3.start() for p in p_l: p.start() # p3.join() #3 # p1.join() # # p2.join() # for p in p_l: p.join() stop_time=time.time() print('主',(stop_time-start_time))
既然join是等待進程結束,進程不就又變成串列的了嗎?當然不是了,必須明確:p.join()是讓誰等?很明顯p.join()是讓主線程等待p的結束,卡住的是主線程而絕非進程p,
進程只要start就會在開始運行了,所以p1-p4.start()時,系統中已經有四個併發的進程了, 而我們p1.join()是在等p1結束,沒錯p1只要不結束主線程就會一直卡在原地,這也是問題的關鍵,join是讓主線程等,而p1-p4仍然是併發執行的,p1.join的時候,其餘p2,p3,p4仍然在運行,等#p1.join結束,可能p2,p3,p4早已經結束了,這樣p2.join,p3.join.p4.join直接通過檢測,無需等待, 所以4個join花費的總時間仍然是耗費時間最長的那個進程運行的時間
from multiprocessing import Process import time import os def task(n): print('%s is runing' %os.getpid()) time.sleep(n) print('%s is done' %os.getpid()) if __name__ == '__main__': start_time=time.time() p1=Process(target=task,args=(1,)) p2=Process(target=task,args=(2,)) p3=Process(target=task,args=(3,)) p_l=[p1,p2,p3] # p1.start() # p2.start() # p3.start() for p in p_l: p.start() # p3.join() #3 # p1.join() # # p2.join() # for p in p_l: p.join() stop_time=time.time() print('主',(stop_time-start_time))
from multiprocessing import Process import time import os def task(n): print('%s is runing' %os.getpid()) time.sleep(n) print('%s is done' %os.getpid()) if __name__ == '__main__': start_time=time.time() p1=Process(target=task,args=(1,)) p2=Process(target=task,args=(2,)) p3=Process(target=task,args=(3,)) p1.start() p1.join() p2.start() p2.join() p3.start() p3.join() stop_time=time.time() print('主',(stop_time-start_time))
1.2.4 進程對象的其他屬性或方法
進程對象的其他方法一:terminate,is_alive
from multiprocessing import Process import time import os def task(n): print('pid:%s ppid:%s' %(os.getpid(),os.getppid())) time.sleep(n) if __name__ == '__main__': p=Process(target=task,args=(15,),name='進程1') p.start() p.terminate()#關閉進程,不會立即關閉,所以is_alive立刻查看的結果可能還是存活 # time.sleep(1) print(p.is_alive())#結果為True print('主pid:%s ppid:%s' %(os.getpid(),os.getppid())) # print(p.pid) p.name='xxxx' print(p.name)
1.2.5 守護進程
主進程創建守護進程
守護進程:當子進程執行的任務在父進程代碼運行完畢後就沒有存在的必要了,那
該子進程就應該被設置為守護進程,
守護進程內無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children
註意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止
from multiprocessing import Process import time def task(name): p=Process(target=time.sleep,args=(6,)) p.start() print('%s is running' %name) time.sleep(5) print('%s is done' %name) if __name__ == '__main__': p=Process(target=task,args=('alex',)) p.daemon=True #一定要在p.start()前設置,設置p為守護進程,禁止p創建子進程,並且父進程代碼執行結束,p即終止運行 p.start() time.sleep(1) print('主')
主進程代碼運行完畢,守護進程就會結束
from multiprocessing import Process from threading import Thread import time def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") if __name__ == '__main__': p1=Process(target=foo) p2=Process(target=bar) p1.daemon=True p1.start() p2.start() print("main-------") #列印該行則主進程代碼結束,則守護進程p1應該被終止,可能會有p1任務執行的列印信息123,因為主進程列印main----時,p1也執行了,但是隨即被終止
1.2.6 互斥鎖
進程之間數據不共用,但是共用同一套文件系統,所以訪問同一個文件,或同一個列印終端,是沒有問題的,而共用帶來的是競爭,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理
多個進程共用同一列印終端, 併發運行,效率高,但競爭同一列印終端,帶來了列印錯亂, 加鎖:由併發變成了串列,犧牲了運行效率,但避免了競爭
多個進程共用同一文件, 併發運行,效率高,但競爭寫同一文件,數據寫入錯亂, 加鎖:購票行為由併發變成了串列,犧牲了運行效率,但保證了數據安全
文件當資料庫,模擬搶票
from multiprocessing import Process,Lock import json import time import random import os def search(): time.sleep(random.randint(1,3)) dic=json.load(open('db.txt','r',encoding='utf-8')) print('%s 查看到剩餘票數%s' %(os.getpid(),dic['count'])) def get(): dic=json.load(open('db.txt','r',encoding='utf-8')) if dic['count'] > 0: dic['count']-=1 time.sleep(random.randint(1,3)) json.dump(dic,open('db.txt','w',encoding='utf-8')) print('%s 購票成功' %os.getpid()) def task(mutex): search() mutex.acquire() get() mutex.release() if __name__ == '__main__': mutex=Lock() for i in range(10): p=Process(target=task,args=(mutex,)) p.start() # p.join()
db.txt
{"count": 1}
加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串列的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
雖然可以用文件共用數據實現進程間通信,但問題是:
1.效率低(共用數據基於文件,而文件是硬碟上的數據)
2.需要自己加鎖處理
因此我們最好找尋一種解決方案能夠兼顧:1、效率高(多個進程共用一塊記憶體的數據)2、幫我們處理好鎖問題。這就是mutiprocessing模塊為我們提供的基於消息的IPC通信機制:隊列和管道。
1 隊列和管道都是將數據存放於記憶體中
2 隊列又是基於(管道+鎖)實現的,可以讓我們從複雜的鎖問題中解脫出來,
我們應該儘量避免使用共用數據,儘可能使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可獲展性。
1.2.7 隊列
進程彼此之間互相隔離,要實現進程間通信(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的
創建隊列的類(底層就是以管道和鎖定的方式實現)
Queue([maxsize]):創建共用的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。
maxsize是隊列中允許最大項數,省略則無大小限制。
q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。如果blocked為True(預設值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。
q.get方法可以從隊列讀取並且刪除一個元素。同樣,get方法有兩個可選參數:blocked和timeout。如果blocked為True(預設值),並且timeout為正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常.
q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)
q.empty():調用此方法時q為空則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中又加入了項目。
q.full():調用此方法時q已滿則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中的項目被取走。
q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣
from multiprocessing import Queue q=Queue(3) q.put('first') q.put(2) q.put({'count':3}) # q.put('fourth',block=False) #q.put_nowait('fourth') # q.put('fourth',block=True,timeout=3) print(q.get()) print(q.get()) print(q.get()) # print(q.get(block=False)) #q.get_nowait() print(q.get(block=True,timeout=3))
其他方法(瞭解):
1 q.cancel_join_thread():不會在進程退出時自動連接後臺線程。可以防止join_thread()方法阻塞
2 q.close():關閉隊列,防止隊列中加入更多數據。調用此方法,後臺線程將繼續寫入那些已經入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將調用此方法。關閉隊列不會在隊列使用者中產生任何類型的數據結束信號或異常。例如,如果某個使用者正在被阻塞在get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。
3 q.join_thread():連接隊列的後臺線程。此方法用於在調用q.close()方法之後,等待所有隊列項被消耗。預設情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread方法可以禁止這種行為
1.2.8 生產者消費者模型
在併發編程中使用生產者和消費者模式能夠解決絕大多數併發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程式的整體處理數據的速度。
為什麼要使用生產者和消費者模式
線上程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為瞭解決這個問題於是引入了生產者和消費者模式。
什麼是生產者消費者模式
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。
基於隊列實現生產者消費者模型
from multiprocessing import Process,Queue import time import random def producer(name,food,q): for i in range(3): res='%s%s' %(food,i) time.sleep(random.randint(1,3)) q.put(res) print('廚師[%s]生產了<%s>' %(name,res)) def consumer(name,q): while True: res=q.get() if res is None:break time.sleep(random.randint(1,3)) print('吃貨[%s]吃了<%s>' % (name, res)) if __name__ == '__main__': #隊列 q=Queue() #生產者們 p1=Process(target=producer,args=('egon1','泔水',q)) p2=Process(target=producer,args=('egon2','骨頭',q)) #消費者們 c1=Process(target=consumer,args=('管廷威',q)) c2=Process(target=consumer,args=('oldboy',q)) c3=Process(target=consumer,args=('oldgirl',q)) p1.start() p2.start() c1.start() c2.start() c3.start() p1.join() p2.join() q.put(None) q.put(None) q.put(None) print('主')
此時的問題是主進程永遠不會結束,原因是:生產者p在生產完後就結束了,但是消費者c在取空了q之後,則一直處於死迴圈中且卡在q.get()這一步。
解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就可以break出死迴圈
JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列允許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共用的信號和條件變數來實現的。
#參數介紹:
maxsize是隊列中允許最大項數,省略則無大小限制。
#方法介紹:
JoinableQueue的實例p除了與Queue對象相同的方法之外還具有:
q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。如果調用此方法的次數大於從隊列中刪除項目的數量,將引發ValueError異常
q.join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止
from multiprocessing import Process,JoinableQueue import time import random def producer(name,food,q): for i in range(3): res='%s%s' %(food,i) time.sleep(random.randint(1,3)) q.put(res) print('廚師[%s]生產了<%s>' %(name,res)) def consumer(name,q): while True: res=q.get() if res is None:break #生產者在生產完畢後發送結束信號None time.sleep(random.randint(1,3)) print('吃貨[%s]吃了<%s>' % (name, res)) q.task_done() if __name__ == '__main__': #隊列 q=JoinableQueue() #生產者們 p1=Process(target=producer,args=('egon1','泔水',q)) p2=Process(target=producer,args=('egon2','骨頭',q)) #消費者們 c1=Process(target=consumer,args=('管廷威',q)) c2=Process(target=consumer,args=('oldboy',q)) c3=Process(target=consumer,args=('oldgirl',q)) c1.daemon=True c2.daemon=True c3.daemon=True p1.start() p2.start() c1.start() c2.start() c3.start() p1.join() p2.join() q.join() print('主')
1.3 多線程
threading模塊介紹
multiprocess模塊的完全模仿了threading模塊的介面,二者在使用層面,有很大的相似性,因而不再詳細介紹
官網鏈接:https://docs.python.org/3/library/threading.html?highlight=threading#
1.3.1 開啟線程的兩種方式
方式一:
from threading import Thread import time import random def piao(name): print('%s is piaoing' %name) time.sleep(random.randint(1,3)) print('%s is piao end' %name) if __name__ == '__main__': t1=Thread(target=piao,args=('alex',)) t1.start() print('主')
方式二:
from threading import Thread import time import random class MyThread(Thread): def __init__(self,name): super().__init__() self.name=name def run(self): print('%s is piaoing' %self.name) time.sleep(random.randint(1,3)) print('%s is piao end' %self.name) if __name__ == '__main__': t1=MyThread('alex') t1.start() print('主')
1.3.2 練習
多線程併發
客戶端:
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8081)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) data=client.recv(1024) print(data.decode('utf-8')) client.close()
服務端:
from threading import Thread,current_thread from socket import * def comunicate(conn): print('子線程:%s' %current_thread().getName()) while True: try: data=conn.recv(1024) if not data:break conn.send(data.upper()) except ConnectionResetError: break conn.close() def server(ip,port): print('主線程:%s' %current_thread().getName()) server = socket(AF_INET, SOCK_STREAM) server.bind((ip,port)) server.listen(5) while True: conn, addr = server.accept() print(addr) # comunicate(conn) t=Thread(target=comunicate,args=(conn,)) t.start() server.close() if __name__ == '__main__': server('127.0.0.1', 8081)
1.3.3 進程與線程的區別
from threading import Thread import time import random import os def piao(): print('%s is piaoing' %os.getpid()) time.sleep(random.randint(1,3)) if __name__ == '__main__': t1=Thread(target=piao,) t2=Thread(target=piao,) t3=Thread(target=piao,) t1.start()a t2.start() t3.start() print('主',os.getpid()) from threading import Thread import time import random import os n=100 def piao(): global n n=0 if __name__ == '__main__': t1=Thread(target=piao,) t1.start() t1.join() print('主',n)
1.3.4 守護線程
無論是進程還是線程,都遵循:守護xxx會等待主xxx運行完畢後被銷毀
需要強調的是:運行完畢並非終止運行
1.對主進程來說,運行完畢指的是主進程代碼運行完畢
2.對主線程來說,運行完畢指的是主線程所在的進程內所有非守護線程統統運行完畢,主線程才算運行完畢
主進程在其代碼結束後就已經算運行完畢了(守護進程在此時就被回收),然後主進程會一直等非守護的子進程都運行完畢後回收子進程的資源(否則會產生僵屍進程),才會結束
主線程在其他非守護線程運行完畢後才算運行完畢(守護線程在此時就被回收)。因為主線程的結束意味著進程的結束,進程整體的資源都將被回收,而進程必須保證非守護線程都運行完畢後才能結束。
from threading import Thread import time def sayhi(name): print('====>') time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) # t.setDaemon(True) t.daemon=True t.start() print('主線程') from threading import Thread import time def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") if __name__ == '__main__': t1=Thread(target=foo) t2=Thread(target=bar) t1.daemon=True t1.start() t2.start() print("main-------")
1.3.5 線程的互斥鎖
from threading import Thread,Lock import time n=100 def task(): global n with mutex: temp=n time.sleep(0.1) n=temp-1 if __name__ == '__main__': start_time=time.time() mutex=Lock() t_l=[] for i in range(100): t=Thread(target=task) t_l.append(t) t.start() for t in t_l: t.join() stop_time=time.time() print('主',n) print('run time