多線程 線程 線程是操作系統能夠進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運作單位。一條線程指的是進程中一個單一順序的控制流,一個進程中可以併發多個線程,每條線程並行執行不同的任務。 進程 程式的執行實例稱為進程。 每個進程提供執行程式所需的資源。進程具有虛擬地址空間、可執行代碼、系 ...
多線程
線程
線程是操作系統能夠進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運作單位。一條線程指的是進程中一個單一順序的控制流,一個進程中可以併發多個線程,每條線程並行執行不同的任務。
進程
程式的執行實例稱為進程。
每個進程提供執行程式所需的資源。進程具有虛擬地址空間、可執行代碼、系統對象的打開句柄、安全上下文、唯一進程標識符、環境變數、優先順序類、最小和最大工作集大小以及至少一個執行線程。每個進程都是用一個線程(通常稱為主線程)啟動的,但是可以從它的任何線程創建額外的線程。
線程和進程的區別
線程共用創建它的進程的地址空間;進程有自己的地址空間。
線程可以直接訪問其進程的數據段;進程有自己的父進程數據段副本。
線程可以直接與其進程的其他線程通信;進程必須使用進程間通信來與兄弟進程通信。
新線程很容易創建;新進程需要父進程的重覆。
線程可以對同一進程的線程進行相當大的控制;進程只能對子進程進行控制。
對主線程的更改(取消、優先順序更改等)可能會影響進程的其他線程的行為;對父進程的更改不會影響子進程。
線程的創建
import threading def foo(num): print('running on thread ',num)if __name__=='__main__': t1 = threading.Thread(target=foo, args=(1,)) # 生成一個線程實例 t1.start() # 啟動線程
通過繼承的方式創建線程
import threading class MyThread(threading.Thread): def __init__(self, num): threading.Thread.__init__(self) self.num = num def run(self): # 定義每個線程要運行的函數,將threading.Thread中的run方法進行了重載 print("running on number:%s" % self.num) if __name__ == '__main__': t1 = MyThread(1) t2 = MyThread(2) t1.start() t2.start()
併發
創建兩個線程來同時併發。
import threading import time class MyThread(threading.Thread): def __init__(self, num): threading.Thread.__init__(self) self.num = num def run(self): # 定義每個線程要運行的函數 print("running on number:%s" % self.num) time.sleep(3) if __name__ == '__main__': begin = time.time() t1 = MyThread(1) t2 = MyThread(2) t1.start() t2.start() end = time.time() print(end-begin)
這樣輸出的是
running on number:1
running on number:20.002995729446411133
期間只用了零點幾秒,幾乎是同時輸出的,這樣可以看出它們是同時進行的。要是是串列的就應該是3.XXXXX秒了。
join()方法
在子線程完成運行之前,這個子線程的父線程將一直被阻塞。即一個線程使用join()方法後,必須等該線程結束後才執行join()之後的代碼。
import threading import time class MyThread(threading.Thread): def __init__(self, num): threading.Thread.__init__(self) self.num = num def run(self): # 定義每個線程要運行的函數 print("running on number:%s" % self.num) time.sleep(3) if __name__ == '__main__': begin = time.time() t1 = MyThread(1) t2 = MyThread(2) t1.start() t2.start() t1.join() t2.join() end = time.time() print(end-begin)
這樣輸出的是
running on number:1
running on number:2
3.005915880203247
這樣就相當於串列了。
守護線程setDaemon
將線程聲明為守護線程,必須在start() 方法調用之前設置, 如果不設置為守護線程程式會被無限掛起。這個方法基本和join是相反的。當主線程完成時不需要某個子線程完全運行完就要退出程式,那麼就可以將這個子線程設置為守護線程,setDaemon(True)。
import threading import time class MyThread(threading.Thread): def __init__(self, num): threading.Thread.__init__(self) self.num = num def run(self): # 定義每個線程要運行的函數 print("running on number:%s" % self.num) time.sleep(3) if __name__ == '__main__': begin = time.time() t1 = MyThread(1) t2 = MyThread(2) t1.setDaemon(True) t2.setDaemon(True) t1.start() t2.start() end = time.time() print(end-begin)
這樣當主線程完成了之後就會退出,不會等待子線程。
running on number:1
running on number:20.0020024776458740234
同步鎖(Lock)
import time import threading def addNum(): global num #在每個線程中都獲取這個全局變數 # num-=1 temp=num print('--get num:',num ) time.sleep(0.001) num =temp-1 #對此公共變數進行-1操作 num = 100 #設定一個共用變數 thread_list = [] for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待所有線程執行完畢 t.join() print('final num:', num )
但這樣最後獲得的final num往往不是0,而是其他數,這是因為多個線程在time.sleep()的時候同時拿到了num,所以num是同一個數,而解決方法就是加鎖。
死鎖與遞歸鎖(RLock)
死鎖
import time import threading lock1 = threading.Lock() lock2 = threading.Lock() class Mythread(threading.Thread): def run(self): self.f1() self.f2() def f1(self): lock1.acquire() print('%s 拿到一號鎖' %self.name) lock2.acquire() print('%s 拿到二號鎖' % self.name) time.sleep(2) lock1.release() lock2.release() def f2(self): lock2.acquire() print('%s 拿到二號鎖' %self.name) lock1.acquire() print('%s 拿到一號鎖' % self.name) lock2.release() lock1.release() if __name__ == '__main__': for i in range(5): t=Mythread() t.start() #Thread-1 拿到一號鎖 Thread-1 拿到二號鎖 Thread-1 拿到二號鎖Thread-2 拿到一號鎖
這裡開了5個線程,可是卻阻塞住了,原因是在Thread1拿到二號鎖,Thread2拿到一號鎖時,f2中在等待一號鎖,f1在等待二號鎖,結果都等不到,所以產生了死鎖。
RLock
import time import threading lock = threading.RLock() class Mythread(threading.Thread): def run(self): self.f1() self.f2() def f1(self): lock.acquire() print('%s 拿到一號鎖' %self.name) lock.acquire() print('%s 拿到二號鎖' % self.name) time.sleep(2) lock.release() lock.release() def f2(self): lock.acquire() print('%s 拿到二號鎖' %self.name) lock.acquire() print('%s 拿到一號鎖' % self.name) lock.release() lock.release() if __name__ == '__main__': for i in range(5): t=Mythread() t.start() ''' Thread-1 拿到一號鎖 Thread-1 拿到二號鎖 Thread-2 拿到一號鎖 Thread-2 拿到二號鎖 Thread-3 拿到一號鎖 Thread-3 拿到二號鎖 Thread-4 拿到一號鎖 Thread-4 拿到二號鎖 Thread-5 拿到一號鎖 Thread-5 拿到二號鎖 Thread-1 拿到二號鎖 Thread-1 拿到一號鎖 Thread-2 拿到二號鎖 Thread-2 拿到一號鎖 Thread-3 拿到二號鎖 Thread-3 拿到一號鎖 Thread-4 拿到二號鎖 Thread-4 拿到一號鎖 Thread-5 拿到二號鎖 Thread-5 拿到一號鎖 '''
為了支持在同一線程中多次請求同一資源,python提供了“可重入鎖”:threading.RLock。RLock內部維護著一個Lock和一個counter變數,counter記錄了acquire的次數,從而使得資源可以被多次acquire。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。
信號量
信號量用來控制線程併發數的,BoundedSemaphore或Semaphore管理一個內置的計數 器,每當調用acquire()時-1,調用release()時+1。
計數器不能小於0,當計數器為 0時,acquire()將阻塞線程至同步鎖定狀態,直到其他線程調用release()。(類似於停車位的概念)
BoundedSemaphore與Semaphore的唯一區別在於前者將在調用release()時檢查計數 器的值是否超過了計數器的初始值,如果超過了將拋出一個異常。
import time import threading s = threading.BoundedSemaphore(3) class Mythread(threading.Thread): def run(self): s.acquire() print(self.name) time.sleep(2) s.release() if __name__ == '__main__': t=[] for i in range(10): t.append(Mythread()) for i in t: i.start() ''' Thread-1 Thread-2 Thread-3 Thread-4Thread-6Thread-5 Thread-7 Thread-8Thread-9 Thread-10 '''
可以看到幾乎是三個三個同時輸出的。
條件變數同步
有一類線程需要滿足條件之後才能夠繼續執行,Python提供了threading.Condition 對象用於條件變數線程的支持,它除了能提供RLock()或Lock()的方法外,還提供了 wait()、notify()、notifyAll()方法。
lock_con=threading.Condition([Lock/Rlock]): 鎖是可選選項,不傳入鎖,對象自動創建一個RLock()。
wait():條件不滿足時調用,線程會釋放鎖併進入等待阻塞;
notify():條件創造後調用,通知等待池激活一個線程;
notifyAll():條件創造後調用,通知等待池激活所有線程。
import time import threading import random lock = threading.Condition() class Producer(threading.Thread): def __init__(self,name): threading.Thread.__init__(self) self.name = name def run(self): global product while True: if lock.acquire(): p = random.randint(1,10) print('機器%s生產了%d件產品'%(self.name,p)) product+=p lock.notify() lock.release() time.sleep(2) class Consumer(threading.Thread): def __init__(self,name): threading.Thread.__init__(self) self.name = name def run(self): global product while True: if lock.acquire(): if product>=1: print('客戶%s購買了1件產品'%self.name) product -=1 lock.notify() lock.release() time.sleep(2) if __name__=="__main__": product = 0 t = [] for i in range(5): t.append(Producer(i)) t.append((Consumer(1))) t.append(Consumer(2)) for x in t: x.start() ''' 機器0生產了10件產品 機器1生產了4件產品 機器2生產了1件產品 機器3生產了2件產品 機器4生產了3件產品 客戶1購買了1件產品 客戶2購買了1件產品 機器0生產了8件產品 機器1生產了7件產品 機器2生產了5件產品 機器3生產了1件產品 機器4生產了4件產品 客戶2購買了1件產品 客戶1購買了1件產品 機器0生產了8件產品 機器1生產了4件產品 機器2生產了5件產品 機器3生產了1件產品 機器4生產了1件產品 客戶1購買了1件產品 客戶2購買了1件產品 機器0生產了5件產品 機器1生產了2件產品 '''
wait等待notify的通知,當接到通知後,會重新從if acquire()開始執行
同步條件(Event)
event.isSet():返回event的狀態值; event.wait():如果 event.isSet()==False將阻塞線程; event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態,等待操作系統調度; event.clear():恢復event的狀態值為False。
import time import threading import random event1=threading.Event() event2=threading.Event() event3=threading.Event() class Producer(threading.Thread): def run(self): event1.wait() print('salesman:這個東西100塊。') print('salesman:你要嗎?') event1.clear() event2.set() event3.wait() print('salesman:好的') event3.clear() class Consumer(threading.Thread): def run(self): print('customer:這個東西多少錢?') event1.set() event2.wait() print('customer:嗯,幫我包起來') event2.clear() event3.set() if __name__=="__main__": t = [] t.append(Producer()) t.append(Consumer()) for x in t: x.start() ''' customer:這個東西多少錢? salesman:這個東西100塊。 salesman:你要嗎? customer:嗯,幫我包起來 salesman:好的 '''
隊列queue
創建一個“隊列”對象
import Queue
q = Queue.Queue(maxsize = 10)
Queue.Queue類即是一個隊列的同步實現。隊列長度可為無限或者有限。可通過Queue的構造函數的可選參數maxsize來設定隊列長度。如果maxsize小於1就表示隊列長度無限。
將一個值放入隊列中
q.put(10)
調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item為必需的,為插入項目的值;第二個block為可選參數,預設為
1。如果隊列當前為空且block為1,put()方法就使調用線程暫停,直到空出一個數據單元。如果block為0,put方法將引發Full異常。
將一個值從隊列中取出
q.get()
調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數為block,預設為True。如果隊列為空且block為True,get()就使調用線程暫停,直至有項目可用。如果隊列為空且block為False,隊列將引發Empty異常。
Python Queue模塊有三種隊列及構造函數:
1、Python Queue模塊的FIFO隊列先進先出。 class queue.Queue(maxsize)
2、LIFO類似於堆,即先進後出。 class queue.LifoQueue(maxsize)
3、還有一種是優先順序隊列級別越低越先出來。 class queue.PriorityQueue(maxsize)
此包中的常用方法(q = Queue.Queue()):
q.qsize() 返回隊列的大小
q.empty() 如果隊列為空,返回True,反之False
q.full() 如果隊列滿了,返回True,反之False
q.full 與 maxsize 大小對應
q.get([block[, timeout]]) 獲取隊列,timeout等待時間
q.get_nowait() 相當q.get(False)
非阻塞 q.put(item) 寫入隊列,timeout等待時間
q.put_nowait(item) 相當q.put(item, False)
q.task_done() 在完成一項工作之後,q.task_done() 函數向任務已經完成的隊列發送一個信號
q.join() 實際上意味著等到隊列為空,再執行別的操作
import threading,queue from time import sleep from random import randint class Production(threading.Thread): def run(self): while True: r=randint(0,100) q.put(r) print("生產出來%s號包子"%r) sleep(1) class Proces(threading.Thread): def run(self): while True: re=q.get() print("吃掉%s號包子"%re) if __name__=="__main__": q=queue.Queue(10) threads=[Production(),Production(),Production(),Proces()] for t in threads: t.start() ''' 生產出來55號包子 生產出來100號包子 生產出來67號包子 吃掉55號包子 吃掉100號包子 吃掉67號包子 生產出來23號包子生產出來97號包子生產出來41號包子吃掉23號包子 吃掉97號包子 吃掉41號包子 吃掉4號包子 生產出來4號包子 生產出來45號包子吃掉45號包子生產出來84號包子 '''