上一篇博客講了進程、線程、協程和GIL的基本概念,這篇我們來說說在以下三點: 1> python中使用threading庫來創建線程的兩種方式 2> 使用Event對消來判斷線程是否已啟動 3> 使用Semaphore和BoundedSemaphore兩個類分別來控制線程的併發數以及二者之間的區別。 ...
上一篇博客講了進程、線程、協程和GIL的基本概念,這篇我們來說說在以下三點:
1> python中使用threading庫來創建線程的兩種方式
2> 使用Event對消來判斷線程是否已啟動
3> 使用Semaphore和BoundedSemaphore兩個類分別來控制線程的併發數以及二者之間的區別。
如果想要瞭解基本概念,請移步我的上一篇博客:https://www.cnblogs.com/ss-py/p/10236125.html
正文:
利用threading庫來在創建一個線程:
from threading import Thread def run(name): print('我是: %s' % (name)) if __name__ == '__main__': t = Thread(target=run, args=('線程1號',)) t.start()
運行結果:
首先,創建一個Thread線程,並用target=run來指定這個線程需要執行那個run方法,然後將run方法所需的參數以args參數的形式傳遞過去,
請註意,args所傳的參數是一個元組(tuple)類型,因此即使元組中只有一個參數,也要在這個參數後再加一個逗號,如 args=('線程1號',)
而且,當我們創建一個線程對象後,這個線程並不會立即執行,除非調用它的star()方法,當調用star()方法後,這個線程會調用你使用target傳給他的函數,
並將args中的參數傳遞給這個函數。
Python中的線程會在一個單獨的系統級線程中執行(如一個POSIX線程或一個Windows線程),這些線程全部由操作系統進行管理,線程一旦啟動,
它將獨立運行直至目標函數運行結束。
我們可以調用is_alive()方法來進行判斷該線程是否在運行(is_alive()方法返回True或者False):
我們知道,進程是依賴於線程來執行的,所以當我們的py文件在執行時,我們可以理解為有一個主線程在執行,當我們創建一個子線程時,就相當於當前程式一共有兩個線程在執行。當子線程被創建後,主線程和子線程就獨立運行,相互並不影響。
代碼如下:
1 import time 2 from threading import Thread 3 4 def run(name): 5 time.sleep(2) 6 print('我是: %s' % (name)) 7 8 if __name__ == '__main__': 9 t = Thread(target=run, args=('子線程',)) 10 t.start() 11 print('我是主線程')
執行結果:
在代碼的第9行,創建了一個線程t,讓它來執行run()方法,這時,程式中就有了兩個線程同時存在、各自獨立運行,預設的,主線程是不會等待子線程的運算結果的,所以主線程繼續向下執行,列印L“我是主線程”,而子線程在調用run()方法時sleep了兩秒鐘,之後才列印出“我是子線程”
當然,我們可以手動的調用join()方法來讓主線程等待子線程運行結束後再向下執行:
1 import time 2 from threading import Thread 3 4 def run(name): 5 time.sleep(2) 6 print('我是: %s' % (name)) 7 8 if __name__ == '__main__': 9 t = Thread(target=run, args=('子線程',)) 10 t.start() 11 t.join() 12 print('我是主線程')
這時,程式在進行到第十行後,主線程就卡住了,它在等待子線程運行結束,檔子線程運行結束後,主線程才會繼續向下運行,直至程式退出。
但是,但是,無論主線程等不等待子線程,Python解釋器都會等待所有的線程都終止後才會退出。也就是說,無論主線程等不等待子線程,這個程式最終都
會運行兩秒多,因為子線程sleep了兩秒。
所以,當遇到需要長時間運行的線程或者是需要一直在後臺運行的線程時,可以將其設置為後臺線程(守護線程)daemon=True,如:
t = Thread(target=run, args=('子線程',), daemon=True)
守護線程,顧名思義是守護主線程的線程,他們是依賴於主線程而存在的,當主線程執行結束後,守護線程會被立即註銷,無論該線程是否執行結束。
當然,我們也可以利用join()來使主線程等待主線程。
使用threading庫來創建線程還有一種方式:
from threading import Thread class CreateThread(Thread): def __init__(self): super().__init__() def run(self): print('我是子線程!') t = CreateThread() t.start()
在開始t = Thread(target=run, args=('子線程',))這種方式調用方法時子線程調用的run方法的這個方法名是我隨便起的,實際上叫什麼都行,
但是以繼承Thread類方式實現線程時,線程調用的方法名必須是run() 這個是程式寫死的。
使用Event對象判斷線程是否已經啟動
threading庫中的Event對象包含一個可由線程來設置的信號標誌,它允許線程等待某些事件的發生。
初始狀態時,event對象中的信號標誌被設置為假,如果有一個線程等待event對象,且這個event對象的標誌為假,那麼這個線程就會一直阻塞,直到該標誌為真。如果將一個event對象的標誌設置為真,他將喚醒所有等待這個標誌的線程,如果一個線程等待一個被設置為真得Event對象,那麼它將忽略這個事件,繼續向下執行。
Event (事件) 定義了一個全局的標誌Flag,如果Flag為False,當程式執行event.wait()時就會阻塞,當Flag為True時,程式執行event.wait()時便不會阻塞:
event.set(): 將標誌Flag設置為True, 並通知所有因等待該標誌而處於阻塞狀態的線程恢復運行。
event.clear(): 將標誌Flag設置為False
event.wait(): 判斷當前標誌狀態,如果是True則立即返回,否則線程繼續阻塞。
event.isSet(): 獲取標誌Flag狀態: 返回True或者False
1 from threading import Thread, Event 2 3 def run(num, start_evt): 4 if int(num) >10: 5 start_evt.set() 6 else: 7 start_evt.clear() 8 start_evt = Event() 9 10 if __name__ == '__main__': 11 num = input("請輸入數字>>>") 12 t = Thread(target=run, args=(num, start_evt,)) 13 t.start() 14 start_evt.wait() # 主線程獲取Event對象的標誌狀態,若為True,則主線程繼續執行,否則,主線程阻塞 15 print("主線程繼續執行!")
上邊這段代碼:當輸入的數字大於10時,將標誌設置為True,主程式繼續執行,當小於或者等於10時,將標誌設為False(預設為False),主線程阻塞。
值得註意的是:當Event對象的標誌被設置為True時,他會喚醒所有等待他的線程,如果只想喚醒某一個線程,最好使用信號量。
信號量
信號量,說白了就是一個計數器,用來控制線程的併發數,每次有線程獲得信號量的時候(即acquire())計數器-1,釋放信號量時候(release())計數器+1,計數器為0的時候其它線程就被阻塞無法獲得信號量
acquire() # 設置一個信號量
release() # 釋放一個信號量
python中有兩個類實現了信號量:(Semaphore和BoundedSemaphore)
Semaphore和BoundedSemaphore的相同之處:
通過: threading.Semaphore(3) 或者 threading.BoundedSemaphore(3) 來設置初始值為3的計數器
執行acquire() 計數器-1,執行release() 計數器+1,當計數器為0時,其他線程均無法再獲得信號量從而阻塞
import threading se = threading.BoundedSemaphore(3) for i in range(5): se.acquire() print('信號量被設置') for j in range(10): se.release() print('信號量被釋放了')
執行結果:
import threading se = threading.Semaphore(3) for i in range(5): se.acquire() print('信號量被設置') for j in range(10): se.release() print('信號量被釋放了')
執行結果:
可以看到,無論我們用那個類創建信號量,當計數器被減為0時,其他線程均會阻塞。
這個功能經常被用來控制線程的併發數:
沒有設置信號量:
import time import threading num = 3 def run(): time.sleep(2) print(time.time()) if __name__ == '__main__': t_list = [] for i in range(20): t = threading.Thread(target=run) t_list.append(t) for i in t_list: i.start()
執行結果:20個線程幾乎在同時執行,,如果主機在執行IO密集型任務時執行這種程式時,主機有可能會宕機,
但是在設置了信號量時,我們可以來控制同一時間同時運行的線程數:
import time import threading num = 3 def run(): se.acquire() # 添加信號量 time.sleep(2) print(time.time()) se.release() # 釋放一個信號量 if __name__ == '__main__': t_list = [] se = threading.Semaphore(5) # 設置一個大小為5的計數器(同一時間,最多允許5個線程在運行)
# se = threading.BoundedSemaphore(5)
for i in range(20): t = threading.Thread(target=run) t_list.append(t) for i in t_list: i.start()
這時,我們給程式加上信號量,控制它在同一時間內,最多只有5個線程在運行。
兩者之間的差異性:
當計數器達到設定好的上線時,BoundedSemaphore就無法進行release()操作了,Semaphore沒有這個限制,它會拋出異常。
import threading se = threading.Semaphore(3) for i in range(3): # 將計數器值減為0 se.acquire(3) for j in range(5): # 將計數器值加至5 se.release() print('信號量被釋放了')
運行結果:
import threading se = threading.BoundedSemaphore(3) for i in range(3): # 將計數器值減為0 se.acquire(3) for j in range(5): # 將計數器值加至5 se.release() print('信號量被釋放了')
運行結果:
拋異常了:信號量被釋放太多次。。。
好了,這篇文章的就先寫到這裡,下一篇文章我會講解關於線程間通信、線程加鎖等問題
想瞭解更多關於Python、爬蟲的信息,歡迎關註我的個人微信公眾號: