1. python線程 1.1 全局解釋器鎖GIL Python代碼的執行由Python虛擬機(也叫解釋器主迴圈)來控制。Python在設計之初就考慮到要在主迴圈中,同時只有一個線程在執行。雖然 Python 解釋器中可以“運行”多個線程,但在任意時刻只有一個線程在解釋器中運行。對Python虛擬機 ...
1. python線程
1.1 全局解釋器鎖GIL
Python代碼的執行由Python虛擬機(也叫解釋器主迴圈)來控制。Python在設計之初就考慮到要在主迴圈中,同時只有一個線程在執行。雖然 Python 解釋器中可以“運行”多個線程,但在任意時刻只有一個線程在解釋器中運行。
對Python虛擬機的訪問由全局解釋器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個線程在運行。
在多線程環境中,Python 虛擬機按以下方式執行:
設置 GIL
切換到一個線程去運行
運行指定數量的位元組碼指令或者線程主動讓出控制(可以調用 time.sleep(0))
把線程設置為睡眠狀態
解鎖 GIL
再次重覆以上所有步驟
在調用外部代碼(如 C/C++擴展函數)的時候,GIL將會被鎖定,直到這個函數結束為止(由於在這期間沒有Python的位元組碼被運行,所以不會做線程切換)編寫擴展的程式員可以主動解鎖GIL。
1.2 python線程模塊的選擇
Python提供了幾個用於多線程編程的模塊,包括thread、threading和Queue等。thread和threading模塊允許程式員創建和管理線程。thread模塊提供了基本的線程和鎖的支持,threading提供了更高級別、功能更強的線程管理的功能。Queue模塊允許用戶創建一個可以用於多個線程之間共用數據的隊列數據結構。
避免使用thread模塊,因為更高級別的threading模塊更為先進,對線程的支持更為完善,而且使用thread模塊里的屬性有可能會與threading出現衝突;其次低級別的thread模塊的同步原語很少(實際上只有一個),而threading模塊則有很多;再者,thread模塊中當主線程結束時,所有的線程都會被強制結束掉,沒有警告也不會有正常的清除工作,至少threading模塊能確保重要的子線程退出後進程才退出。
thread模塊不支持守護線程,當主線程退出時,所有的子線程不論它們是否還在工作,都會被強行退出。而threading模塊支持守護線程,守護線程一般是一個等待客戶請求的伺服器,如果沒有客戶提出請求它就在那等著,如果設定一個線程為守護線程,就表示這個線程是不重要的,在進程退出的時候,不用等待這個線程退出。
2. threading模塊
multiprocess模塊的完全模仿了threading模塊的介面,二者在使用層面,有很大的相似性,因而不再詳細介紹。
2.1 線程的創建Threading.Thread類
2.1.1 線程的創建
創建線程的方式1:
from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.start() print('主線程')
創建線程的方式2:
from threading import Thread import time class Sayhi(Thread): def __init__(self,name): super().__init__() self.name=name def run(self): time.sleep(2) print('%s say hello' % self.name) if __name__ == '__main__': t = Sayhi('egon') t.start() print('主線程')
2.1.2 多線程與多進程
pid的比較:
from threading import Thread from multiprocessing import Process import os def work(): print('hello',os.getpid()) if __name__ == '__main__': #part1:在主進程下開啟多個線程,每個線程都跟主進程的pid一樣 t1=Thread(target=work) t2=Thread(target=work) t1.start() t2.start() print('主線程/主進程pid',os.getpid()) #part2:開多個進程,每個進程都有不同的pid p1=Process(target=work) p2=Process(target=work) p1.start() p2.start() print('主線程/主進程pid',os.getpid())
開啟效率的較量:
from threading import Thread from multiprocessing import Process import os def work(): print('hello') if __name__ == '__main__': #在主進程下開啟線程 t=Thread(target=work) t.start() print('主線程/主進程') ''' 列印結果: hello 主線程/主進程 ''' #在主進程下開啟子進程 t=Process(target=work) t.start() print('主線程/主進程') ''' 列印結果: 主線程/主進程 hello '''
記憶體數據的共用問題:
from threading import Thread from multiprocessing import Process import os def work(): global n n=0 if __name__ == '__main__': # n=100 # p=Process(target=work) # p.start() # p.join() # print('主',n) #毫無疑問子進程p已經將自己的全局的n改成了0,但改的僅僅是它自己的,查看父進程的n仍然為100 n=1 t=Thread(target=work) t.start() t.join() print('主',n) #查看結果為0,因為同一進程內的線程之間共用進程內的數據 同一進程內的線程共用該進程的數據?
2.1.3 Thread類的其他方法
Thread實例對象的方法 # isAlive(): 返回線程是否活動的。 # getName(): 返回線程名。 # setName(): 設置線程名。 threading模塊提供的一些方法: # threading.currentThread(): 返回當前的線程變數。 # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動後、結束前,不包括啟動前和終止後的線程。 # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
代碼示例:
from threading import Thread import threading from multiprocessing import Process import os def work(): import time time.sleep(3) print(threading.current_thread().getName()) if __name__ == '__main__': #在主進程下開啟線程 t=Thread(target=work) t.start() print(threading.current_thread().getName()) print(threading.current_thread()) #主線程 print(threading.enumerate()) #連同主線程在內有兩個運行的線程 print(threading.active_count()) print('主線程/主進程') ''' 列印結果: MainThread <_MainThread(MainThread, started 140735268892672)> [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>] 主線程/主進程 Thread-1 '''
join方法:
from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.start() t.join() print('主線程') print(t.is_alive()) ''' egon say hello 主線程 False '''
2.1.4 守護線程
無論是進程還是線程,都遵循:守護xx會等待主xx運行完畢後被銷毀。需要強調的是:運行完畢並非終止運行。
對主進程來說,運行完畢指的是主進程代碼運行完畢。
對主線程來說,運行完畢指的是主線程所在的進程內所有非守護線程統統運行完畢,主線程才算運行完畢。
主進程在其代碼結束後就已經算運行完畢了(守護進程在此時就被回收),然後主進程會一直等非守護的子進程都運行完畢後回收子進程的資源(否則會產生僵屍進程),才會結束。
主線程在其他非守護線程運行完畢後才算運行完畢(守護線程在此時就被回收)。因為主線程的結束意味著進程的結束,進程整體的資源都將被回收,而進程必須保證非守護線程都運行完畢後才能結束。
守護線程實例1:
from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.setDaemon(True) #必須在t.start()之前設置 t.start() print('主線程') print(t.is_alive()) ''' 主線程 True '''
守護線程實例2:
import time def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") t1=Thread(target=foo) t2=Thread(target=bar) t1.daemon=True t1.start() t2.start() print("main-------")
2.2 鎖
2.2.1 同步鎖
多個線程搶占資源的情況:
from threading import Thread import os,time def work(): global n temp=n time.sleep(0.1) n=temp-1 if __name__ == '__main__': n=100 l=[] for i in range(100): p=Thread(target=work) l.append(p) p.start() for p in l: p.join() print(n) #結果可能為99
對公共數據的操作:
import threading R=threading.Lock() R.acquire() R.release()
同步鎖的引用:
from threading import Thread,Lock import os,time def work(): global n lock.acquire() temp=n time.sleep(0.1) n=temp-1 lock.release() if __name__ == '__main__': lock=Lock() n=100 l=[] for i in range(100): p=Thread(target=work) l.append(p) p.start() for p in l: p.join() print(n) #結果肯定為0,由原來的併發執行變成串列,犧牲了執行效率保證了數據安全
互斥鎖與join的區別:
#不加鎖:併發執行,速度快,數據不安全 from threading import current_thread,Thread,Lock import os,time def task(): global n print('%s is running' %current_thread().getName()) temp=n time.sleep(0.5) n=temp-1 if __name__ == '__main__': n=100 lock=Lock() threads=[] start_time=time.time() for i in range(100): t=Thread(target=task) threads.append(t) t.start() for t in threads: t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:0.5216062068939209 n:99 ''' #不加鎖:未加鎖部分併發執行,加鎖部分串列執行,速度慢,數據安全 from threading import current_thread,Thread,Lock import os,time def task(): #未加鎖的代碼併發運行 time.sleep(3) print('%s start to run' %current_thread().getName()) global n #加鎖的代碼串列運行 lock.acquire() temp=n time.sleep(0.5) n=temp-1 lock.release() if __name__ == '__main__': n=100 lock=Lock() threads=[] start_time=time.time() for i in range(100): t=Thread(target=task) threads.append(t) t.start() for t in threads: t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:53.294203758239746 n:0 ''' #有的同學可能有疑問:既然加鎖會讓運行變成串列,那麼我在start之後立即使用join,就不用加鎖了啊,也是串列的效果啊 #沒錯:在start之後立刻使用jion,肯定會將100個任務的執行變成串列,毫無疑問,最終n的結果也肯定是0,是安全的,但問題是 #start後立即join:任務內的所有代碼都是串列執行的,而加鎖,只是加鎖的部分即修改共用數據的部分是串列的 #單從保證數據安全方面,二者都可以實現,但很明顯是加鎖的效率更高. from threading import current_thread,Thread,Lock import os,time def task(): time.sleep(3) print('%s start to run' %current_thread().getName()) global n temp=n time.sleep(0.5) n=temp-1 if __name__ == '__main__': n=100 lock=Lock() start_time=time.time() for i in range(100): t=Thread(target=task) t.start() t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 start to run Thread-2 start to run ...... Thread-100 start to run 主:350.6937336921692 n:0 #耗時是多麼的恐怖 ''' )
2.2.2 死鎖與遞歸鎖
進程也有死鎖與遞歸鎖,在進程那裡忘記說了,放到這裡一切說了額。
所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱為死鎖進程,如下就是死鎖。
from threading import Lock as Lock import time mutexA=Lock() mutexA.acquire() mutexA.acquire() print(123) mutexA.release() mutexA.release()
解決方法,遞歸鎖,在Python中為了支持在同一線程中多次請求同一資源,python提供了可重入鎖RLock。
這個RLock內部維護著一個Lock和一個counter變數,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖:
遞歸鎖RLock:
from threading import RLock as Lock import time mutexA=Lock() mutexA.acquire() mutexA.acquire() print(123) mutexA.release() mutexA.release()
典型問題:科學家吃面
死鎖問題:
import time from threading import Thread,Lock noodle_lock = Lock() fork_lock = Lock() def eat1(name): noodle_lock.acquire() print('%s 搶到了麵條'%name) fork_lock.acquire() print('%s 搶到了叉子'%name) print('%s 吃面'%name) fork_lock.release() noodle_lock.release() def eat2(name): fork_lock.acquire() print('%s 搶到了叉子' % name) time.sleep(1) noodle_lock.acquire() print('%s 搶到了麵條' % name) print('%s 吃面' % name) noodle_lock.release() fork_lock.release() for name in ['哪吒','egon','yuan']: t1 = Thread(target=eat1,args=(name,)) t2 = Thread(target=eat2,args=(name,)) t1.start() t2.start()
遞歸鎖解決死鎖問題:
import time from threading import Thread,RLock fork_lock = noodle_lock = RLock() def eat1(name): noodle_lock.acquire() print('%s 搶到了麵條'%name) fork_lock.acquire() print('%s 搶到了叉子'%name) print('%s 吃面'%name) fork_lock.release() noodle_lock.release() def eat2(name): fork_lock.acquire() print('%s 搶到了叉子' % name) time.sleep(1) noodle_lock.acquire() print('%s 搶到了麵條' % name) print('%s 吃面' % name) noodle_lock.release() fork_lock.release() for name in ['哪吒','egon','yuan']: t1 = Thread(target=eat1,args=(name,)) t2 = Thread(target=eat2,args=(name,)) t1.start() t2.start()
2.3 線程隊列
queue隊列 :使用import queue,用法與進程Queue一樣
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
- class
queue.
Queue
(maxsize=0) #先進先出
先進先出:
import queue q=queue.Queue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 結果(先進先出): first second third '''
class queue.
LifoQueue
(maxsize=0) #last in fisrt out
後進先出:
import queue q=queue.LifoQueue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 結果(後進先出): third second first '''
class queue.
PriorityQueue
(maxsize=0) #存儲數據時可設置優先順序的隊列
優先順序隊列:
import queue q=queue.PriorityQueue() #put進入一個元組,元組的第一個元素是優先順序(通常是數字,也可以是非數字之間的比較),數字越小優先順序越高 q.put((20,'a')) q.put((10,'b')) q.put((30,'c')) print(q.get()) print(q.get()) print(q.get()) ''' 結果(數字越小優先順序越高,優先順序高的優先出隊): (10, 'b') (20, 'a') (30, 'c') '''
2.4 python標準模塊--concurrent.futures
#1 介紹 concurrent.futures模塊提供了高度封裝的非同步調用介面 ThreadPoolExecutor:線程池,提供非同步調用 ProcessPoolExecutor: 進程池,提供非同步調用 Both implement the same interface, which is defined by the abstract Executor class. #2 基本方法 #submit(fn, *args, **kwargs) 非同步提交任務 #map(func, *iterables, timeout=None, chunksize=1) 取代for迴圈submit的操作 #shutdown(wait=True) 相當於進程池的pool.close()+pool.join()操作 wait=True,等待池內所有任務執行完畢回收完資源後才繼續 wait=False,立即返回,並不會等待池內的任務執行完畢 但不管wait參數為何值,整個程式都會等到所有任務執行完畢 submit和map必須在shutdown之前 #result(timeout=None) 取得結果 #add_done_callback(fn) 回調函數 # done() 判斷某一個線程是否完成 # cancle() 取消某個任務
ProcessPoolExecutor:
#介紹 The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned. class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None) An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised. #用法 from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ProcessPoolExecutor(max_workers=3) futures=[] for i in range(11): future=executor.submit(task,i) futures.append(future) executor.shutdown(True) print('+++>') for future in futures: print(future.result())
ThreadPoolExecutor:
#介紹 ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously. class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='') An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously. Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor. New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging. #用法 與ProcessPoolExecutor相同
map的用法:
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ThreadPoolExecutor(max_workers=3) # for i in range(11): # future=executor.submit(task,i) executor.map(task,range(1,12)) #map取代了for+submit
回調函數:
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor from multiprocessing import Pool import requests import json import os def get_page(url): print('<進程%s> get %s' %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {'url':url,'text':respone.text} def parse_page(res): res=res.result() print('<進程%s> parse %s' %(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text'])) with open('db.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] # p=Pool(3) # for url in urls: # p.apply_async(get_page,args=(url,),callback=pasrse_page) # p.close() # p.join() p=ProcessPoolExecutor(3) for url in urls: p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個future對象obj,需要用obj.result()拿到結果