併發編程 併發(偽):由於執行速度特別快,人感覺不到 並行(真):創建10個人同時操作 線程 1. 單進程,單線程的應用程式 print('666') 2. 到底什麼是線程?什麼是進程 Python自己沒有這玩意,Python中調用的操作系統的線程和進程(偽線程) 3. 多線程 工作的最小單元 共用 ...
併發編程
- 併發(偽):由於執行速度特別快,人感覺不到
- 並行(真):創建10個人同時操作
線程
- 單進程,單線程的應用程式
- print('666')
- 到底什麼是線程?什麼是進程
- Python自己沒有這玩意,Python中調用的操作系統的線程和進程(偽線程)
- 多線程
- 工作的最小單元
- 共用進程中所有資源
- 每個線程可以分擔一點任務,最終完成最後的結果
python多線程原理:python的多線程實際是一個假的多線程(多個線程在1核CPU上運行,進行快速的切換導致錯覺為同時執行)
代碼
import threading
def func(arg):
print(arg)
th = threading.Thread(target=func)
th.start()
print('end')
一個應用程式:軟體
- 預設一個程式只有一個進程
- 可以有多個進程(預設只有一個),一個進程可以創建多個線程(預設一個)。
Python多線程情況下:
- 計算密集型操作:效率低(GIL鎖)
- IO操作:效率高
Python多進程情況下:
- 計算密集型操作:效率高(浪費空間)
- IO操作:效率高(浪費資源)
以後寫Python時:
- IO密集型用多線程:文件/輸入輸出/socket
- 計算密集型用多進程:
進程
- 獨立開闢記憶體
- 進程之間的數據隔離
註意:進程是為了提供環境讓線程工作
Python中線程和進程(GIL鎖)
GIL鎖,全局解釋器鎖。用於限制一個進程中同一個時刻只有一個線程被CPU調度
擴展:預設GIL鎖在執行100個cpu指令後(過期時間)。
線程的使用
多線程基本使用
- 基礎例子
import threading
def func(arg):
print(arg)
th = threading.Thread(target=func)
th.start()
print('end')
- 測試例子(主線程預設會先執行自己的代碼,然後等子線程執行完畢後,才會結束)
import time
import threading
def func(arg):
time.sleep(arg)
print(arg)
t1 = threading.Thread(target=func,args=(3,))
t1.start()
t2 = threading.Thread(target=func,args=(9,))
t2.start()
print('end')
- jojn方法:
import time
import threading
def func(arg):
time.sleep(3)
print(arg)
print('開始執行t1')
t1 = threading.Thread(target=func,args=(3,))
t1.start()
# 無參:讓主線程等待,等到子線程t1執行完畢後,才能往下執行
# 有參:讓主線程在這裡最多等待n秒,無論執行完畢與否,會繼續往下走
t1.join(2)
print('開始執行t2')
t2 = threading.Thread(target=func,args=(9,))
t2.start()
t2.join() # 讓主線程等待,等到子線程t1執行完畢後,才能往下執行
print('end')
- 線程名稱獲取
import threading
def func(arg):
# 獲取當前執行該函數的線程的對象
t = threading.current_thread()
# 根據當前線程對象,獲取當前線程名稱
name = t.getName()
print(name,arg)
t1 = threading.Thread(target=func,args=(3,))
t1.setName('mhy')
t1.start()
t2 = threading.Thread(target=func,args=(9,))
t2.setName('zz')
t2.start()
print('end')
- 線程本質
# 先列印3還是end?
import threading
def func(arg):
print(arg)
t1 = threading.Thread(target=func,args=(3,))
# start是開始線程嘛?不是
# start是告訴cpu,我已準備就緒,你可以調度我了。
t1.start()
print('end')
- 面向對象式子多線程使用
import threading
class MyThread(threading.Thread):
def run(self):
print(123,self._args,self._kwargs)
t1 = MyThread(args=(11,))
t1.start()
t2 = MyThread(args=(12,))
t2.start()
- 計算密集型多線程(作用不大)
import threading
def func(lis,num):
et = [i+num for i in lis]
print(et)
t1 = threading.Thread(target=func,args=([11,22,33],1))
t1.start()
t2 = threading.Thread(target=func,args=([44,55,66],100))
t2.start()
- 實例代碼
import threading
import time
#定義類繼承Thread線程
class CodingThread(threading.Thread):
def run(self):
for x in range(3):
print('正在寫代碼%s' % threading.current_thread())
time.sleep(1)
class DrawingThread(threading.Thread):
def run(self):
for x in range(3):
print('正在畫圖%s' % threading.current_thread())
time.sleep(1)
def main():
t1 = CodingThread()
t2 = DrawingThread()
t1.start()
t2.start()
if __name__ == '__main__':
main()
線程安全(Lock)
- 線程安全,多線程操作時,內部會讓所有線程排隊處理,如:list/dict/Queue
- 線程不安全 + 人(LOCK) = 排隊處理
簡介:
Python自帶的解釋器是CPython。CPython解釋器的多線程實際上是一個假的多線程(在多核CPU中,只能利用一核,不能利用多核)。同一時刻只有一個線程在執行,為了保證同一時刻只有一個線程在執行,在CPython解釋器中有一個東西叫做GIL(Global Intepreter Lock),叫做全局解釋器鎖。這個解釋器鎖是有必要的。因為CPython解釋器的記憶體管理不是線程安全的。當然除了CPython解釋器,還有其他的解釋器,有些解釋器是沒有GIL鎖的,見下麵:
- Jython:用Java實現的Python解釋器。不存在GIL鎖。
- IronPython:用.net實現的Python解釋器。不存在GIL鎖。
- PyPy:用Python實現的Python解釋器。存在GIL鎖。
GIL雖然是一個假的多線程。但是在處理一些IO操作(比如文件讀寫和網路請求)還是可以在很大程度上提高效率的。在IO操作上建議使用多線程提高效率。在一些CPU計算操作上不建議使用多線程,而建議使用多進程。
不安全例子
import time
import threading
lit = []
def func(arg):
lit.append(arg)
time.sleep(0.04)
m = lit[-1]
print(arg,m) # m和arg應該是一個值
for num in range(10):
t1 = threading.Thread(target=func,args=(num,))
t1.start()
鎖機制原理
多線程同時執行,會導致數據同時執行,獲取數據不符合結果。
線程先獲得執行權時,將會將執行過程鎖起來,其他線程不能使用,必須要等該線程執行完,其他線程才可獲取執行權,執行線程,將解決數據不統一的方法
Lock 鎖(一次放行一個)
import time
import threading
lit = []
lock = threading.Lock() # 創建一個鎖
def func(arg):
lock.acquire() # 將該行代碼以下代碼鎖起來,直到遇到release釋放
lit.append(arg)
time.sleep(0.04)
m = lit[-1]
print(arg,m)
lock.release() # 釋放鎖
for num in range(10):
t1 = threading.Thread(target=func,args=(num,))
t1.start()
RLock 遞歸鎖(一次放行多個),
因為lock鎖如果有多層鎖機制,會造成死鎖
現象,所以有了Rlock
(遞歸鎖)
代碼如下:
import time
import threading
lit = []
lock = threading.RLock() # 創建一個遞歸鎖
def func(arg):
# Lock只能解開一個鎖,會造成死鎖線程
# RLock可以解開兩個鎖,解決了下麵問題
lock.acquire() # 將該行代碼以下代碼鎖起來,直到遇到release釋放
lock.acquire()
lit.append(arg)
time.sleep(0.04)
m = lit[-1]
print(arg,m)
lock.release() # 釋放鎖
lock.release() # 釋放鎖
for num in range(10):
t1 = threading.Thread(target=func,args=(num,))
t1.start()
BoundedSemaphore(一次放N個)信號量
import time
import threading
# 創建一個鎖,這個可以支持你同時進行幾次鎖,預設為1次
lock = threading.BoundedSemaphore(3)
def func(arg):
lock.acquire()
print(arg)
time.sleep(1)
lock.release()
for num in range(20):
t = threading.Thread(target=func,args=(num,))
t.start()
Condition(1次放x個數)動態輸入
Lock版本的生產者與消費者模式可以正常的運行,但是存在一個不足,在消費者中,總是通過While True死迴圈並且上鎖的方式去判斷錢夠不夠,上鎖是一個很耗費CPU資源的行為,因此這種方式不是最好的,還有一種更好的方式便是使用Threading.condition來實現,threading.condition可以在沒有數據的時候處於堵塞等待狀態,一旦有合適的數據了,還可以使用notify相關的函數來通知其他處於等待狀態的線程。這樣可以不用做一些無用的上鎖和解鎖的操作。可以提高程式的性能。首先對threading.Condition相關的函數做個介紹,threading.Condition類似threading.Lock,可以在修改全局數據的時候進行上鎖,也可以在修改完畢後進行解鎖。以下將一些常用的函數做個簡單的介紹:
- acquire:上鎖
- release:解鎖
- wait:將當前線程處於等待狀態,並且會釋放鎖。可以被其他線程使用notify和notify_all函數喚醒。被喚醒後繼續等待上鎖,上鎖後執行下麵的代碼。
- notify:通知某個等待的線程,預設是第1個等待的線程。
- notify_all:通知正在等待的線程。notify和notify_all不會釋放鎖。並且需要在release之前調用。
# 方法一
import time
import threading
# 創建一個鎖,這個可以支持你同時進行幾次鎖,預設為1次
lock = threading.Condition()
def func(arg):
print('線程進來了')
lock.acquire()
lock.wait()
print(arg)
time.sleep(1)
lock.release()
for num in range(3):
t = threading.Thread(target=func,args=(num,))
t.start()
while True:
num = int(input('>>>:'))
lock.acquire()
lock.notify(num)
lock.release()
# 方法二
import time
import threading
# 創建一個鎖,這個可以支持你同時進行幾次鎖,預設為1次
lock = threading.Condition()
def xxx():
print('來執行函數了')
input('>>>:')
return True
def func(arg):
print('線程進來了')
lock.wait_for(xxx)
print(arg)
time.sleep(1)
for num in range(3):
t = threading.Thread(target=func,args=(num,))
t.start()
event(事件)1次放所有
import threading
# 創建一個鎖,這個可以支持你同時進行幾次鎖,預設為1次
lock = threading.Event()
def func(arg):
print('線程進來了')
lock.wait() # 變紅燈
print(arg)
for num in range(10):
t = threading.Thread(target=func,args=(num,))
t.start()
input('>>>')
lock.set() # 變綠燈
input('>>>')
# 重新回歸 紅燈狀態
lock.clear()
for num in range(10):
t = threading.Thread(target=func,args=(num,))
t.start()
input('>>>')
lock.set() # 變綠燈
input('>>>')
線程總結
線程安全:列表和字典就是線程安全;
為什麼要加鎖?
- 非線程安全
- 控制一段代碼的時候,每次只能最多同時執行幾個
threading.local
為每一個線程創建一個字典鍵值對,為數據進行隔離
示例代碼:
import time
import threading
pond = threading.local()
def func(arg):
# 內部會為當前線程創建一個空間用於存儲,phone = 自己的值 ,將數據隔離開
pond.phone = arg
time.sleep(2)
# 取當前線程自己空間取值
print(pond.phone,arg)
for num in range(10):
t = threading.Thread(target=func,args=(num,))
t.start()
線程池
使用concurrent
來創建線程池,使用線程池可以有效的解決線程無止境的問題,用戶每一次一個請求都會創建一個線程,這樣線程一堆積也會造成程式緩慢,所以,線程池就可以幫我們解決這個問題,線程池可以設定,最多同時執行n個線程,由自己設定。
示例代碼:
import time
from concurrent.futures import ThreadPoolExecutor
# 創建一個線程池,(最多同時執行5個線程)
pool = ThreadPoolExecutor(5)
def func(arg1,arg2):
time.sleep(1)
print(arg1,arg2)
for num in range(5):
# 去線程池申請一個線程,讓線程執行func函數
pool.submit(func,num,8)
Queue線程安全隊列:
線上程中,訪問一些全局變數,加鎖是一個經常的過程。如果你是想把一些數據存儲到某個隊列中,那麼Python內置了一個線程安全的模塊叫做queue模塊。Python中的queue模塊中提供了同步的、線程安全的隊列類,包括FIFO(先進先出)隊列Queue,LIFO(後入先出)隊列LifoQueue。這些隊列都實現了鎖原語(可以理解為原子操作,即要麼不做,要麼都做完),能夠在多線程中直接使用。可以使用隊列來實現線程間的同步。相關的函數如下:
- 初始化Queue(maxsize):創建一個先進先出的隊列。
- qsize():返回隊列的大小。
- empty():判斷隊列是否為空。
- full():判斷隊列是否滿了。
- get():從隊列中取最後一個數據。
- put():將一個數據放到隊列中。
示例代碼
#encoding:utf-8
from queue import Queue
import threading
import time
# q = Queue(4) # 添加4個隊列
# q.put(10) #第1個隊列插入一條數據
# q.put(4) #第2個隊列插入一條數據
# for x in range(4):
# q.put(x)
#
# for x in range(4):
# print(q.get())
# print(q.empty())
# print(q.full())
# print(q.qsize())
def set_value(q):
index = 0
while True:
q.put(index)
index +=1
time.sleep(2)
def get_value(q):
while True:
print(q.get())
def main():
q = Queue(4)
t1 = threading.Thread(target=set_value,args=[q])
t2 = threading.Thread(target=get_value,args=[q])
t1.start()
t2.start()
if __name__ == '__main__':
main()
生產者消費者模型
模型三部件
- 生產者
- 隊列:先進先出
- 棧:後進先出
- 消費者
- 隊列
生產者和消費者模型解決了 不用一直等待的問題
使用Queue創建隊列
示例代碼:
import time
import threading
from queue import Queue
q = Queue()
def producer(id):
'''生產者'''
while True:
time.sleep(2)
q.put('包子')
print('廚師 %s 生產了一個包子'%id)
def consumer(id):
'''消費者'''
while True:
time.sleep(1)
v1 = q.get()
print('顧客 %s 吃了一個包子'%id)
for produce in range(1,4):
t1 = threading.Thread(target=producer,args=(produce,))
t1.start()
for consu in range(1,3):
t2 = threading.Thread(target=consumer,args=(consu,))
t2.start()
總結:
- 操作系統幫助開發者操作硬體
- 程式員寫好代碼在操作系統上運行(依賴解釋器)
- 任務特別多。以前一個一個執行,(串列),現在可以使用多線程
為什麼創建線程?
- 由於線程是cpu工作的最小單元,創建線程可以利用多核優勢實現並行操作(java,C#)
為啥創建進程?
- 進程和進程之間做數據隔離(java/C)
Python
- Python中存在一個GIL鎖。
- 造成:多線程無法利用多核優勢
- 解決:開多進程處理(浪費資源)
- 總結:
- IO密集型:多線程
- 計算密集型:多進程
- 線程的創建
- Thread
- 面向對象繼承(Threading.Thread)
- 其他
- jojn
- setDeanon
- setName
- threading.current_thread()
- 鎖
- 獲得
- 釋放