什麼是進程(process)? 程式並不能單獨運行,只有將程式裝載到記憶體中,系統為它分配資源才能運行,而這種執行的程式就稱之為進程。程式和進程的區別就在於,程式是指令的集合,它是進程運行的靜態描述文本;進程是程式的一次執行活動,屬於動態概念。 什麼是線程(thread)? 線程是操作系統能夠進行運算... ...
什麼是進程(process)?
程式並不能單獨運行,只有將程式裝載到記憶體中,系統為它分配資源才能運行,而這種執行的程式就稱之為進程。程式和進程的區別就在於,程式是指令的集合,它是進程運行的靜態描述文本;進程是程式的一次執行活動,屬於動態概念。
什麼是線程(thread)?
線程是操作系統能夠進行運算調度的最小單位。它被包含在進程中,是進程中的實際運作單位。一條線程指的是進程中一個單一順序的控制流,一個進程中可以併發多個線程,每條線程並行執行不同的任務。
進程與線程的區別?
線程共用記憶體空間,進程的記憶體是獨立的。
同一個進程的線程之間可以直接交流,但兩個進程相互通信必須通過一個中間代理。
創建一個新的線程很簡單,創建一個新的進程需要對其父進程進行一次克隆。
一個線程可以控制和操作同一進程里的其他線程,但是進程只能操作子進程。
Python GIL(Global Interpreter Lock)
無論開啟多少個線程,有多少個CPU,python在執行的時候在同一時刻只允許一個線程允許。
Python threading模塊
直接調用
- import threading,time
- def run_num(num):
- """
- 定義線程要運行的函數
- :param num:
- :return:
- """
- print("running on number:%s"%num)
- time.sleep(3)
- if __name__ == '__main__':
- # 生成一個線程實例t1
- t1 = threading.Thread(target=run_num,args=(1,))
- # 生成一個線程實例t2
- t2 = threading.Thread(target=run_num,args=(2,))
- # 啟動線程t1
- t1.start()
- # 啟動線程t2
- t2.start()
- # 獲取線程名
- print(t1.getName())
- print(t2.getName())
- 輸出:
- running on number:1
- running on number:2
- Thread-1
- Thread-2
繼承式調用
- import threading,time
- class MyThread(threading.Thread):
- def __init__(self,num):
- threading.Thread.__init__(self)
- self.num = num
- # 定義每個線程要運行的函數,函數名必須是run
- def run(self):
- print("running on number:%s"%self.num)
- time.sleep(3)
- if __name__ == '__main__':
- t1 = MyThread(1)
- t2 = MyThread(2)
- t1.start()
- t2.start()
- 輸出:
- running on number:1
- running on number:2
Join and Daemon
Join
Join的作用是阻塞主進程,無法執行join後面的程式。
多線程多join的情況下,依次執行各線程的join方法,前面一個線程執行結束才能執行後面一個線程。
無參數時,則等待該線程結束,才執行後續的程式。
設置參數後,則等待該線程設定的時間後就執行後面的主進程,而不管該線程是否結束。
- import threading,time
- class MyThread(threading.Thread):
- def __init__(self,num):
- threading.Thread.__init__(self)
- self.num = num
- # 定義每個線程要運行的函數,函數名必須是run
- def run(self):
- print("running on number:%s"%self.num)
- time.sleep(3)
- print("thread:%s"%self.num)
- if __name__ == '__main__':
- t1 = MyThread(1)
- t2 = MyThread(2)
- t1.start()
- t1.join()
- t2.start()
- t2.join()
- 輸出:
- running on number:1
- thread:1
- running on number:2
- thread:2
設置參數效果如下:
- if __name__ == '__main__':
- t1 = MyThread(1)
- t2 = MyThread(2)
- t1.start()
- t1.join(2)
- t2.start()
- t2.join()
- 輸出:
- running on number:1
- running on number:2
- thread:1
- thread:2
Daemon
預設情況下,主線程在退出時會等待所有子線程的結束。如果希望主線程不等待子線程,而是在退出時自動結束所有的子線程,就需要設置子線程為後臺線程(daemon)。方法是通過調用線程類的setDaemon()方法。
- import time,threading
- def run(n):
- print("%s".center(20,"*")%n)
- time.sleep(2)
- print("done".center(20,"*"))
- def main():
- for i in range(5):
- t = threading.Thread(target=run,args=(i,))
- t.start()
- t.join(1)
- print("starting thread",t.getName())
- m = threading.Thread(target=main,args=())
- # 將main線程設置位Daemon線程,它作為程式主線程的守護線程,當主線程退出時,m線程也會退出,由m啟動的其它子線程會同時退出,不管是否執行完成
- m.setDaemon(True)
- m.start()
- m.join(3)
- print("main thread done".center(20,"*"))
- 輸出:
- *********0*********
- starting thread Thread-2
- *********1*********
- ********done********
- starting thread Thread-3
- *********2*********
- **main thread done**
線程鎖(互斥鎖Mutex)
一個進程下可以啟動多個線程,多個線程共用父進程的記憶體空間,也就意味著每個線程可以訪問同一份數據,此時,如果2個線程同時要修改同一份數據就需要線程鎖。
- import time,threading
- def addNum():
- # 在每個線程中都獲取這個全局變數
- global num
- print("--get num:",num)
- time.sleep(1)
- # 對此公共變數進行-1操作
- num -= 1
- # 設置一個共用變數
- num = 100
- thread_list = []
- for i in range(100):
- t = threading.Thread(target=addNum)
- t.start()
- thread_list.append(t)
- # 等待所有線程執行完畢
- for t in thread_list:
- t.join()
- print("final num:",num)
加鎖版本
Lock時阻塞其他線程對共用資源的訪問,且同一線程只能acquire一次,如多於一次就出現了死鎖,程式無法繼續執行。
- import time,threading
- def addNum():
- # 在每個線程中都獲取這個全局變數
- global num
- print("--get num:",num)
- time.sleep(1)
- # 修改數據前加鎖
- lock.acquire()
- # 對此公共變數進行-1操作
- num -= 1
- # 修改後釋放
- lock.release()
- # 設置一個共用變數
- num = 100
- thread_list = []
- # 生成全局鎖
- lock = threading.Lock()
- for i in range(100):
- t = threading.Thread(target=addNum)
- t.start()
- thread_list.append(t)
- # 等待所有線程執行完畢
- for t in thread_list:
- t.join()
- print("final num:",num)
GIL VS Lock
GIL保證同一時間只能有一個線程來執行。lock是用戶級的lock,與GIL沒有關係。
RLock(遞歸鎖)
Rlock允許在同一線程中被多次acquire,線程對共用資源的釋放需要把所有鎖都release。即n次acquire,需要n次release。
- def run1():
- print("grab the first part data")
- lock.acquire()
- global num
- num += 1
- lock.release()
- return num
- def run2():
- print("grab the second part data")
- lock.acquire()
- global num2
- num2 += 1
- lock.release()
- return num2
- def run3():
- lock.acquire()
- res = run1()
- print("between run1 and run2".center(50,"*"))
- res2 = run2()
- lock.release()
- print(res,res2)
- if __name__ == '__main__':
- num,num2 = 0,0
- lock = threading.RLock()
- for i in range(10):
- t = threading.Thread(target=run3)
- t.start()
- while threading.active_count() != 1:
- print(threading.active_count())
- else:
- print("all threads done".center(50,"*"))
- print(num,num2)
這兩種鎖的主要區別是,RLock允許在同一線程中被多次acquire。而Lock卻不允許這種情況。註意,如果使用RLock,那麼acquire和release必須成對出現,即調用了n次acquire,必須調用n次的release才能真正釋放所占用的鎖。
Semaphore(信號量)
互斥鎖同時只允許一個線程更改數據,而Semaphore是同時允許一定數量的線程更改數據,比如售票處有3個視窗,那最多只允許3個人同時買票,後面的人只能等前面任意視窗的人離開才能買票。
- import threading,time
- def run(n):
- semaphore.acquire()
- time.sleep(1)
- print("run the thread:%s"%n)
- semaphore.release()
- if __name__ == '__main__':
- # 最多允許5個線程同時運行
- semaphore = threading.BoundedSemaphore(5)
- for i in range(20):
- t = threading.Thread(target=run,args=(i,))
- t.start()
- while threading.active_count() != 1:
- # print(threading.active_count())
- pass
- else:
- print("all threads done".center(50,"*"))
Timer(定時器)
Timer隔一定時間調用一個函數,如果想實現每隔一段時間就調用一個函數,就要在Timer調用的函數中,再次設置Timer。Timer是Thread的一個派生類。
- import threading
- def hello():
- print("hello,world!")
- # delay 5秒之後執行hello函數
- t = threading.Timer(5,hello)
- t.start()
Event
Python提供了Event對象用於線程間通信,它是有線程設置的信號標誌,如果信號標誌位為假,則線程等待指導信號被其他線程設置為真。Event對象實現了簡單的線程通信機制,它提供了設置信號、清除信號、等待等用於實現線程間的通信。
設置信號
使用Event的set()方法可以設置Event對象內部的信號標誌為真。Event對象提供了isSet()方法來判斷其內部信號標誌的轉態,當使用event對象的set()方法後,isSet()方法返回真。
清除信號
使用Event的clear()方法可以清除Event對象內部的信號標誌,即將其設為假,當使用Event的clear()方法後,isSet()方法返回假。
等待
Event的wait()方法只有在內部信號為真的時候才會很快的執行並完成返回。當Event對象的內部信號標誌為假時,則wait()方法一直等待其為真時才返回。
通過Event來實現兩個或多個線程間的交互,下麵以紅綠燈為例,即啟動一個線程做交通指揮燈,生成幾個線程做車輛,車輛行駛按紅停綠行的規則。
- import threading,time,random
- def light():
- if not event.isSet():
- event.set()
- count = 0
- while True:
- if count < 5:
- print("\033[42;1m--green light on--\033[0m".center(50,"*"))
- elif count < 8:
- print("\033[43;1m--yellow light on--\033[0m".center(50,"*"))
- elif count < 13:
- if event.isSet():
- event.clear()
- print("\033[41;1m--red light on--\033[0m".center(50,"*"))
- else:
- count = 0
- event.set()
- time.sleep(1)
- count += 1
- def car(n):
- while 1:
- time.sleep(random.randrange(10))
- if event.isSet():
- print("car %s is running..."%n)
- else:
- print("car %s is waiting for the red light..."%n)
- if __name__ == "__main__":
- event = threading.Event()
- Light = threading.Thread(target=light,)
- Light.start()
- for i in range(3):
- t = threading.Thread(target=car,args=(i,))
- t.start()
queue隊列
Python中隊列是線程間最常用的交換數據的形式。Queue模塊是提供隊列操作的模塊。
創建一個隊列對象
- import queue
- q = queue.Queue(maxsize = 10)
queue.Queue類是一個隊列的同步實現。隊列長度可以無限或者有限。可以通過Queue的構造函數的可選參數maxsize來設定隊列長度。如果maxsize小於1表示隊列長度無限。
將一個值放入隊列中
- q.put("a")
調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item為必需的,為插入項目的值;第二個block為可選參數,預設為1。如果隊列當前為空且block為1,put()方法就使調用線程暫停,直到空出一個數據單元。如果block為0,put()方法將引發Full異常。
將一個值從隊列中取出
- q.get()
調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數為block,預設為True。如果隊列為空且block為True,get()就使調用線程暫停,直到有項目可用。如果隊列為空且block為False,隊列將引發Empty異常。
Python Queue模塊有三種隊列及構造函數
- # 先進先出
- class queue.Queue(maxsize=0)
- # 先進後出
- class queue.LifoQueue(maxsize=0)
- # 優先順序隊列級別越低越先出
- class queue.PriorityQueue(maxsize=0)
常用方法
- q = queue.Queue()
- # 返回隊列的大小
- q.qsize()
- # 如果隊列為空,返回True,反之False
- q.empty()
- # 如果隊列滿了,返回True,反之False
- q.full()
- # 獲取隊列,timeout等待時間
- q.get([block[,timeout]])
- # 相當於q.get(False)
- q.get_nowait()
- # 等到隊列為空再執行別的操作
- q.join()
生產者消費者模型
在開發編程中使用生產者和消費者模式能夠解決絕大多數併發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程式的整體處理數據的速度。
為什麼要使用生產者和消費者模式
線上程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為瞭解決這個問題於是引入了生產者和消費者模式。
什麼是生產者消費者模式
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之後不再等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。
最基本的生產者消費者模型的例子。
- import queue,threading,time
- q = queue.Queue(maxsize=10)
- def Producer():
- count = 1
- while True:
- q.put("骨頭%s"%count)
- print("生產了骨頭",count)
- count += 1
- def Consumer(name):
- while q.qsize() > 0:
- print("[%s] 取到[%s]並且吃了它..."%(name,q.get()))
- time.sleep(1)
- p = threading.Thread(target=Producer,)
- c1 = threading.Thread(target=Consumer,args=("旺財",))
- c2 = threading.Thread(target=Consumer,args=("來福",))
- p.start()
- c1.start()
- c2.start()