引言 對於 Python 來說,並不缺少併發選項,其標準庫中包括了對線程、進程和非同步 I/O 的支持。在許多情況下,通過創建諸如非同步、線程和子進程之類的高層模塊,Python 簡化了各種併發方法的使用。除了標準庫之外,還有一些第三方的解決方案,例如 Twisted、Stackless 和進程模塊。本 ...
引言
對於 Python 來說,並不缺少併發選項,其標準庫中包括了對線程、進程和非同步 I/O 的支持。在許多情況下,通過創建諸如非同步、線程和子進程之類的高層模塊,Python 簡化了各種併發方法的使用。除了標準庫之外,還有一些第三方的解決方案,例如 Twisted、Stackless 和進程模塊。本文重點關註於使用 Python 的線程,並使用了一些實際的示例進行說明。雖然有許多很好的聯機資源詳細說明瞭線程 API,但本文嘗試提供一些實際的示例,以說明一些常見的線程使用模式。
全局解釋器鎖 (Global Interpretor Lock) 說明 Python 解釋器並不是線程安全的。當前線程必須持有全局鎖,以便對 Python 對象進行安全地訪問。因為只有一個線程可以獲得 Python 對象/C API,所以解釋器每經過 100 個位元組碼的指令,就有規律地釋放和重新獲得鎖。解釋器對線程切換進行檢查的頻率可以通過sys.setcheckinterval()
函數來進行控制。此外,還將根據潛在的阻塞 I/O 操作,釋放和重新獲得鎖。有關更詳細的信息,請參見參考資料部分中的 Gil and Threading State 和 Threading the Global Interpreter Lock。需要說明的是,因為 GIL,CPU 受限的應用程式將無法從線程的使用中受益。使用 Python 時,建議使用進程,或者混合創建進程和線程。
首先弄清進程和線程之間的區別,這一點是非常重要的。線程與進程的不同之處在於,它們共用狀態、記憶體和資源。對於線程來說,這個簡單的區別既是它的優勢,又是它的缺點。一方面,線程是輕量級的,並且相互之間易於通信,但另一方面,它們也帶來了包括死鎖、爭用條件和高複雜性在內的各種問題。幸運的是,由於 GIL 和隊列模塊,與採用其他的語言相比,採用 Python 語言線上程實現的複雜性上要低得多。
使用 Python 線程
要繼續學習本文中的內容,我假定您已經安裝了 Python 2.5 或者更高版本,因為本文中的許多示例都將使用 Python 語言的新特性,而這些特性僅出現於 Python2.5 之後。要開始使用 Python 語言的線程,我們將從簡單的 "Hello World" 示例開始:
#! /usr/bin/env python #coding=utf-8 import threading import datetime class ThreadClass(threading.Thread): def run(self): now = datetime.datetime.now() print "%s says Hello World at time: %s" % (self.getName(),now) for i in range(2): t = ThreadClass() t.start()
結果:
Thread-1 says Hello World at time: 2012-06-20 14:43:26.981173
Thread-2 says Hello World at time: 2012-06-20 14:43:26.981375
仔細觀察輸出結果,您可以看到從兩個線程都輸出了 Hello World 語句,並都帶有日期戳。如果分析實際的代碼,那麼將發現其中包含兩個導入語句;一個語句導入了日期時間模塊,另一個語句導入線程模塊。類 ThreadClass
繼承自 threading.Thread
,也正因為如此,您需要定義一個 run 方法,以此執行您在該線程中要運行的代碼。在這個 run 方法中唯一要註意的是,self.getName()
是一個用於確定該線程名稱的方法。
最後三行代碼實際地調用該類,並啟動線程。如果註意的話,那麼會發現實際啟動線程的是 t.start()
。在設計線程模塊時考慮到了繼承,並且線程模塊實際上是建立在底層線程模塊的基礎之上的。對於大多數情況來說,從 threading.Thread
進行繼承是一種最佳實踐,因為它創建了用於線程編程的常規 API。
使用線程隊列
如前所述,當多個線程需要共用數據或者資源的時候,可能會使得線程的使用變得複雜。線程模塊提供了許多同步原語,包括信號量、條件變數、事件和鎖。當這些選項存在時,最佳實踐是轉而關註於使用隊列。相比較而言,隊列更容易處理,並且可以使得線程編程更加安全,因為它們能夠有效地傳送單個線程對資源的所有訪問,並支持更加清晰的、可讀性更強的設計模式,如”URL 獲取線程化“
#! /usr/bin/env python #coding=utf-8 import urllib2 import time import Queue import threading hosts = ["http://yahoo.com", "http://baidu.com", "http://amazon.com","http://ibm.com", "http://apple.com"] queue = Queue.Queue() class ThreadUrl(threading.Thread): '''Theaded url grab''' def __init__(self,queue): threading.Thread.__init__(self) self.queue = queue def run(self): """docstring for run""" while True: # grabs host from Queue host = self.queue.get() #grabs urls of hosts and prints first 1024 bytes of page url = urllib2.urlopen(host) print url.read(1024) #signals to queue job is done self.queue.task_done() start = time.time() def main(): """docstring for main""" #spawn a poll of threads, and pass them queue instance for i in range(5): t = ThreadUrl(queue) t.setDaemon(True) t.start() #populate queue with data for host in hosts: queue.put(host) #wait on the queue until everything has been processed queue.join() main() print "Elapsed Time: %s" % (time.time() - start)
對於這個示例,有更多的代碼需要說明,但與第一個線程示例相比,它並沒有複雜多少,這正是因為使用了隊列模塊。在 Python 中使用線程時,這個模式是一種很常見的並且推薦使用的方式。具體工作步驟描述如下:
1.創建一個 Queue.Queue() 的實例,然後使用數據對它進行填充。
2.將經過填充數據的實例傳遞給線程類,後者是通過繼承 threading.Thread 的方式創建的。
3.生成守護線程池。
4.每次從隊列中取出一個項目,並使用該線程中的數據和 run 方法以執行相應的工作。
5.在完成這項工作之後,使用 queue.task_done() 函數向任務已經完成的隊列發送一個信號。
6.對隊列執行 join 操作,實際上意味著等到隊列為空,再退出主程式。
在使用這個模式時需要註意一點:通過將守護線程設置為 true,將允許主線程或者程式僅在守護線程處於活動狀態時才能夠退出。這種方式創建了一種簡單的方式以控製程序流程,因為在退出之前,您可以對隊列執行 join 操作、或者等到隊列為空。隊列模塊文檔詳細說明瞭實際的處理過程,請參見參考資料:
join()
保持阻塞狀態,直到處理了隊列中的所有項目為止。在將一個項目添加到該隊列時,未完成的任務的總數就會增加。當使用者線程調用 task_done() 以表示檢索了該項目、並完成了所有的工作時,那麼未完成的任務的總數就會減少。當未完成的任務的總數減少到零時,join()
就會結束阻塞狀態。
使用多個隊列
因為上面介紹的模式非常有效,所以可以通過連接附加線程池和隊列來進行擴展,這是相當簡單的。在上面的示例中,您僅僅輸出了 Web 頁面的開始部分。而下一個示例則將返回各線程獲取的完整 Web 頁面,然後將結果放置到另一個隊列中。然後,對加入到第二個隊列中的另一個線程池進行設置,然後對 Web 頁面執行相應的處理。這個示例中所進行的工作包括使用一個名為 Beautiful Soup 的第三方 Python 模塊來解析 Web 頁面。使用這個模塊,您只需要兩行代碼就可以提取所訪問的每個頁面的 title 標記,並將其列印輸出,如“多隊列數據挖掘網站”例子:
#! /usr/bin/env python # coding: utf-8 import Queue import threading import urllib2 import time from BeautifulSoup import BeautifulSoup hosts = ["http://yahoo.com", "http://baidu.com", "http://amazon.com","http://ibm.com", "http://apple.com"] queue = Queue.Queue() out_queue = Queue.Queue() class ThreadUrl(threading.Thread): '''Threaded Url Grab''' def __init__(self,queue,out_queue): threading.Thread.__init__(self) self.queue = queue self.out_queue = out_queue def run(self): """grabs host from Queue""" host = self.queue.get() #grabs urls of hosts and then grabs chunk of webpage url = urllib2.urlopen(host) chunk = url.read() #place chunk into out_queuet self.out_queue.put(chunk) #signals to queue job is done self.queue.task_done() class DatamineThread(threading.Thread): '''Thread Url Grab''' def __init__(self, out_queue): threading.Thread.__init__(self) self.out_queue = out_queue def run(self): """grabs host from queue""" chunk = self.out_queue.get() #parse the chunk soup = BeautifulSoup(chunk) print soup.findAll(['title']) #signals to queue job is done self.out_queue.task_done() start = time.time() def main(): #spawn a pool of threads, and pass them queue instance for i in range(5): t = ThreadUrl(queue,out_queue) t.setDaemon(True) t.start() #populate queue with data for host in hosts: queue.put(host) for i in range(5): dt = DatamineThread(out_queue) dt.setDaemon(True) dt.start() # wait on the queue until everything has been processed queue.join() out_queue.join() main() print "Elapsed Time: %s" % (time.time()-start)
分析這段代碼時您可以看到,我們添加了另一個隊列實例,然後將該隊列傳遞給第一個線程池類 ThreadURL
。接下來,對於另一個線程池類 DatamineThread
,幾乎複製了完全相同的結構。在這個類的 run 方法中,從隊列中的各個線程獲取 Web 頁面、文本塊,然後使用 Beautiful Soup 處理這個文本塊。在這個示例中,使用 Beautiful Soup 提取每個頁面的 title 標記、並將其列印輸出。可以很容易地將這個示例推廣到一些更有價值的應用場景,因為您掌握了基本搜索引擎或者數據挖掘工具的核心內容。一種思想是使用 Beautiful Soup 從每個頁面中提取鏈接,然後按照它們進行導航。
總結
本文研究了 Python 的線程,並且說明瞭如何使用隊列來降低複雜性和減少細微的錯誤、並提高代碼可讀性的最佳實踐。儘管這個基本模式比較簡單,但可以通過將隊列和線程池連接在一起,以便將這個模式用於解決各種各樣的問題。在最後的部分中,您開始研究如何創建更複雜的處理管道,它可以用作未來項目的模型。參考資料部分提供了很多有關常規併發性和線程的極好的參考資料。
最後,還有很重要的一點需要指出,線程並不能解決所有的問題,對於許多情況,使用進程可能更為合適。特別是,當您僅需要創建許多子進程並對響應進行偵聽時,那麼標準庫子進程模塊可能使用起來更加容易。有關更多的官方說明文檔,請參考參考資料部分。
文章地址:http://www.ibm.com/developerworks/cn/aix/library/au-threadingpython/