python3-cookbook中每個小節以問題、解決方案和討論三個部分探討了Python3在某類問題中的最優解決方式,或者說是探討Python3本身的數據結構、函數、類等特性在某類問題上如何更好地使用。這本書對於加深Python3的理解和提升Python編程能力的都有顯著幫助,特別是對怎麼提高Py ...
python3-cookbook中每個小節以問題、解決方案和討論三個部分探討了Python3在某類問題中的最優解決方式,或者說是探討Python3本身的數據結構、函數、類等特性在某類問題上如何更好地使用。這本書對於加深Python3的理解和提升Python編程能力的都有顯著幫助,特別是對怎麼提高Python程式的性能會有很好的幫助,如果有時間的話強烈建議看一下。
本文為學習筆記,文中的內容只是根據自己的工作需要和平時使用寫了書中的部分內容,並且文中的示例代碼大多直接貼的原文代碼,當然,代碼多數都在Python3.6的環境上都驗證過了的。不同領域的編程關註點也會有所不同,有興趣的可以去看全文。
python3-cookbook:https://python3-cookbook.readthedocs.io/zh_CN/latest/index.html
12.1 啟動與停止線程
Python中的線程除了使用is_alive()查詢它是否存活和使用join()將它加入到當前線程並等待它終止之外,並沒有提供多少可以對線程操作的方法,例如不能主動終止線程,不能給線程發送信號等,如果想要對線程進行別的查詢和操作,可以參考如下方案。
import time from threading import Thread class CountdownTask: def __init__(self): self._running = True def terminate(self): self._running = False def run(self, n): while self._running and n > 0: print('T-minus', n) n -= 1 time.sleep(5) c = CountdownTask() t = Thread(target=c.run, args=(10,)) t.start() # 主動終止線程 c.terminate() # 等待線程終止 t.join()
12.3 線程間通信
當你需要線上程間交換數據時,可以考慮使用queue庫中的隊列了,它的優勢在於其本身就是線程安全的,如果你使用的是其他的數據結構,就需要在代碼中手動添加線程鎖的相關操作了。需要註意的是,隊列的qsize()、full()和empty()等方法並不是線程安全的,例如當qsize()獲取結果為0時,可能另一個線程馬上就往隊列中添加了一個數據,此時qsize()的獲取結果就是1了。
對於隊列終止的判斷,可以通過在隊列中添加結束標誌或者異常捕獲來判斷,當然,隊列的操作,還是要根據具體的場景來做。
from queue import Queue from threading import Thread # 隊列終止標誌 _sentinel = object() def producer(out_q): while True: # 數據處理 ... # 向隊列中添加數據 out_q.put(data) out_q.put(_sentinel) def consumer(in_q): while True: # 從隊列獲取數據 data = in_q.get() # 判斷隊列是否結束 if data is _sentinel: in_q.put(_sentinel) break # 數據處理 ... q = Queue() t1 = Thread(target=consumer, args=(q,)) t2 = Thread(target=producer, args=(q,)) t1.start() t2.start()
import queue q = queue.Queue() try: data = q.get(block=False) except queue.Empty: ... try: data = q.get(timeout=5.0) except queue.Empty: ... try: q.put(item, block=False) except queue.Full: ...
12.4 給關鍵部分加鎖
當你需要給可變對象添加鎖時,應該考慮使用with語句,而不是手動調用acquire方法和release方法,在進入with語句時會自動獲取鎖,離開with語句時則自動釋放鎖。
12.5 防止死鎖的加鎖機制
此小節主要記錄一個“哲學家就餐問題”的避免死鎖的解決方案,有興趣的可以看下。
哲學家就餐問題:五位哲學家圍坐在一張桌子前,每個人 面前有一個碗飯和一隻筷子。在這裡每個哲學家可以看做是一個獨立的線程,而每隻筷子可以看做是一個鎖。每個哲學家可以處在靜坐、 思考、吃飯三種狀態中的一個。需要註意的是,每個哲學家吃飯是需要兩隻筷子的,這樣問題就來了:如果每個哲學家都拿起自己左邊的筷子, 那麼他們五個都只能拿著一隻筷子坐在那兒,直到餓死。此時他們就進入了死鎖狀態。
import threading from contextlib import contextmanager # 線程運行時,local()返回的實例會為每個線程創建一個屬於它自己的本地存儲,不同線程的本地存儲互不影響,且互不可見 _local = threading.local() # 利用上下文管理器和鎖的id值進行排序來控制鎖的分配 @contextmanager def acquire(*locks): # Sort locks by object identifier locks = sorted(locks, key=lambda x: id(x)) # 每個線程第一次運行到這兒時,結果都是空列表 acquired = getattr(_local, 'acquired', []) if acquired and max(id(lock) for lock in acquired) >= id(locks[0]): raise RuntimeError('Lock Order Violation') # 為線程的本地存儲添加一個列表,存儲所有鎖的id值 acquired.extend(locks) _local.acquired = acquired try: for lock in locks: lock.acquire() yield finally: # Release locks in reverse order of acquisition for lock in reversed(locks): lock.release() del acquired[-len(locks):] # 5個哲學家就餐問題實現 # The philosopher thread def philosopher(left, right): while True: with acquire(left, right): print(threading.currentThread(), 'eating') # The chopsticks (represented by locks) NSTICKS = 5 chopsticks = [threading.Lock() for n in range(NSTICKS)] # Create all of the philosophers for n in range(NSTICKS): t = threading.Thread(target=philosopher, args=(chopsticks[n], chopsticks[(n + 1) % NSTICKS])) t.start()
12.6 保存線程的狀態信息
threading.local()返回的實例可以為每個線程創建一個本地存儲,即一個底層字典,不同線程之間的字典是不可見的。以下示例中,每個線程都有自己的專屬套接字連接,所以多線程運行時它們是互不影響的。
import threading from functools import partial from socket import socket, AF_INET, SOCK_STREAM class LazyConnection: def __init__(self, address, family=AF_INET, type=SOCK_STREAM): self.address = address self.family = AF_INET self.type = SOCK_STREAM self.local = threading.local() def __enter__(self): if hasattr(self.local, 'sock'): raise RuntimeError('Already connected') self.local.sock = socket(self.family, self.type) self.local.sock.connect(self.address) return self.local.sock def __exit__(self, exc_ty, exc_val, tb): self.local.sock.close() del self.local.sock def test(conn): with conn as s: s.send(b'GET /index.html HTTP/1.0\r\n') s.send(b'Host: www.python.org\r\n') s.send(b'\r\n') resp = b''.join(iter(partial(s.recv, 8192), b'')) print('Got {} bytes'.format(len(resp))) if __name__ == '__main__': conn = LazyConnection(('www.python.org', 80)) t1 = threading.Thread(target=test, args=(conn,)) t2 = threading.Thread(target=test, args=(conn,)) t1.start() t2.start() t1.join() t2.join()
12.7 創建一個線程池
如果程式中需要使用到線程池,或者需要線程所執行函數的返回結果時,可以考慮使用from concurrent.futures import ThreadPoolExecutor。
import urllib.request from concurrent.futures import ThreadPoolExecutor def fetch_url(url): u = urllib.request.urlopen(url) data = u.read() return data # 創建線程池對象,並允許同時運行10個線程 pool = ThreadPoolExecutor(10) # 傳入線程要執行的函數,以及它的參數 a = pool.submit(fetch_url, 'http://www.python.org') b = pool.submit(fetch_url, 'http://www.pypy.org') # 獲取線程執行結果時,會阻塞當前線程,直到該線程執行完畢並返回結果 x = a.result() y = b.result()
12.8 簡單的並行編程
如果想要進行CPU密集型運算,並且想利用CPU多核的特性,可以考慮使用concurrent.futures的ProcessPoolExecutor類,但是正如此小節標題所言,只能是執行一些簡單的函數形式,其他的類方法、閉包等形式並不支持,並且函數的參數和返回結果也必須相容pickle。
它的原理是創建N個獨立的Python解釋器來執行,N取決於系統CPU核心數,當然,在實例化時也可以指定ProcessPoolExecutor(N)。
可以使用對應map來批量執行函數,也可以使用submit來單獨執行某個函數,具體使用見示例。
# 使用map批量執行 from concurrent.futures import ProcessPoolExecutor def work(x): ... return result # 普通做法 # results = map(work, data) # 利用CPU多核特點 with ProcessPoolExecutor() as pool: results = pool.map(work, data)
from concurrent.futures import ProcessPoolExecutor def work(x): ... return result def when_done(r): print('Got:', r.result()) with ProcessPoolExecutor() as pool: ... # 單獨執行某個函數 future_result = pool.submit(work, arg) # 在使用result()獲取結果時,當前程式會被阻塞,直到產生結果 r = future_result.result() ... # 單獨執行如果不想被阻塞,可以使用add_done_callback指定一個回調函數 # 這個函數接受一個Future實例參數,可以在回調函數中獲取執行結果 future_result.add_done_callback(when_done)