Python作為一種解釋型語言,由於使用了全局解釋鎖(GIL)的原因,其代碼不能同時在多核CPU上併發的運行。這也導致在Python中使用多線程編程並不能實現併發,我們得使用其他的方法在Python中實現併發編程。 一、全局解釋鎖(GIL) Python中不能通過使用多線程實現併發編程主要是因為全局 ...
Python作為一種解釋型語言,由於使用了全局解釋鎖(GIL)的原因,其代碼不能同時在多核CPU上併發的運行。這也導致在Python中使用多線程編程並不能實現併發,我們得使用其他的方法在Python中實現併發編程。
一、全局解釋鎖(GIL)
Python中不能通過使用多線程實現併發編程主要是因為全局解釋鎖的機制,所以首先解釋一下全局解釋鎖的概念。
首先,我們知道C++和Java是編譯型語言,而Python則是一種解釋型語言。對於Python程式來說,它是直接被輸入到解釋器中直接運行的。解釋器在程式執行之前對其並不瞭解;它所知道的只是Python的規則,以及在執行過程中怎樣去動態的應用這些規則。它也有一些優化,但是這基本上只是另一個級別的優化。由於解釋器沒法很好的對程式進行推導,Python的大部分優化其實是解釋器自身的優化。更快的解釋器自然意味著程式的運行也能“免費”的更快。也就是說,解釋器優化後,Python程式不用做修改就可以享受優化後的好處。
為了利用多核系統,Python必須支持多線程運行。但作為解釋型語言,Python的解釋器需要做到既安全又高效。解釋器要註意避免在不同的線程操作內部共用的數據,同時還要保證在管理用戶線程時保證總是有最大化的計算資源。為了保證不同線程同時訪問數據時的安全性,Python使用了全局解釋器鎖(GIL)的機制。從名字上我們很容易明白,它是一個加在解釋器上的全局(從解釋器的角度看)鎖(從互斥或者類似角度看)。這種方式當然很安全,但它也意味著:對於任何Python程式,不管有多少的處理器,任何時候都總是只有一個線程在執行。即:只有獲得了全局解釋器鎖的線程才能操作Python對象或者調用Python/C API函數。
所以,在Python中”不要使用多線程,請使用多進程”。具體來說,如果你的代碼是IO密集型的,使用多線程或者多進程都是可以的,多進程比線程更易用,但是會消耗更多的記憶體;如果你的代碼是CPU密集型的,多進程(multiprocessing模塊)就明顯是更好的選擇——特別是所使用的機器是多核或多CPU的時候。
另外,Python的官方實現CPython帶有GIL,但並不是所有的Python實現版本都是這樣的。IronPython,Jython,還有使用.NET框架實現的Python就沒有GIL。所以如果你不能忍受GIL,也可以嘗試用一下其他實現版本的Python。
如果是一個計算型的任務,GIL就會讓多線程變慢。我們舉個計算斐波那契數列的例子:
1 import time 2 import threading 3 4 def text(name): 5 def profile(func): 6 def wrapper(*args,**kwargs): 7 start = time.time() 8 res = func(*args,**kwargs) 9 end = time.time() 10 print('{} cost:{}'.format(name,end-start)) 11 return res 12 return wrapper 13 return profile 14 15 16 def fib(n): 17 if n <= 2: 18 return 1 19 return fib(n-1) + fib(n-2) 20 21 22 @text('nothread') 23 def nothread(): 24 fib(35) 25 fib(35) 26 27 28 @text('hasthread') 29 def hasthread(): 30 for i in range(2): 31 t = threading.Thread(target=fib,args=(35,)) 32 t.start() 33 main_thread = threading.current_thread() 34 for t in threading.enumerate(): 35 if t is main_thread: 36 continue 37 t.join() 38 39 nothread() 40 hasthread() 41 42 ##輸出結果### 43 nothread cost:6.141353607177734 44 hasthread cost:6.15336275100708View Code
這種情況還不如不用多線程!
GIL是必須的,這是Python設計的問題:Python解釋器是非線程安全的。這意味著當從線程內嘗試安全的訪問Python對象的時候將有一個全局的強制鎖。 在任何時候,僅僅一個單一的線程能夠獲取Python對象或者C API。每100個位元組的Python指令解釋器將重新獲取鎖,這(潛在的)阻塞了I/O操作。因為鎖,CPU密集型的代碼使用線程庫時,不會獲得性能的提高。
那是不是由於GIL的存在,多線程庫就是個「雞肋」呢?當然不是。事實上我們平時會接觸非常多的和網路通信或者數據輸入/輸出相關的程式,比如網路爬蟲、文本處理等等。這時候由於網路情況和I/O的性能的限制,Python解釋器會等待讀寫數據的函數調用返回,這個時候就可以利用多線程庫提高併發效率了。
2.同步機制
A. Semaphore(信號量)
在多線程編程中,為了防止不同的線程同時對一個公用的資源(比如全部變數)進行修改,需要進行同時訪問的數量(通常是1)的限制。信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器為0時,acquire()調用被阻塞。
import time
from random import random
from threading import Thread,Semaphore,current_thread,enumerate
sema = Semaphore(3)
def foo(tid):
with sema:
print('{} acquire sema'.format(tid))
wt = random() * 2
time.sleep(wt)
print('{} release sema'.format(tid))
for i in range(5):
t = Thread(target=foo,args=(i,))
t.start()
main_thread = current_thread()
for t in enumerate():
if t is main_thread:
continue
t.join()
####輸出結果#####
0 acquire sema
1 acquire sema
2 acquire sema
0 release sema
3 acquire sema
1 release sema
4 acquire sema
2 release sema
3 release sema
4 release sema
B. Lock(互斥鎖)
Lock也可以叫做互斥鎖,其實相當於信號量為1。我們先看一個不加鎖的例子:
import time
import threading
value = 0
def getlock():
global value
new = value + 1
time.sleep(0.001) # 讓線程有機會切換
value = new
for i in range(100):
t = threading.Thread(target=getlock)
t.start()
main_thread = threading.current_thread()
for t in threading.enumerate():
if t == main_thread:
continue
t.join()
print(value)
####輸出結果#####
不確定(刷新值會發生改變)
現在,我們來看看加鎖之後的情況:
1 import time 2 import threading 3 4 value = 0 5 lock = threading.Lock() 6 7 def getlock(): 8 global value 9 with lock: 10 new = value + 1 11 time.sleep(0.001) # 讓線程有機會切換 12 value = new 13 14 for i in range(100): 15 t = threading.Thread(target=getlock) 16 t.start() 17 18 main_thread = threading.current_thread() 19 20 for t in threading.enumerate(): 21 if t == main_thread: 22 continue 23 t.join() 24 25 print(value) 26 27 ####輸出結果為#############View Code
我們對value的自增加了鎖,就可以保證了結果了。
3. RLock(遞歸鎖)
先來說說死鎖,所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱為死鎖進程。
import threading
import time
mutexA = threading.Lock()
mutexB = threading.Lock()
class MyThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
self.fun1()
self.fun2()
def fun1(self):
mutexA.acquire() # 如果鎖被占用,則阻塞在這裡,等待鎖的釋放
print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
mutexB.acquire()
print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
mutexB.release()
mutexA.release()
def fun2(self):
mutexB.acquire()
print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
time.sleep(0.2)
mutexA.acquire()
print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
mutexA.release()
mutexB.release()
if __name__ == "__main__":
print("start---------------------------%s"%time.time())
for i in range(0, 10):
my_thread = MyThread()
my_thread.start()
解決方案:
import threading
import time
mutex = threading.RLock()
class MyThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
self.fun1()
self.fun2()
def fun1(self):
mutex.acquire() # 如果鎖被占用,則阻塞在這裡,等待鎖的釋放
print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
mutex.acquire()
print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
mutex.release()
mutex.release()
def fun2(self):
mutex.acquire()
print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
time.sleep(0.2)
mutex.acquire()
print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
mutex.release()
mutex.release()
if __name__ == "__main__":
print("start---------------------------%s"%time.time())
for i in range(0, 10):
my_thread = MyThread()
my_thread.start()
遞歸鎖內部維護了一個計數器,當有線程拿到了Lock以後,這個計數器會自動加1,只要這計數器的值大於0,那麼其他線程就不能搶到改鎖,這就保證了,在同一時刻,僅有一個線程使用該鎖,從而避免了死鎖的方法。關於遞歸鎖內部實現,有興趣的可以看看源碼。
4. Condition(條件)
一個線程等待特定條件,而另一個線程發出特定條件滿足的信號。最好說明的例子就是「生產者/消費者」模型:
import time
import threading
def consumer(cond):
t = threading.current_thread()
with cond:
cond.wait() # 創建了一個鎖,等待producer解鎖
print('{}: Resource is available to consumer'.format(t.name))
def producer(cond):
t = threading.current_thread()
with cond:
print('{}:Making resource available'.format(t.name))
cond.notifyAll() # 釋放鎖,喚醒消費者
condition = threading.Condition()
c1 = threading.Thread(name='c1',target=consumer,args=(condition,))
p = threading.Thread(name='p',target=producer,args=(condition,))
c2 = threading.Thread(name='c2',target=consumer,args=(condition,))
c1.start()
time.sleep(1)
c2.start()
time.sleep(1)
p.start()
5. Event
一個線程發送/傳遞事件,另外的線程等待事件的觸發。我們同樣的用「生產者/消費者」模型的例子:
import time
import threading
from random import randint
TIMEOUT = 2
def consumer(event, l):
t = threading.currentThread()
while 1:
event_is_set = event.wait(TIMEOUT)
if event_is_set:
try:
integer = l.pop()
print('{} popped from list by {}'.format(integer,t.name))
event.clear() # 重置狀態
except IndexError:
pass
def producer(event, l):
t = threading.currentThread()
while 1:
integer = randint(10,100)
l.append(integer)
print('{} append to list by {}'.format(integer, t.name))
event.set()
time.sleep(1)
event = threading.Event()
l = []
threads = []
p = threading.Thread(name='producer1', target=producer, args=(event, l))
p.start()
threads.append(p)
for name in ('consumer1','consumer2'):
t = threading.Thread(target=consumer, name=name, args=(event, l))
t.start()
threads.append(t)
for t in threads:
t.join()
print('ending')
可以看到事件被2個消費者比較平均的接收並處理了。如果使用了wait方法,線程就會等待我們設置事件,這也有助於保證任務的完成。
6. Queue
隊列在併發開發中最常用的。我們藉助「生產者/消費者」模式來理解:生產者把生產的「消息」放入隊列,消費者從這個隊列中對去對應的消息執行。
大家主要關心如下4個方法就好了:
-
put: 向隊列中添加一個消息。
-
get: 從隊列中刪除並返回一個消息。
-
task_done: 當某一項任務完成時調用。
-
join: 阻塞直到所有的項目都被處理完。
import time
import threading
import random
import queue
q = queue.Queue()
def double(n):
return n*2
def producer():
while 1:
wt = random.randint(1,10)
time.sleep(random.random())
q.put((double, wt))
def consumer():
while 1:
task, arg = q.get()
print(arg, task(arg))
q.task_done()
for target in (producer, consumer):
t = threading.Thread(target=target)
t.start()
Queue模塊還自帶了PriorityQueue(帶有優先順序)和LifoQueue(先進先出)2種特殊隊列。我們這裡展示下線程安全的優先順序隊列的用法,
PriorityQueue要求我們put的數據的格式是(priority_number, data)
,我們看看下麵的例子:
import time
import threading
from random import randint
import queue
q = queue.PriorityQueue()
def double(n):
return n * 2
def producer():
count = 0
while 1:
if count > 5:
break
prit = randint(0,100)
print("put :{}".format(prit))
q.put((prit, double, prit)) # (優先順序,函數,參數)
count += 1
def consumer():
while 1:
if q.empty():
break
pri,task,arg = q.get()
print('[PRI:{}] {} * 2 = {}'.format(pri,arg,task(arg)))
q.task_done()
time.sleep(0.1)
t = threading.Thread(target=producer)
t.start()
time.sleep(1)
t = threading.Thread(target=consumer)
t.start()
7.線程池
面向對象開發中,大家知道創建和銷毀對象是很費時間的,因為創建一個對象要獲取記憶體資源或者其它更多資源。無節制的創建和銷毀線程是一種極大的浪費。那我們可不可以把執行完任務的線程不銷毀而重覆利用呢?仿佛就是把這些線程放進一個池子,一方面我們可以控制同時工作的線程數量,一方面也避免了創建和銷毀產生的開銷。
import time
import threading
from random import random
import queue
def double(n):
return n * 2
class Worker(threading.Thread):
def __init__(self, queue):
super(Worker, self).__init__()
self._q = queue
self.daemon = True
self.start()
def run(self):
while 1:
f, args, kwargs = self._q.get()
try:
print('USE:{}'.format(self.name))
print(f(*args, **kwargs))
except Exception as e:
print(e)
self._q.task_done()
class ThreadPool(object):
def __init__(self, max_num=5):
self._q = queue.Queue(max_num)
for _ in range(max_num):
Worker(self._q) # create worker thread
def add_task(self, f, *args, **kwargs):
self._q.put((f, args, kwargs))
def wait_compelete(self):
self._q.join()
pool = ThreadPool()
for _ in range(8):
wt = random()
pool.add_task(double, wt)
time.sleep(wt)
pool.wait_compelete()