python3-cookbook筆記:第十二章 併發編程

来源:https://www.cnblogs.com/guyuyun/archive/2020/02/28/12375001.html
-Advertisement-
Play Games

python3-cookbook中每個小節以問題、解決方案和討論三個部分探討了Python3在某類問題中的最優解決方式,或者說是探討Python3本身的數據結構、函數、類等特性在某類問題上如何更好地使用。這本書對於加深Python3的理解和提升Python編程能力的都有顯著幫助,特別是對怎麼提高Py ...


python3-cookbook中每個小節以問題、解決方案和討論三個部分探討了Python3在某類問題中的最優解決方式,或者說是探討Python3本身的數據結構、函數、類等特性在某類問題上如何更好地使用。這本書對於加深Python3的理解和提升Python編程能力的都有顯著幫助,特別是對怎麼提高Python程式的性能會有很好的幫助,如果有時間的話強烈建議看一下。
本文為學習筆記,文中的內容只是根據自己的工作需要和平時使用寫了書中的部分內容,並且文中的示例代碼大多直接貼的原文代碼,當然,代碼多數都在Python3.6的環境上都驗證過了的。不同領域的編程關註點也會有所不同,有興趣的可以去看全文。
python3-cookbook:https://python3-cookbook.readthedocs.io/zh_CN/latest/index.html

 

12.1 啟動與停止線程

Python中的線程除了使用is_alive()查詢它是否存活和使用join()將它加入到當前線程並等待它終止之外,並沒有提供多少可以對線程操作的方法,例如不能主動終止線程,不能給線程發送信號等,如果想要對線程進行別的查詢和操作,可以參考如下方案。

import time
from threading import Thread


class CountdownTask:
    def __init__(self):
        self._running = True

    def terminate(self):
        self._running = False

    def run(self, n):
        while self._running and n > 0:
            print('T-minus', n)
            n -= 1
            time.sleep(5)


c = CountdownTask()
t = Thread(target=c.run, args=(10,))
t.start()
# 主動終止線程
c.terminate()
# 等待線程終止
t.join()

 

 

12.3 線程間通信

當你需要線上程間交換數據時,可以考慮使用queue庫中的隊列了,它的優勢在於其本身就是線程安全的,如果你使用的是其他的數據結構,就需要在代碼中手動添加線程鎖的相關操作了。需要註意的是,隊列的qsize()、full()和empty()等方法並不是線程安全的,例如當qsize()獲取結果為0時,可能另一個線程馬上就往隊列中添加了一個數據,此時qsize()的獲取結果就是1了。

對於隊列終止的判斷,可以通過在隊列中添加結束標誌或者異常捕獲來判斷,當然,隊列的操作,還是要根據具體的場景來做。

from queue import Queue
from threading import Thread

# 隊列終止標誌
_sentinel = object()


def producer(out_q):
    while True:
        # 數據處理
        ...

        # 向隊列中添加數據
        out_q.put(data)

    out_q.put(_sentinel)


def consumer(in_q):
    while True:
        # 從隊列獲取數據
        data = in_q.get()

        # 判斷隊列是否結束
        if data is _sentinel:
            in_q.put(_sentinel)
            break

        # 數據處理
        ...


q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()
import queue

q = queue.Queue()

try:
    data = q.get(block=False)
except queue.Empty:
    ...

try:
    data = q.get(timeout=5.0)
except queue.Empty:
    ...

try:
    q.put(item, block=False)
except queue.Full:
    ...

 

 

12.4 給關鍵部分加鎖

當你需要給可變對象添加鎖時,應該考慮使用with語句,而不是手動調用acquire方法和release方法,在進入with語句時會自動獲取鎖,離開with語句時則自動釋放鎖。

 

12.5 防止死鎖的加鎖機制

此小節主要記錄一個“哲學家就餐問題”的避免死鎖的解決方案,有興趣的可以看下。

哲學家就餐問題:五位哲學家圍坐在一張桌子前,每個人 面前有一個碗飯和一隻筷子。在這裡每個哲學家可以看做是一個獨立的線程,而每隻筷子可以看做是一個鎖。每個哲學家可以處在靜坐、 思考、吃飯三種狀態中的一個。需要註意的是,每個哲學家吃飯是需要兩隻筷子的,這樣問題就來了:如果每個哲學家都拿起自己左邊的筷子, 那麼他們五個都只能拿著一隻筷子坐在那兒,直到餓死。此時他們就進入了死鎖狀態。

import threading
from contextlib import contextmanager

# 線程運行時,local()返回的實例會為每個線程創建一個屬於它自己的本地存儲,不同線程的本地存儲互不影響,且互不可見
_local = threading.local()


# 利用上下文管理器和鎖的id值進行排序來控制鎖的分配
@contextmanager
def acquire(*locks):
    # Sort locks by object identifier
    locks = sorted(locks, key=lambda x: id(x))

    # 每個線程第一次運行到這兒時,結果都是空列表
    acquired = getattr(_local, 'acquired', [])
    if acquired and max(id(lock) for lock in acquired) >= id(locks[0]):
        raise RuntimeError('Lock Order Violation')

    # 為線程的本地存儲添加一個列表,存儲所有鎖的id值
    acquired.extend(locks)
    _local.acquired = acquired

    try:
        for lock in locks:
            lock.acquire()
        yield
    finally:
        # Release locks in reverse order of acquisition
        for lock in reversed(locks):
            lock.release()
        del acquired[-len(locks):]


# 5個哲學家就餐問題實現
# The philosopher thread
def philosopher(left, right):
    while True:
        with acquire(left, right):
            print(threading.currentThread(), 'eating')


# The chopsticks (represented by locks)
NSTICKS = 5
chopsticks = [threading.Lock() for n in range(NSTICKS)]

# Create all of the philosophers
for n in range(NSTICKS):
    t = threading.Thread(target=philosopher,
                         args=(chopsticks[n], chopsticks[(n + 1) % NSTICKS]))
    t.start()

 

 

12.6 保存線程的狀態信息

threading.local()返回的實例可以為每個線程創建一個本地存儲,即一個底層字典,不同線程之間的字典是不可見的。以下示例中,每個線程都有自己的專屬套接字連接,所以多線程運行時它們是互不影響的。

import threading
from functools import partial
from socket import socket, AF_INET, SOCK_STREAM


class LazyConnection:
    def __init__(self, address, family=AF_INET, type=SOCK_STREAM):
        self.address = address
        self.family = AF_INET
        self.type = SOCK_STREAM
        self.local = threading.local()

    def __enter__(self):
        if hasattr(self.local, 'sock'):
            raise RuntimeError('Already connected')
        self.local.sock = socket(self.family, self.type)
        self.local.sock.connect(self.address)
        return self.local.sock

    def __exit__(self, exc_ty, exc_val, tb):
        self.local.sock.close()
        del self.local.sock


def test(conn):
    with conn as s:
        s.send(b'GET /index.html HTTP/1.0\r\n')
        s.send(b'Host: www.python.org\r\n')

        s.send(b'\r\n')
        resp = b''.join(iter(partial(s.recv, 8192), b''))

    print('Got {} bytes'.format(len(resp)))


if __name__ == '__main__':
    conn = LazyConnection(('www.python.org', 80))

    t1 = threading.Thread(target=test, args=(conn,))
    t2 = threading.Thread(target=test, args=(conn,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

 

 

12.7 創建一個線程池

如果程式中需要使用到線程池,或者需要線程所執行函數的返回結果時,可以考慮使用from concurrent.futures import ThreadPoolExecutor。

import urllib.request
from concurrent.futures import ThreadPoolExecutor


def fetch_url(url):
    u = urllib.request.urlopen(url)
    data = u.read()
    return data


# 創建線程池對象,並允許同時運行10個線程
pool = ThreadPoolExecutor(10)
# 傳入線程要執行的函數,以及它的參數
a = pool.submit(fetch_url, 'http://www.python.org')
b = pool.submit(fetch_url, 'http://www.pypy.org')

# 獲取線程執行結果時,會阻塞當前線程,直到該線程執行完畢並返回結果
x = a.result()
y = b.result()

 

 

12.8 簡單的並行編程

如果想要進行CPU密集型運算,並且想利用CPU多核的特性,可以考慮使用concurrent.futures的ProcessPoolExecutor類,但是正如此小節標題所言,只能是執行一些簡單的函數形式,其他的類方法、閉包等形式並不支持,並且函數的參數和返回結果也必須相容pickle。

它的原理是創建N個獨立的Python解釋器來執行,N取決於系統CPU核心數,當然,在實例化時也可以指定ProcessPoolExecutor(N)。

可以使用對應map來批量執行函數,也可以使用submit來單獨執行某個函數,具體使用見示例。

# 使用map批量執行
from concurrent.futures import ProcessPoolExecutor


def work(x):
    ...
    return result

# 普通做法
# results = map(work, data)

# 利用CPU多核特點
with ProcessPoolExecutor() as pool:
    results = pool.map(work, data)
from concurrent.futures import ProcessPoolExecutor


def work(x):
    ...
    return result

def when_done(r):
    print('Got:', r.result())


with ProcessPoolExecutor() as pool:
    ...
    # 單獨執行某個函數
    future_result = pool.submit(work, arg)

    # 在使用result()獲取結果時,當前程式會被阻塞,直到產生結果
    r = future_result.result()
    ...

    # 單獨執行如果不想被阻塞,可以使用add_done_callback指定一個回調函數
    # 這個函數接受一個Future實例參數,可以在回調函數中獲取執行結果
    future_result.add_done_callback(when_done)

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 前言 隨著分散式架構微服務的興起,DDD(領域驅動設計)、CQRS(命令查詢職責分離)、EDA(事件驅動架構)、ES(事件溯源)等概念也一併成為時下的火熱概念,我也在早些時候閱讀了一些大佬的分析文,學習相關概念,不過一直有種霧裡看花、似懂非懂的感覺。經過一段時間的學習和研究大佬的代碼後,自己設計實現 ...
  • 圖解Java設計模式之UML類圖 3.1 UML基本介紹 UML圖 UML類圖 3.1 UML基本介紹 1)UML – Unified modeling language UML(統一建模語言),是一種用於軟體系統分析和設計的語言工具,它用於幫助軟體開發人員進行思考和記錄思路的結果2)UML本身是一 ...
  • 最近購買了極客時間推出的李運華的課程——《從0開始學架構》,本人通過聽音頻和文字閱讀,整理出相關筆記,目的是方便今後再次閱讀。再次感謝李運華的講解,購買鏈接:從0開始學架構 資深技術專家的實戰架構心法 開篇詞 | 照著做,你也能成為架構師 想成為架構師,夢想是美好的,但道路是曲折的,這應該不是個人天 ...
  • 一、 安裝環境 python2.7.15(我使用的版本是歷史版本,比較晚) a. 在python官網下載對應版本(相容性需要自行確定),推薦使用虛擬機中共用軟體的功能(我的共用軟體是掛載到"/mnt/hgfs/",當進入目錄後會看到'你的共用文件的名字'直接進去就可以)。 b. sudo tar - ...
  • 我的潘多拉 從一個故事說起。從前,有個Java程式員非常喜歡寫程式,喜歡研究源碼,讀英文文檔。但是它在一家小公司里工作,公司的技術棧很陳舊。 單個系統代碼中含有很多的xml配置,配置各種中間件的入口適配器,而不同的業務系統中都是類似的配置。啟動單個系統很慢。啟動依賴web組件,無法快速部署。公共組件 ...
  • python3-cookbook中每個小節以問題、解決方案和討論三個部分探討了Python3在某類問題中的最優解決方式,或者說是探討Python3本身的數據結構、函數、類等特性在某類問題上如何更好地使用。這本書對於加深Python3的理解和提升Python編程能力的都有顯著幫助,特別是對怎麼提高Py ...
  • 一、 1.讓SortedSet集合做到排序還有另一種方式:java.util.Comparator; 2.單獨編寫一個比較器 package com.bjpowernode.java_learning; import java.util.*; ​ public class D90_1_SortedS ...
  • 明確學習目標,不急於求成 當下是一個喧囂、浮躁的時代。我們總是被生活中大量涌現的熱點所吸引,幾乎沒有深度閱讀和思考的時間和機會。我始終認為,學習是需要沉下心來慢慢鑽研的,是長 期的;同時,學習不應該被賦予太多的功利色彩。一個Python 程式員的成長路線圖應該是這樣子的:基礎語法–>語感訓練–>課題 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...