34、鎖問題與線程queue

来源:http://www.cnblogs.com/liluning/archive/2017/08/30/7453961.html
-Advertisement-
Play Games

上一篇隨筆我們學了全局解釋器鎖,前面也學了互斥鎖,今天學習一些與鎖相關的點,例如遞歸鎖,信號量,Event,還會學習我們已經很熟悉的隊列,不過這次的隊列是作為一個模塊出現的。 ...


上一篇隨筆我們學了全局解釋器鎖,前面也學了互斥鎖,今天學習一些與鎖相關的點,例如遞歸鎖,信號量,Event,還會學習我們已經很熟悉的隊列,不過這次的隊列是作為一個模塊出現的。

 

一、同步鎖

1、join與互斥鎖

線程搶的是GIL鎖,GIL鎖相當於執行許可權,拿到執行許可權後才能拿到互斥鎖Lock,其他線程也可以搶到GIL,但如果發現Lock仍然沒有被釋放則阻塞,即便是拿到執行許可權GIL也要立刻交出來

join是等待所有,即整體串列,而鎖只是鎖住修改共用數據的部分,即部分串列,要想保證數據安全的根本原理在於讓併發變成串列,join與互斥鎖都可以實現,毫無疑問,互斥鎖的部分串列效率要更高

2、GIL VS Lock

鎖的目的是為了保護共用的數據,同一時間只能有一個線程來修改共用的數據。結論:保護不同的數據就應該加不同的鎖。

GIL 與Lock是兩把鎖,保護的數據不一樣,前者是解釋器級別的(當然保護的就是解釋器級別的數據,比如垃圾回收的數據),後者是保護用戶自己開發的應用程式的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock

分析:

1)100個線程去搶GIL鎖,即搶執行許可權
2) 肯定有一個線程先搶到GIL(暫且稱為線程1),然後開始執行,一旦執行就會拿到lock.acquire()
3)極有可能線程1還未運行完畢,就有另外一個線程2搶到GIL,然後開始運行,但線程2發現互斥鎖lock還未被線程1釋放,於是阻塞,被迫交出執行許可權,即釋放GIL
4)直到線程1重新搶到GIL,開始從上次暫停的位置繼續執行,直到正常釋放互斥鎖lock,然後其他的線程再重覆2 3 4的過程

3、join與互斥鎖對比實例

1)未處理代碼:

#不加鎖:併發執行,速度快,數據不安全
from threading import currentThread,Thread
import time
def task():
    time.sleep(1)
    global n
    print('%s is running' %currentThread().getName())
    temp = n
    time.sleep(0.1)
    n = temp - 1

if __name__ == '__main__':
    n = 100
    t_l = []
    s1 = time.time()
    for i in range(100):
        t=Thread(target=task)
        t_l.append(t)
        t.start()
    for t in t_l:
        t.join()

    s2 = time.time()
    print('主:%s n:%s' %(s2-s1,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:1.1128411293029785 n:99
'''
初始

2)加互斥鎖:

#不加鎖:未加鎖部分併發執行,加鎖部分串列執行,速度慢,數據安全
from threading import currentThread,Thread,Lock
import time
def task():
    #未加鎖的代碼併發運行
    time.sleep(1)
    print('%s start to run' %currentThread().getName())
    global n
    #加鎖的代碼串列運行
    mutex.acquire()
    temp = n
    time.sleep(0.1)
    n = temp - 1
    mutex.release()

if __name__ == '__main__':
    n = 100
    mutex = Lock()
    t_l = []
    s1 = time.time()
    for i in range(100) :
        t = Thread(target=task)
        t_l.append(t)
        t.start()
    for t in t_l :
        t.join()
    s2 = time.time()
    print('主:%s n:%s' %(s2-s1,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:11.091605186462402 n:0
'''
Lock

3)join效果

from threading import currentThread,Thread
import time
def task():
    time.sleep(1)
    print('%s start to run' %currentThread().getName())
    global n
    temp = n
    time.sleep(0.1)
    n = temp - 1


if __name__ == '__main__':
    n = 100
    s1 = time.time()
    for i in range(100):
        t = Thread(target=task)
        t.start()
        t.join()
    s2 = time.time()
    print('主:%s n:%s' %(s2-s1,n))

'''
Thread-1 start to run
Thread-2 start to run
......
Thread-100 start to run
主:110.16416668891907 n:0
'''
join

即在start之後立刻使用jion,肯定會將100個任務的執行變成串列,毫無疑問,最終n的結果也肯定是0,是安全的,但問題是start後立即join:任務內的所有代碼都是串列執行的,而加鎖,只是加鎖的部分即修改共用數據的部分是串列的單從保證數據安全方面,二者都可以實現,但很明顯是加鎖的效率更高.


 

二、死鎖現象與遞歸鎖

1、死鎖現象

進程也有死鎖與遞歸鎖與線程中相同

死鎖: 是指兩個或兩個以上的進程或線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱為死鎖進程

from threading import Lock,Thread
import time
mutexA=Lock()
mutexB=Lock()
class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print('\033[31m%s 拿到A鎖' %self.name)
        mutexB.acquire()
        print('\033[32m%s 拿到B鎖' %self.name)
        mutexB.release()
        mutexA.release()

    def f2(self):
        mutexB.acquire()
        print('\033[33m%s 拿到B鎖' %self.name)
        time.sleep(1)
        mutexA.acquire()
        print('\033[34m%s 拿到A鎖' %self.name)
        mutexA.release()
        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()

死鎖狀態,程式永遠無法結束:

2、遞歸鎖

上述情況可以用遞歸鎖解決

遞歸鎖,在Python中為了支持在同一線程中多次請求同一資源,python提供了可重入鎖RLock。

這個RLock內部維護著一個Lock和一個counter變數,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖:

from threading import ,Thread,RLock
import time
mutexB=mutexA=RLock()
#一個線程拿到鎖,counter加1,該線程內又碰到加鎖的情況,則counter繼續加1,這期間所有其他線程都只能等待,等待該線程釋放所有鎖,即counter遞減到0為止
class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print('\033[31m%s 拿到A鎖' %self.name)
        mutexB.acquire()
        print('\033[32m%s 拿到B鎖' %self.name)
        mutexB.release()
        mutexA.release()

    def f2(self):
        mutexB.acquire()
        print('\033[33m%s 拿到B鎖' %self.name)
        time.sleep(1)
        mutexA.acquire()
        print('\033[34m%s 拿到A鎖' %self.name)
        mutexA.release()
        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()

 

三、信號量Semaphore

 Semaphore也是一種鎖不過這個鎖可以自己定義同時可以進入鎖的線程數

Semaphore管理一個內置的計數器,每當調用acquire()時內置計數器-1;調用release() 時內置計數器+1;

計數器不能小於0;當計數器為0時,acquire()將阻塞線程直到其他線程調用release()。

實例:

1、互斥鎖Lock就像家裡的廁所每次只能進一人,進去後鎖門其他人在外面等著(這是學進程互斥鎖時的例子)

from multiprocessing import Process,Lock,current_process
import time,random
def work(mutex):
    mutex.acquire()  #上鎖
    print('%s 上廁所' %current_process().name)
    time.sleep(random.randint(1,3))
    print('%s 走了' %current_process().name)
    mutex.release()  #開鎖

if __name__ == '__main__':
    mutex=Lock()  #實例化(互斥鎖)
    print('start...')
    for i in range(20):
        t=Process(target=work,args=(mutex,))
        t.start()

2、信號量Semaphore就像是街道的公共廁所有固定個數的隔間(例如5個),剛開始可以進去5個,然後出來幾個便可以再進去幾個

from threading import Thread,Semaphore,currentThread
import time,random
sm=Semaphore(5)
def task():
    sm.acquire()
    print('%s 上廁所' %currentThread().getName())
    time.sleep(random.randint(1,3))
    print('%s 走了' %currentThread().getName())
    sm.release()
if __name__ == '__main__':
    for i in range(20):
        t=Thread(target=task)
        t.start()

與進程池相似但是完全不同的概念,進程池Pool(4),最大隻能產生4個進程,而且從頭到尾都只是這四個進程,不會產生新的,而信號量是產生一堆線程/進程


 

四、Event

線程的一個關鍵特性是每個線程都是獨立運行且狀態不可預測。如果程式中的其 他線程需要通過判斷某個線程的狀態來確定自己下一步的操作,這時線程同步問題就會變得非常棘手。為瞭解決這些問題,我們需要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它允許線程等待某些事件的發生。在 初始情況下,Event對象中的信號標誌被設置為假。如果有線程等待一個Event對象, 而這個Event對象的標誌為假,那麼這個線程將會被一直阻塞直至該標誌為真。一個線程如果將一個Event對象的信號標誌設置為真,它將喚醒所有等待這個Event對象的線程。如果一個線程等待一個已經被設置為真的Event對象,那麼它將忽略這個事件, 繼續執行

event.isSet():返回event的狀態值;
event.wait():如果 event.isSet()==False將阻塞線程;
event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度;
event.clear():恢復event的狀態值為False。

1、模擬紅綠燈

from threading import Thread,Event,currentThread
import time
e=Event()

def traffic_lights():
    time.sleep(5)
    e.set()

def car():
    print('\033[41m%s 等' %currentThread().getName())
    e.wait()
    print('\033[42m%s 跑' %currentThread().getName())


if __name__ == '__main__':
    for i in range(10):
        t=Thread(target=car)
        t.start()
    traffic_thread=Thread(target=traffic_lights)
    traffic_thread.start()

2、有多個工作線程嘗試鏈接MySQL,我們想要在鏈接前確保MySQL服務正常才讓那些工作線程去連接MySQL伺服器,如果連接不成功,都會去嘗試重新連接。那麼我們就可以採用threading.Event機制來協調各個工作線程的連接操作

from threading import Thread,Event,currentThread
import time
e=Event()
def conn_mysql():
    count=1
    while not e.is_set():
        if count > 3:
            raise ConnectionError('嘗試鏈接的次數過多')
        print('\033[35m%s 第%s次嘗試' %(currentThread().getName(),count))
        e.wait(timeout=1)
        count+=1
    print('\033[32m%s 開始鏈接' %currentThread().getName())

def check_mysql():
    print('\033[34m%s 檢測mysql...' %currentThread().getName())
    time.sleep(2)
    e.set()
if __name__ == '__main__':
    for i in range(3):
        t=Thread(target=conn_mysql)
        t.start()
    t=Thread(target=check_mysql)
    t.start()

 

五、定時器

定時器,指定n秒後執行某操作

from threading import Timer

def hello(n):
    print("hello, world",n)

#三秒後運行hello函數傳入參數123
t = Timer(3, hello, args=(123,))
t.start()

 

六、線程queue

queue隊列 :使用import queue,用法與進程Queue一樣

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

隊列線上程編程中尤其有用,因為必須在多個線程之間安全地交換信息。

1、queue.Queue() 先進先出

import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結果(先進先出):
first
second
third
'''

2、queue.LifoQueue() 後進先出

import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結果(後進先出):
third
second
first
'''

3、queue.PriorityQueue() 存儲數據時可設置優先順序的隊列

import queue

q=queue.PriorityQueue()
#put進入一個元組,元組的第一個元素是優先順序(通常是數字,也可以是非數字之間的比較),數字越小優先順序越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
'''
結果(數字越小優先順序越高,優先順序高的優先出隊):
(10, 'b')
(20, 'a')
(30, 'c')
'''

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 我們知道字元有兩種初始化方式: String s1 = “abc”; String s2 = new String("def"); 這兩種有什麼區別呢?這時就需要我們看看String的在記憶體中是怎麼存儲的了。我們先看下麵的圖瞭解其在記憶體在是怎麼存儲的: 首先是 String s1 = “abc”;它 ...
  • BeautifulSoup是一個模塊,該模塊用於接收一個HTML或XML字元串,然後將其進行格式化,之後便可以使用他提供的方法進行快速查找指定元素,從而使得在HTML或XML中查找指定元素變得簡單。 1 from bs4 import BeautifulSoup 2 3 html_doc = """ ...
  • 一、數據類型 數據類型,相同類的值(指定類的對象)具有相同的功能,功能保存在類中(只有一份) 1、整數(int類) 1,2,3,4.......99.... bit_length() 當前十進位用二進位表示時,最少使用的位數 v = 15# 當前十進位用二進位表示時,最少使用的位數data = v. ...
  • 正則匹配的各種模式 1.擇一匹配(|) 擇一匹配用管道符號(|),也就是豎線表示。代表可以從多個模式中選擇一個,可用於分割正則表達式。例如: ABC | abc 表示即可以匹配 ABC,又可以匹配 abc。 2.匹配任意單個字元 3.從字元串起始或者結尾或者單詞邊界匹配 例如: \bthe,匹配以 ...
  • JSP的作用域一般是對於變數而言的,描述的是變數在某處是否有效(可用) 1.第一個作用域是page,只在當前頁面有效。也就是用戶請求的頁面有效,噹噹前頁面關閉或轉到其他頁面時,page對象將在響應回饋給客戶端後釋放。 2.第二個作用域是request,在當前請求中有效。request可以通過setA ...
  • 批處理(batch) >好比快遞員【不能一件一件的送快遞】 - 批處理指的是一次操作中執行多條SQL語句- 批處理相比於一次一次執行效率會提高很多 - 批處理主要是分兩步: 1.將要執行的SQL語句保存 2.執行SQL語句 - Statement和PreparedStatement都支持批處理操作, ...
  • 箱線圖 箱線圖是能同時反映數據統計量和整體分佈,又很漂亮的展示圖。在2014年的Nature Method上有2篇Correspondence論述了使用箱線圖的好處和一個線上繪製箱線圖的工具。就這樣都可以發兩篇Nature method,沒天理,但也說明瞭箱線圖的重要意義。 下麵這張圖展示了Bar ...
  • 私有方法不能被覆蓋: 因為private被自動認為final,對子類是屏蔽的,那麼子類中的相同方法就是一個新的方法,編譯器不會報錯但也不會按期望運行: 輸出為A。public、protected或預設情況下輸出都為B。 靜態方法不能被覆蓋: 靜態方法是與類,而非單個的對象的關聯。單個對象調用靜態方法 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...