前面兩篇文章,寫了python線程同步原語的基本應用。下麵這篇文章主要是通過閱讀源碼來瞭解這幾個類的內部原理和是怎麼協同一起工作來實現python多線程的。 相關文章鏈接:python同步原語--線程鎖 python--線程同步原語 一、關於Condition類 Condition的用法: 用來記錄 ...
前面兩篇文章,寫了python線程同步原語的基本應用。下麵這篇文章主要是通過閱讀源碼來瞭解這幾個類的內部原理和是怎麼協同一起工作來實現python多線程的。
相關文章鏈接:python同步原語--線程鎖
一、關於Condition類
Condition的用法:
用來記錄線程的狀態變數
查看Condition的源碼,會看到作者給開發者提供的文檔說明。‘Class that implemets a condition variable’寫得很明白,這是一個用來記錄線程狀態的類。
1. Condition對象初始化
從這段代碼可以看出,Condition使用了threading模塊的Rlock類,關於Rlock的用法可以看我之前寫的一篇文章python同步原語--線程鎖 。在對象初始化的同時,將Rlock的請求鎖和釋放鎖方法賦給了內部的self.acquire和self.release對象方法。當初始化對象同時初始化這兩個方法,也就是說,每個對象在實例化的時候都會實例一個新的可重入鎖(RLock)。這樣可以避免不同對象(condition實例對象)間對類中共用方法的爭奪,避免出現死鎖的問題。
這段代碼非常的重要。如果熟悉python的上下文管理的朋友應該一看就明白,這是上下文管理中的進入和退出操作。當在調用with時,程式會自動調用_ _enter_ _方法,在程式執行完畢,退出此上下文環境時,自動調用_ _exit_ _方法。那麼在這裡的_ _enter_ _和_ _exit_ _方法分別有什麼用呢?通過閱讀源碼發現,前者是調用了Rlock的acquier方法(獲取鎖),而後者調用了Rlock的release方法(釋放鎖)。在下麵我會繼續講這兩個方法在類中的作用。
2. wait()方法
源碼中對wait()方法的定義是‘Wait untified or until a timeout occurs’。意思是阻塞等待知道有提示(notify)或者超時時間(timeout)的到達。
再看看wait()函數的內部邏輯
_is_owned()方法是判斷此Condition對象是否有獲取到鎖,如果沒有獲取到鎖(可能是可重入鎖的獲取次數已經達到預定值,不過這種情況很少發生),就會報出錯誤。接下來是對需要等待的程式進行一些列的處理。先是給這個程式分配鎖,對它的程式空間和內部變數進行封鎖。同時把這個加鎖後的程式放進雙端隊列(deque)‘等待者們’中。
好像wait()方法的功能到此就結束了。但是註意到下麵還有try函數塊,旁邊一行註釋寫著‘restore state no matter what’然後又舉了一個KeyboardInterrupt的異常情況。意思是當出現了例如鍵盤輸入ctrl+C這類操作的時候,程式如何退出阻塞。如果在調用wait方法的時候沒有傳入timeout參數,那麼,等待者程式就會重新獲取鎖。如果有timeout參數,就會根據參數來確定退出阻塞的時間。這就是為什麼我們有時在輸入ctrl+C強行退出阻塞的時候,程式會等待一會兒才給出退出程式的提示的原因。
3. notify()方法
接下來這個notify()方法在Condition類中也是非常的重要(queue模塊內部也調用了這個函數)
notify()方法內部實現:
notify直接翻譯過來就是‘提示’的意思。那麼為什麼Condition對象需要‘提示’呢?閱讀源碼下來,其真正的功能不是提示,而是鎖的釋放,並且在釋放了指定數量的waiters之後,順便將他們從‘等待者們’隊列中刪除。如果直接理解為提示,就會很難理解了。但這是老外在定義函數時的寫法,本人的理解是,有點像給阻塞的程式發出信號(提示),停止阻塞(釋放鎖),這麼理解應該也算勉強解釋得過去吧。
Condition內部另外還有一個notify_all()方法,這個方法對‘等待者們’隊列中的所有的程式都發出‘提示’,釋放鎖,而沒有像notify中那樣有數量n的限制。
源碼:
那麼總結上面的Condititon內部的方法實現,可以看出,Condition類是為了實現一種狀態的‘保存’,即在多線程編程的情況下,由於線程間共用空間而容易引發錯誤,往往需要讓一些線程先執行,而後面的線程等待(阻塞)。那麼如果這些程式需要阻塞等待,就會調用Condition類實例對象的wait方法,當結束等待的信號發出時,就會調用Condition的notify方法對隊列中的程式進行釋放鎖操作。
二、關於Segmaphore和BoundedSegmaphore
如果在主機執行IO密集型任務的時候再執行這種短時間內完成大量任務(多線程)的程式時,電腦就有很大可能會宕機。
這時候就可以為這段程式添加一個計數器(counter)功能,來限制一個時間點內的線程數量。當每次進行IO操作時,都需要向segmaphore請求資源(鎖),如果沒有請求到,就阻塞等待,請求成功才就像執行任務。
那麼segmaphore的內部實現是怎樣的呢?實質上segmaphore也是鎖,其內部也是通過Lock和Condition實現的。Lock是單鎖,而segmaphore是可以自己定義的多鎖。在初始化segmaphore時,需要傳入參數counter。當線程向segmaphore請求資源(鎖)時,內部的counter會自動減1。當釋放資源(鎖)的時,counter就會自動加1。
segmaphore主要有兩個方法,acquire()和release()方法。
1. acquire()方法
官方的定義:
def acquire(self, blocking=True, timeout=None):
當內部的counter(源碼實際上是用value變數保存)等於0的時候,其他線程acquire會阻塞。這個時候,之前向segmaphore發出請求並獲得鎖的線程,它們如果同時執行完任務並希望釋放鎖時,那麼鎖的釋放是隨機的。任何一個完成任務的線程都會釋放鎖,這個順序跟線程向請求的時間和任務完成的時間是沒有任何關係的。
參數的解析:
1)blocking:預設為True,當線程請求不到資源的時候,會阻塞等待。如果設置為False,則線程請求不到資源時不會阻塞。
2)timeout:如果設置blocking = True,即預設值時,經過timeout時間會退出阻塞。
2. release()方法
這個方法與Lock的release方法很像,具體可以看看我之前寫的關於鎖的一篇文章。
源碼:
解析:
當一個實例請求釋放鎖的時候,segmaphore內部的_value會自動加1,同時調用notify方法,將被鎖住的線程‘喚醒’。
三、關於Event類
閱讀源碼知道,Event是也基於Condition和Lock實現的
1. set()方法
在 python--線程同步原語 這篇文章我曾經寫過一個案例,在進程中調用一次event.set()函數就可以一次性通知(釋放)所有阻塞的等待的鎖。其內部實現的原理在這裡,最關鍵的一個方法是notify_all()。
在調用set()方法的時候,方法內部會將_flags設置為True,即等待的事件會退出阻塞。
2. clear()方法
clear()方法不同作太多解析了,就是內部的_flags重新設置為False
3. wait()方法
wait()方法的定義:
def wait(self, timeout=None):
內部實現:
原理還是很簡單的,實際上Event的wait()方法正是調用了Condition中的實例方法wait()。調用wait()方法的時候可以傳入參數timeout(超時時間),作為Event事件自動退出阻塞的時間界限。