Python併發編程-線程

来源:https://www.cnblogs.com/xiongmozhou/archive/2018/05/27/9097968.html
-Advertisement-
Play Games

Python作為一種解釋型語言,由於使用了全局解釋鎖(GIL)的原因,其代碼不能同時在多核CPU上併發的運行。這也導致在Python中使用多線程編程並不能實現併發,我們得使用其他的方法在Python中實現併發編程。 一、全局解釋鎖(GIL) Python中不能通過使用多線程實現併發編程主要是因為全局 ...


Python作為一種解釋型語言,由於使用了全局解釋鎖(GIL)的原因,其代碼不能同時在多核CPU上併發的運行。這也導致在Python中使用多線程編程並不能實現併發,我們得使用其他的方法在Python中實現併發編程。

一、全局解釋鎖(GIL)

  Python中不能通過使用多線程實現併發編程主要是因為全局解釋鎖的機制,所以首先解釋一下全局解釋鎖的概念。

     首先,我們知道C++和Java是編譯型語言,而Python則是一種解釋型語言。對於Python程式來說,它是直接被輸入到解釋器中直接運行的。解釋器在程式執行之前對其並不瞭解;它所知道的只是Python的規則,以及在執行過程中怎樣去動態的應用這些規則。它也有一些優化,但是這基本上只是另一個級別的優化。由於解釋器沒法很好的對程式進行推導,Python的大部分優化其實是解釋器自身的優化。更快的解釋器自然意味著程式的運行也能“免費”的更快。也就是說,解釋器優化後,Python程式不用做修改就可以享受優化後的好處。

      為了利用多核系統,Python必須支持多線程運行。但作為解釋型語言,Python的解釋器需要做到既安全又高效。解釋器要註意避免在不同的線程操作內部共用的數據,同時還要保證在管理用戶線程時保證總是有最大化的計算資源。為了保證不同線程同時訪問數據時的安全性,Python使用了全局解釋器鎖(GIL)的機制。從名字上我們很容易明白,它是一個加在解釋器上的全局(從解釋器的角度看)鎖(從互斥或者類似角度看)。這種方式當然很安全,但它也意味著:對於任何Python程式,不管有多少的處理器,任何時候都總是只有一個線程在執行。即:只有獲得了全局解釋器鎖的線程才能操作Python對象或者調用Python/C API函數。

      所以,在Python中”不要使用多線程,請使用多進程”。具體來說,如果你的代碼是IO密集型的,使用多線程或者多進程都是可以的,多進程比線程更易用,但是會消耗更多的記憶體;如果你的代碼是CPU密集型的,多進程(multiprocessing模塊)就明顯是更好的選擇——特別是所使用的機器是多核或多CPU的時候。

      另外,Python的官方實現CPython帶有GIL,但並不是所有的Python實現版本都是這樣的。IronPython,Jython,還有使用.NET框架實現的Python就沒有GIL。所以如果你不能忍受GIL,也可以嘗試用一下其他實現版本的Python。

  

 

  如果是一個計算型的任務,GIL就會讓多線程變慢。我們舉個計算斐波那契數列的例子:

 

 1 import time
 2 import threading
 3 
 4 def text(name):
 5     def profile(func):
 6         def wrapper(*args,**kwargs):
 7             start = time.time()
 8             res = func(*args,**kwargs)
 9             end = time.time()
10             print('{} cost:{}'.format(name,end-start))
11             return res
12         return wrapper
13     return profile
14 
15 
16 def fib(n):
17     if n <= 2:
18         return 1
19     return fib(n-1) + fib(n-2)
20 
21 
22 @text('nothread')
23 def nothread():
24     fib(35)
25     fib(35)
26 
27 
28 @text('hasthread')
29 def hasthread():
30     for i in range(2):
31         t = threading.Thread(target=fib,args=(35,))
32         t.start()
33     main_thread = threading.current_thread()
34     for t in threading.enumerate():
35         if t is main_thread:
36             continue
37         t.join()
38 
39 nothread()
40 hasthread()
41 
42 ##輸出結果###
43 nothread cost:6.141353607177734
44 hasthread cost:6.15336275100708
View Code

 

 

  這種情況還不如不用多線程!

  GIL是必須的,這是Python設計的問題:Python解釋器是非線程安全的。這意味著當從線程內嘗試安全的訪問Python對象的時候將有一個全局的強制鎖。 在任何時候,僅僅一個單一的線程能夠獲取Python對象或者C API。每100個位元組的Python指令解釋器將重新獲取鎖,這(潛在的)阻塞了I/O操作。因為鎖,CPU密集型的代碼使用線程庫時,不會獲得性能的提高。

  那是不是由於GIL的存在,多線程庫就是個「雞肋」呢?當然不是。事實上我們平時會接觸非常多的和網路通信或者數據輸入/輸出相關的程式,比如網路爬蟲、文本處理等等。這時候由於網路情況和I/O的性能的限制,Python解釋器會等待讀寫數據的函數調用返回,這個時候就可以利用多線程庫提高併發效率了。

2.同步機制

  A. Semaphore(信號量)

  在多線程編程中,為了防止不同的線程同時對一個公用的資源(比如全部變數)進行修改,需要進行同時訪問的數量(通常是1)的限制。信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器為0時,acquire()調用被阻塞。

import time
from random import random
from threading import Thread,Semaphore,current_thread,enumerate

sema = Semaphore(3)

def foo(tid):
    with sema:
        print('{} acquire sema'.format(tid))
        wt = random() * 2

        time.sleep(wt)
    print('{} release sema'.format(tid))

for i in range(5):
    t = Thread(target=foo,args=(i,))
    t.start()

main_thread = current_thread()
for t in enumerate():
    if t is main_thread:
        continue
    t.join()

####輸出結果#####
0 acquire sema
1 acquire sema
2 acquire sema
0 release sema
3 acquire sema
1 release sema
4 acquire sema
2 release sema
3 release sema
4 release sema

  B. Lock(互斥鎖)

  Lock也可以叫做互斥鎖,其實相當於信號量為1。我們先看一個不加鎖的例子:


import time
import threading

value = 0

def getlock():
    global value
    new = value + 1
    time.sleep(0.001)  # 讓線程有機會切換
    value = new

for i in range(100):
    t = threading.Thread(target=getlock)
    t.start()

main_thread = threading.current_thread()

for t in threading.enumerate():
    if t == main_thread:
        continue
    t.join()

print(value)

####輸出結果#####
不確定(刷新值會發生改變)
 

  現在,我們來看看加鎖之後的情況:

 1 import time
 2 import threading
 3 
 4 value = 0
 5 lock = threading.Lock()
 6 
 7 def getlock():
 8     global value
 9     with lock:
10         new = value + 1
11         time.sleep(0.001)  # 讓線程有機會切換
12         value = new
13 
14 for i in range(100):
15     t = threading.Thread(target=getlock)
16     t.start()
17 
18 main_thread = threading.current_thread()
19 
20 for t in threading.enumerate():
21     if t == main_thread:
22         continue
23     t.join()
24 
25 print(value)
26 
27 ####輸出結果為#############
View Code

 

  我們對value的自增加了鎖,就可以保證了結果了。

3. RLock(遞歸鎖)

  先來說說死鎖,所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱為死鎖進程。


import threading
import time

mutexA = threading.Lock()
mutexB = threading.Lock()

class MyThread(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        self.fun1()
        self.fun2()

    def fun1(self):

        mutexA.acquire()  # 如果鎖被占用,則阻塞在這裡,等待鎖的釋放

        print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))

        mutexB.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
        mutexB.release()
        mutexA.release()


    def fun2(self):

        mutexB.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
        time.sleep(0.2)

        mutexA.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
        mutexA.release()

        mutexB.release()

if __name__ == "__main__":

    print("start---------------------------%s"%time.time())

    for i in range(0, 10):
        my_thread = MyThread()
        my_thread.start()
 

  解決方案:


import threading
import time

mutex = threading.RLock()

class MyThread(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        self.fun1()
        self.fun2()

    def fun1(self):
        mutex.acquire()  # 如果鎖被占用,則阻塞在這裡,等待鎖的釋放

        print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))

        mutex.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
        mutex.release()
        mutex.release()


    def fun2(self):
        mutex.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
        time.sleep(0.2)

        mutex.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
        mutex.release()

        mutex.release()

if __name__ == "__main__":

    print("start---------------------------%s"%time.time())

    for i in range(0, 10):
        my_thread = MyThread()
        my_thread.start()
 

  遞歸鎖內部維護了一個計數器,當有線程拿到了Lock以後,這個計數器會自動加1,只要這計數器的值大於0,那麼其他線程就不能搶到改鎖,這就保證了,在同一時刻,僅有一個線程使用該鎖,從而避免了死鎖的方法。關於遞歸鎖內部實現,有興趣的可以看看源碼。

4. Condition(條件)

  一個線程等待特定條件,而另一個線程發出特定條件滿足的信號。最好說明的例子就是「生產者/消費者」模型:

 
import time
import threading

def consumer(cond):
    t = threading.current_thread()
    with cond:
        cond.wait()  # 創建了一個鎖,等待producer解鎖
        print('{}: Resource is available to consumer'.format(t.name))

def producer(cond):
    t = threading.current_thread()
    with cond:
        print('{}:Making resource available'.format(t.name))
        cond.notifyAll()  # 釋放鎖,喚醒消費者

condition = threading.Condition()

c1 = threading.Thread(name='c1',target=consumer,args=(condition,))
p = threading.Thread(name='p',target=producer,args=(condition,))
c2 = threading.Thread(name='c2',target=consumer,args=(condition,))


c1.start()
time.sleep(1)
c2.start()
time.sleep(1)
p.start()
 

5. Event

  一個線程發送/傳遞事件,另外的線程等待事件的觸發。我們同樣的用「生產者/消費者」模型的例子:

 
import time
import threading
from random import randint

TIMEOUT = 2

def consumer(event, l):
    t = threading.currentThread()
    while 1:
        event_is_set = event.wait(TIMEOUT)
        if event_is_set:
            try:
                integer = l.pop()
                print('{} popped from list by {}'.format(integer,t.name))
                event.clear()  # 重置狀態
            except IndexError:
                pass

def producer(event, l):
    t = threading.currentThread()
    while 1:
        integer = randint(10,100)
        l.append(integer)
        print('{} append to list by {}'.format(integer, t.name))
        event.set()
        time.sleep(1)

event = threading.Event()

l = []
threads = []

p = threading.Thread(name='producer1', target=producer, args=(event, l))
p.start()
threads.append(p)

for name in ('consumer1','consumer2'):
    t = threading.Thread(target=consumer, name=name, args=(event, l))
    t.start()
    threads.append(t)


for t in threads:
    t.join()
print('ending')
 

  可以看到事件被2個消費者比較平均的接收並處理了。如果使用了wait方法,線程就會等待我們設置事件,這也有助於保證任務的完成。

6. Queue

  隊列在併發開發中最常用的。我們藉助「生產者/消費者」模式來理解:生產者把生產的「消息」放入隊列,消費者從這個隊列中對去對應的消息執行。

大家主要關心如下4個方法就好了:

  1. put: 向隊列中添加一個消息。

  2. get: 從隊列中刪除並返回一個消息。

  3. task_done: 當某一項任務完成時調用。

  4. join: 阻塞直到所有的項目都被處理完。

 
import time
import threading
import random
import queue

q = queue.Queue()

def double(n):
    return n*2

def producer():
    while 1:
        wt = random.randint(1,10)
        time.sleep(random.random())
        q.put((double, wt))

def consumer():
    while 1:
        task, arg = q.get()
        print(arg, task(arg))

        q.task_done()

for target in (producer, consumer):
    t = threading.Thread(target=target)
    t.start()
 

  Queue模塊還自帶了PriorityQueue(帶有優先順序)和LifoQueue(先進先出)2種特殊隊列。我們這裡展示下線程安全的優先順序隊列的用法,
  PriorityQueue要求我們put的數據的格式是(priority_number, data),我們看看下麵的例子:

 

 
import time
import threading
from random import randint
import queue

q = queue.PriorityQueue()
def double(n):
    return n * 2

def producer():
    count = 0
    while 1:
        if count > 5:
            break
        prit = randint(0,100)
        print("put :{}".format(prit))
        q.put((prit, double, prit))  # (優先順序,函數,參數)
        count += 1

def consumer():
    while 1:
        if q.empty():
            break
        pri,task,arg = q.get()
        print('[PRI:{}] {} * 2 = {}'.format(pri,arg,task(arg)))
        q.task_done()
        time.sleep(0.1)

t = threading.Thread(target=producer)
t.start()
time.sleep(1)
t = threading.Thread(target=consumer)
t.start()
 

7.線程池

  面向對象開發中,大家知道創建和銷毀對象是很費時間的,因為創建一個對象要獲取記憶體資源或者其它更多資源。無節制的創建和銷毀線程是一種極大的浪費。那我們可不可以把執行完任務的線程不銷毀而重覆利用呢?仿佛就是把這些線程放進一個池子,一方面我們可以控制同時工作的線程數量,一方面也避免了創建和銷毀產生的開銷。

 
import time
import threading
from random import random
import queue


def double(n):
    return n * 2


class Worker(threading.Thread):
    def __init__(self, queue):
        super(Worker, self).__init__()
        self._q = queue
        self.daemon = True
        self.start()

    def run(self):
        while 1:
            f, args, kwargs = self._q.get()
            try:
                print('USE:{}'.format(self.name))
                print(f(*args, **kwargs))
            except Exception as e:
                print(e)
            self._q.task_done()


class ThreadPool(object):
    def __init__(self, max_num=5):
        self._q = queue.Queue(max_num)
        for _ in range(max_num):
            Worker(self._q)  # create worker thread

    def add_task(self, f, *args, **kwargs):
        self._q.put((f, args, kwargs))

    def wait_compelete(self):
        self._q.join()

pool = ThreadPool()
for _ in range(8):
    wt = random()
    pool.add_task(double, wt)
    time.sleep(wt)

pool.wait_compelete()
 
您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 將springMVC進行了進一步的封裝。讓開發者更容易。 ...
  • Java開源生鮮電商平臺-團購模塊設計與架構(源碼可下載) 說明:任何一個電商系統中,對於促銷這塊是必不可少的,畢竟這塊是最吸引用戶的,用戶也是最愛的模塊之一,理由很簡單,便宜。 我的經驗是無論是大的餐飲點還是小的餐飲店,優惠與折扣永遠是說福他們進入平臺的最好的手段之一。(大企業叫做節約成本,小企業 ...
  • 函數 1.函數結構 def 是函數的定義關鍵字,my_len是函數名。()傳參用,冒號下麵都是函數體。 執行函數方法:函數名加括弧來執行函數。My_len() 舉例: # s = 'lkfjsjulkjdgjdsf' # def my_len(): # count = 0 # for i in s: ...
  • 我們或多或少都有過,或者見過將賦值表達式參與運算的情況。這通常會伴隨著一些意想不到的問題。今天我就見到了一段奇怪的代碼: 乍一看,似乎答案很明朗,按照順序運算之後,a的值是3,b的值是5.有經驗的程式員肯定會一眼看出,這裡的計算過程是一個未定義行為(Undefined behavior).在這裡簡單 ...
  • 這裡的內容僅僅是本人閱讀《Python高性能編程》後總結的一些知識,用於自己更好的瞭解Python機制。本人現在並不從事計算密集型工作:人工智慧、數據分析等。僅僅只是出於好奇而去閱讀這本書。很多人因為Python不能同時使用多顆CPU(全局解釋器鎖GIL),而覺得它不能實現高性能。書中有很多介紹避開 ...
  • 例1: 輸出結果: 例2: 輸出結果: ...
  • 1.建立普通的Javaweb項目,導入項目所必須的jar包。 2.配置web.xml文件。 3.在src下建立struts.xml。 4.在實體包下配置 實體名.hbm.xml 5.在src下建立applicationContext.xml。 6.在src下建立資料庫的相關配置信息db.proper ...
  • 實現步驟: 1、創建用戶登錄提交界面 2、創建處理用戶登錄請求servlet組件Main 3、創建代表登錄成功響應的servlet的組件LoginSuccess 4、創建代表登錄失敗響應的servlet組件LoginFail 【1代碼login.html】 【2程式Main.java】 【3程式Lo ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...