上一篇隨筆我們學了全局解釋器鎖,前面也學了互斥鎖,今天學習一些與鎖相關的點,例如遞歸鎖,信號量,Event,還會學習我們已經很熟悉的隊列,不過這次的隊列是作為一個模塊出現的。 ...
上一篇隨筆我們學了全局解釋器鎖,前面也學了互斥鎖,今天學習一些與鎖相關的點,例如遞歸鎖,信號量,Event,還會學習我們已經很熟悉的隊列,不過這次的隊列是作為一個模塊出現的。
一、同步鎖
1、join與互斥鎖
線程搶的是GIL鎖,GIL鎖相當於執行許可權,拿到執行許可權後才能拿到互斥鎖Lock,其他線程也可以搶到GIL,但如果發現Lock仍然沒有被釋放則阻塞,即便是拿到執行許可權GIL也要立刻交出來
join是等待所有,即整體串列,而鎖只是鎖住修改共用數據的部分,即部分串列,要想保證數據安全的根本原理在於讓併發變成串列,join與互斥鎖都可以實現,毫無疑問,互斥鎖的部分串列效率要更高
2、GIL VS Lock
鎖的目的是為了保護共用的數據,同一時間只能有一個線程來修改共用的數據。結論:保護不同的數據就應該加不同的鎖。
GIL 與Lock是兩把鎖,保護的數據不一樣,前者是解釋器級別的(當然保護的就是解釋器級別的數據,比如垃圾回收的數據),後者是保護用戶自己開發的應用程式的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock
分析:
1)100個線程去搶GIL鎖,即搶執行許可權
2) 肯定有一個線程先搶到GIL(暫且稱為線程1),然後開始執行,一旦執行就會拿到lock.acquire()
3)極有可能線程1還未運行完畢,就有另外一個線程2搶到GIL,然後開始運行,但線程2發現互斥鎖lock還未被線程1釋放,於是阻塞,被迫交出執行許可權,即釋放GIL
4)直到線程1重新搶到GIL,開始從上次暫停的位置繼續執行,直到正常釋放互斥鎖lock,然後其他的線程再重覆2 3 4的過程
3、join與互斥鎖對比實例
1)未處理代碼:
#不加鎖:併發執行,速度快,數據不安全 from threading import currentThread,Thread import time def task(): time.sleep(1) global n print('%s is running' %currentThread().getName()) temp = n time.sleep(0.1) n = temp - 1 if __name__ == '__main__': n = 100 t_l = [] s1 = time.time() for i in range(100): t=Thread(target=task) t_l.append(t) t.start() for t in t_l: t.join() s2 = time.time() print('主:%s n:%s' %(s2-s1,n)) ''' Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:1.1128411293029785 n:99 '''初始
2)加互斥鎖:
#不加鎖:未加鎖部分併發執行,加鎖部分串列執行,速度慢,數據安全 from threading import currentThread,Thread,Lock import time def task(): #未加鎖的代碼併發運行 time.sleep(1) print('%s start to run' %currentThread().getName()) global n #加鎖的代碼串列運行 mutex.acquire() temp = n time.sleep(0.1) n = temp - 1 mutex.release() if __name__ == '__main__': n = 100 mutex = Lock() t_l = [] s1 = time.time() for i in range(100) : t = Thread(target=task) t_l.append(t) t.start() for t in t_l : t.join() s2 = time.time() print('主:%s n:%s' %(s2-s1,n)) ''' Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:11.091605186462402 n:0 '''Lock
3)join效果
from threading import currentThread,Thread import time def task(): time.sleep(1) print('%s start to run' %currentThread().getName()) global n temp = n time.sleep(0.1) n = temp - 1 if __name__ == '__main__': n = 100 s1 = time.time() for i in range(100): t = Thread(target=task) t.start() t.join() s2 = time.time() print('主:%s n:%s' %(s2-s1,n)) ''' Thread-1 start to run Thread-2 start to run ...... Thread-100 start to run 主:110.16416668891907 n:0 '''join
即在start之後立刻使用jion,肯定會將100個任務的執行變成串列,毫無疑問,最終n的結果也肯定是0,是安全的,但問題是start後立即join:任務內的所有代碼都是串列執行的,而加鎖,只是加鎖的部分即修改共用數據的部分是串列的單從保證數據安全方面,二者都可以實現,但很明顯是加鎖的效率更高.
二、死鎖現象與遞歸鎖
1、死鎖現象
進程也有死鎖與遞歸鎖與線程中相同
死鎖: 是指兩個或兩個以上的進程或線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱為死鎖進程
from threading import Lock,Thread import time mutexA=Lock() mutexB=Lock() class MyThread(Thread): def run(self): self.f1() self.f2() def f1(self): mutexA.acquire() print('\033[31m%s 拿到A鎖' %self.name) mutexB.acquire() print('\033[32m%s 拿到B鎖' %self.name) mutexB.release() mutexA.release() def f2(self): mutexB.acquire() print('\033[33m%s 拿到B鎖' %self.name) time.sleep(1) mutexA.acquire() print('\033[34m%s 拿到A鎖' %self.name) mutexA.release() mutexB.release() if __name__ == '__main__': for i in range(10): t=MyThread() t.start()
死鎖狀態,程式永遠無法結束:
2、遞歸鎖
上述情況可以用遞歸鎖解決
遞歸鎖,在Python中為了支持在同一線程中多次請求同一資源,python提供了可重入鎖RLock。
這個RLock內部維護著一個Lock和一個counter變數,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖:
from threading import ,Thread,RLock import time mutexB=mutexA=RLock() #一個線程拿到鎖,counter加1,該線程內又碰到加鎖的情況,則counter繼續加1,這期間所有其他線程都只能等待,等待該線程釋放所有鎖,即counter遞減到0為止 class MyThread(Thread): def run(self): self.f1() self.f2() def f1(self): mutexA.acquire() print('\033[31m%s 拿到A鎖' %self.name) mutexB.acquire() print('\033[32m%s 拿到B鎖' %self.name) mutexB.release() mutexA.release() def f2(self): mutexB.acquire() print('\033[33m%s 拿到B鎖' %self.name) time.sleep(1) mutexA.acquire() print('\033[34m%s 拿到A鎖' %self.name) mutexA.release() mutexB.release() if __name__ == '__main__': for i in range(10): t=MyThread() t.start()
三、信號量Semaphore
Semaphore也是一種鎖不過這個鎖可以自己定義同時可以進入鎖的線程數
Semaphore管理一個內置的計數器,每當調用acquire()時內置計數器-1;調用release() 時內置計數器+1;
計數器不能小於0;當計數器為0時,acquire()將阻塞線程直到其他線程調用release()。
實例:
1、互斥鎖Lock就像家裡的廁所每次只能進一人,進去後鎖門其他人在外面等著(這是學進程互斥鎖時的例子)
from multiprocessing import Process,Lock,current_process import time,random def work(mutex): mutex.acquire() #上鎖 print('%s 上廁所' %current_process().name) time.sleep(random.randint(1,3)) print('%s 走了' %current_process().name) mutex.release() #開鎖 if __name__ == '__main__': mutex=Lock() #實例化(互斥鎖) print('start...') for i in range(20): t=Process(target=work,args=(mutex,)) t.start()
2、信號量Semaphore就像是街道的公共廁所有固定個數的隔間(例如5個),剛開始可以進去5個,然後出來幾個便可以再進去幾個
from threading import Thread,Semaphore,currentThread import time,random sm=Semaphore(5) def task(): sm.acquire() print('%s 上廁所' %currentThread().getName()) time.sleep(random.randint(1,3)) print('%s 走了' %currentThread().getName()) sm.release() if __name__ == '__main__': for i in range(20): t=Thread(target=task) t.start()
與進程池相似但是完全不同的概念,進程池Pool(4),最大隻能產生4個進程,而且從頭到尾都只是這四個進程,不會產生新的,而信號量是產生一堆線程/進程
四、Event
線程的一個關鍵特性是每個線程都是獨立運行且狀態不可預測。如果程式中的其 他線程需要通過判斷某個線程的狀態來確定自己下一步的操作,這時線程同步問題就會變得非常棘手。為瞭解決這些問題,我們需要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它允許線程等待某些事件的發生。在 初始情況下,Event對象中的信號標誌被設置為假。如果有線程等待一個Event對象, 而這個Event對象的標誌為假,那麼這個線程將會被一直阻塞直至該標誌為真。一個線程如果將一個Event對象的信號標誌設置為真,它將喚醒所有等待這個Event對象的線程。如果一個線程等待一個已經被設置為真的Event對象,那麼它將忽略這個事件, 繼續執行
event.isSet():返回event的狀態值; event.wait():如果 event.isSet()==False將阻塞線程; event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度; event.clear():恢復event的狀態值為False。
1、模擬紅綠燈
from threading import Thread,Event,currentThread import time e=Event() def traffic_lights(): time.sleep(5) e.set() def car(): print('\033[41m%s 等' %currentThread().getName()) e.wait() print('\033[42m%s 跑' %currentThread().getName()) if __name__ == '__main__': for i in range(10): t=Thread(target=car) t.start() traffic_thread=Thread(target=traffic_lights) traffic_thread.start()
2、有多個工作線程嘗試鏈接MySQL,我們想要在鏈接前確保MySQL服務正常才讓那些工作線程去連接MySQL伺服器,如果連接不成功,都會去嘗試重新連接。那麼我們就可以採用threading.Event機制來協調各個工作線程的連接操作
from threading import Thread,Event,currentThread import time e=Event() def conn_mysql(): count=1 while not e.is_set(): if count > 3: raise ConnectionError('嘗試鏈接的次數過多') print('\033[35m%s 第%s次嘗試' %(currentThread().getName(),count)) e.wait(timeout=1) count+=1 print('\033[32m%s 開始鏈接' %currentThread().getName()) def check_mysql(): print('\033[34m%s 檢測mysql...' %currentThread().getName()) time.sleep(2) e.set() if __name__ == '__main__': for i in range(3): t=Thread(target=conn_mysql) t.start() t=Thread(target=check_mysql) t.start()
五、定時器
定時器,指定n秒後執行某操作
from threading import Timer def hello(n): print("hello, world",n) #三秒後運行hello函數傳入參數123 t = Timer(3, hello, args=(123,)) t.start()
六、線程queue
queue隊列 :使用import queue,用法與進程Queue一樣
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
隊列線上程編程中尤其有用,因為必須在多個線程之間安全地交換信息。
1、queue.
Queue
() 先進先出
import queue q=queue.Queue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 結果(先進先出): first second third '''
2、queue.
LifoQueue
() 後進先出
import queue q=queue.LifoQueue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 結果(後進先出): third second first '''
3、queue.
PriorityQueue
() 存儲數據時可設置優先順序的隊列
import queue q=queue.PriorityQueue() #put進入一個元組,元組的第一個元素是優先順序(通常是數字,也可以是非數字之間的比較),數字越小優先順序越高 q.put((20,'a')) q.put((10,'b')) q.put((30,'c')) print(q.get()) print(q.get()) print(q.get()) ''' 結果(數字越小優先順序越高,優先順序高的優先出隊): (10, 'b') (20, 'a') (30, 'c') '''