多線程 什麼是鎖? - 鎖通常被用來實現對共用資源的同步訪問。 - 為每一個共用資源創建一個Lock對象,當你需要訪問該資源時,調用acquire方法來獲取鎖對象(如果其它線程已經獲得了該鎖,則當前線程需等待其被釋放),待資源訪問完後,再調用release方法釋放鎖: GIL(Global Inte ...
多線程
什麼是鎖?
- 鎖通常被用來實現對共用資源的同步訪問。
- 為每一個共用資源創建一個Lock對象,當你需要訪問該資源時,調用acquire方法來獲取鎖對象(如果其它線程已經獲得了該鎖,則當前線程需等待其被釋放),待資源訪問完後,再調用release方法釋放鎖:
GIL(Global Interpreter Lock) 全局的解釋器鎖
增加鎖的目的:
1.雖然效率十分低,但保證了數據的安全性
2.不同的鎖對應保護不同的數據
3.誰拿到GIL鎖就讓誰得到Cpython解釋器的執行許可權
4.GIL鎖保護的是Cpython解釋器數據的安全,而不會保護你自己程式的數據的安全
5.GIL鎖當遇到阻塞的時候,就被迫把鎖給釋放了,那麼其他的就開始搶鎖了,搶到後把值進行修改,但是第一個拿到鎖的還依舊保持著原本的數據,當再次拿到鎖的時候,數據已經修改了,而第一位拿的還是原來的數值,這樣就造成了混亂,也就保證不了數據的安全了。
同步鎖Lock
- GIL與Lock是兩把鎖,保護的數據不一樣,前者是解釋器級別的(保護的是解釋器級別的數據,比如垃圾回收的數據),後者是保護用戶自己開發的應用程式的數據
實例:沒有加上鎖的情況
- 在執行這個操作的多條bytecodes期間的時候可能中途就換別的線程了,這樣就出現了data races的情況
mport time import threading def addNum(): global num # 在每個線程中都獲取這個全局變數 temp = num print('--get num:',num ) time.sleep(0.01) num = temp - 1 # 對此公共變數進行-1操作 start = time.time() num = 100 # 設定一個共用變數 thread_list = [] for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: # 等待所有線程執行完畢 t.join() print('final num:', num ) end = time.time() print(end - start) # 此時並不能獲取到正確的答案0 # 100個線程每一個一定都沒有執行完就進行了切換,我們說過sleep就等效於IO阻塞,1s之內不會再切換回來,所以最後的結果一定是99. # 多個線程都在同時操作同一個共用資源,所以造成了資源破壞
實例:加上鎖的情況
- 同步鎖保證了在同一時刻只有一個線程被執行
import time import threading def addNum(): global num # 在每個線程中都獲取這個全局變數 lock.acquire() # 獲取鎖 temp = num print('--get num:',num ) num = temp - 1 # 對此公共變數進行-1操作 lock.release() # 只有在執行完上述內容才會釋放鎖 start = time.time() num = 100 # 設定一個共用變數 thread_list = [] lock = threading.Lock() for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待所有線程執行完畢 t.join() print('final num:', num ) end = time.time() print(end - start)
死鎖Lock
線程間共用多個資源的時候,如果兩個線程分別占有一部分資源並且同時等待對方的資源,就會造成死鎖的情況
因為系統判斷這部分資源都正在使用,所有這兩個線程在無外力作用下將一直等待下去
import threading,time class myThread(threading.Thread): def LoA(self): lockA.acquire() print(self.name,"got lockA",time.ctime()) time.sleep(3) lockB.acquire() print(self.name,"got lockB",time.ctime()) lockB.release() lockA.release() def LoB(self): lockB.acquire() print(self.name,"got lockB",time.ctime()) time.sleep(2) lockA.acquire() print(self.name,"got lockA",time.ctime()) lockA.release() lockB.release() def run(self): self.LoA() self.LoB() if __name__=="__main__": lockA=threading.Lock() lockB=threading.Lock() # 解決方案:添加 lock = threading.RLock() # lock = threading.RLock() threads=[] for i in range(3): threads.append(myThread()) for t in threads: t.start() for t in threads: t.join() # 此時線程會卡死,一直等待下去,此時添加遞歸鎖即可解決死鎖的問題
遞歸鎖 RLock
在添加遞歸鎖後,RLock內部維護著一個Lock和一個counter變數,counter記錄了acquire的次數,從而使得資源可以被多次acquire。直到一個線程所有的acquire都被release,其他的線程才能獲得資源
信號量
信號量用來控制線程併發數的,它也是一把鎖,BoundedSemaphore或Semaphore管理一個內置的計數 器,每當調用acquire()時-1,調用release()時+1
計數器不能小於0,當計數器為 0時,acquire()將阻塞線程至同步鎖定狀態,直到其他線程調用release()。(類似於停車位的概念)
BoundedSemaphore與Semaphore的唯一區別在於前者將在調用release()時檢查計數 器的值是否超過了計數器的初始值,如果超過了將拋出一個異常
import threading,time class myThread(threading.Thread): def run(self): if semaphore.acquire(): # 如果上信號量鎖就往下進行 print(self.name) time.sleep(5) semaphore.release() if __name__=="__main__": semaphore = threading.Semaphore(5) # 一次允許5個線程進行 # semaphore = threading.BoundedSemaphore(5) # 與上述效果一致 thrs = [] for i in range(20): # 開啟20個線程 thrs.append(myThread()) for t in thrs: t.start()
條件變數同步
有一類線程需要滿足條件之後才能夠繼續執行,Python提供了threading.Condition對象用於條件變數線程的支持,它除了能提供RLock()或Lock()的方法外,還提供了wait()、notify()、notifyAll()方法
lock_con = threading.Condition([Lock / Rlock]): 預設創建一個RLock鎖,用於線程間的通信
wait():條件不滿足時調用,線程會釋放鎖併進入等待阻塞
notify():條件創造後調用,通知等待池激活一個線程
notifyAll():條件創造後調用,通知等待池激活所有線程
import threading,time from random import randint class Producer(threading.Thread): def run(self): global L while True: val=randint(0,100) print('生產者',self.name,":Append"+str(val),L) if lock_con.acquire(): L.append(val) lock_con.notify() # 激活一個線程 lock_con.release() time.sleep(3) class Consumer(threading.Thread): def run(self): global L while True: lock_con.acquire() # wait()過後,從此處開始進行 if len(L)==0: lock_con.wait() # 進入等待阻塞 print('繼續進行') # 我們可以看到並沒有列印這個話,這說明線程不是從wait()下麵繼續進行 print('消費者',self.name,":Delete"+str(L[0]),L) del L[0] lock_con.release() time.sleep(0.25) if __name__=="__main__": L = [] lock_con = threading.Condition() threads = [] for i in range(5): threads.append(Producer()) threads.append(Consumer()) for t in threads: t.start() for t in threads: t.join()
同步條件(Event)
條件同步和條件變數同步差不多意思,只是少了鎖功能,同步條件不是鎖
因為條件同步設計於不訪問共用資源的條件環境。event = threading.Event():條件環境對象,初始值為False;
event.isSet():返回event的狀態值
event.wait():如果 event.isSet()==False將阻塞線程
event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度
event.clear():恢復event的狀態值為False
import threading,time import random def light(): if not event.isSet(): event.set() #wait就不阻塞 #綠燈狀態 count = 0
while True: if count < 10: print('\033[42;1m--green light on---\033[0m') elif count <13: print('\033[43;1m--yellow light on---\033[0m') elif count <20: if event.isSet(): event.clear() print('\033[41;1m--red light on---\033[0m') else: count = 0 event.set() # 打開綠燈 time.sleep(1) count += 1 def car(n): while 1: time.sleep(random.randrange(10)) if event.isSet(): # 綠燈 print("car [%s] is running.." % n) else: print("car [%s] is waiting for the red light.." %n) if __name__ == '__main__': event = threading.Event() Light = threading.Thread(target=light) Light.start() for i in range(3): t = threading.Thread(target=car,args=(i,)) t.start()