線程(下) 7.同步鎖 這個例子很經典,實話說,這個例子我是直接照搬前輩的,並不是原創,不過真的也很有意思,請看: 這段代碼的意思是,用一百個線程去減1,以此讓變數number為100的變為0 結果: 那麼我稍微的改下代碼看看: 並沒有很大的改變對吧,只是加了一個臨時變數,並且中途停頓了0.2s而已 ...
線程(下)
7.同步鎖
這個例子很經典,實話說,這個例子我是直接照搬前輩的,並不是原創,不過真的也很有意思,請看:
#!usr/bin/env python #-*- coding:utf-8 -*- # author:yangva import threading,time number = 100 def subnum(): global number number -= 1 threads = [] for i in range(100): t = threading.Thread(target=subnum,args=[]) t.start() threads.append(t) for i in threads: i.join() print(number)
這段代碼的意思是,用一百個線程去減1,以此讓變數number為100的變為0
結果:
那麼我稍微的改下代碼看看:
#!usr/bin/env python #-*- coding:utf-8 -*- # author:yangva import threading,time number = 100 def subnum(): global number temp = number time.sleep(0.2) number = temp -1 threads = [] for i in range(100): t = threading.Thread(target=subnum,args=[]) t.start() threads.append(t) for i in threads: i.join() print(number)
並沒有很大的改變對吧,只是加了一個臨時變數,並且中途停頓了0.2s而已。
而這個結果就不一樣了:
這裡我先說下,time.sleep(0.2)是我故意加的,就是要體現這個效果,如果你的電腦不加sleep就已經出現這個情況了那麼你就不用加了,這咋回事呢?這就是線程共用數據的潛在危險性,因為線程都是搶著CPU資源在運行,只要發現有空隙就各自搶著跑,所以在這停頓的0.2s時間中,就會有新的線程搶到機會開始運行,那麼一百個線程就有一百個線程在搶機會運行,搶到的時間都是在temp還沒有減1的值,也就是100,所以大部分的線程都搶到了100,然後減1,少部分線程沒搶到,搶到已經減了一次的99,這就是為什麼會是99的原因。而這個搶占的時間和結果並不是根本的原因,究其根本還是因為電腦的配置問題了,配置越好的話,這種越不容易發生,因為一個線程搶到CPU資源後一直在運行,其他的線程在短暫的時間里得不到機會。
而為什麼number -= 1,不藉助其他變數的寫法就沒事呢?因為numebr -= 1其實是兩個步驟,減1並重新賦值給number,這個動作太快,所以根本沒給其他的線程機會。
圖解:
那麼這個問題我們怎麼解決呢,在以後的開發中絕對會遇到這種情況對吧,這個可以解決呢?根據上面的講解,有人會想到用join,而前面已經提過了join會使多線程變成串列,失去了多線程的用意。這個到底怎麼解決呢,用同步鎖
同步鎖:當運行開始加鎖,防止其他線程索取,當運行結束釋放鎖,讓其他線程繼續
#!usr/bin/env python #-*- coding:utf-8 -*- # author:yangva import threading,time r = threading.Lock() #創建同步鎖對象 number = 100 def subnum(): global number r.acquire() #加鎖 temp = number time.sleep(0.2) number = temp - 1 r.release() #釋放 threads = [] for i in range(100): t = threading.Thread(target=subnum,args=[]) t.start() threads.append(t) for i in threads: i.join() print(number)
運行結果:
但是你發現沒,這個運行太慢了,每個線程都運行了一次sleep,竟然又變成和串列運行差不多了對吧?不過還是和串列稍微有點不同,只是在有同步鎖那裡是串列,在其他地方還是多線程的效果
那麼有朋友要問了,既然都是鎖,已經有了一個GIL,那麼還要同步鎖來幹嘛呢?一句話,GIL是著重於保證線程安全,同步鎖是用戶級的可控機制,開發中防止這種不確定的潛在隱患
8.死鎖現象/可重用鎖
前面既然已經用了同步鎖,那麼相信在以後的開發中,絕對會用到使用多個同步鎖的時候,所以這裡模擬一下使用兩個同步鎖,看看會有什麼現象發生
#!usr/bin/env python #-*- coding:utf-8 -*- # author:yangva import threading,time a = threading.Lock() #創建同步鎖對象a b = threading.Lock() #創建同步鎖對象b def demo1(): a.acquire() #加鎖 print('threading model test A....') b.acquire() time.sleep(0.2) print('threading model test B....') b.release() a.release() #釋放 def demo2(): b.acquire() #加鎖 print('threading model test B....') a.acquire() time.sleep(0.2) print('threading model test A....') a.release() b.release() #釋放 threads = [] for i in range(5): t1 = threading.Thread(target=demo1,args=[]) t2 = threading.Thread(target=demo2,args=[]) t1.start() t2.start() threads.append(t1) threads.append(t2) for i in threads: i.join()
運行結果:
這裡就一直阻塞住了,因為demo1函數用的鎖是外層a鎖,內層b鎖,demo2函數剛好相反,外層b鎖,內層a鎖,所以當多線程運行時,兩個函數同時在互搶鎖,誰也不讓誰,這就導致了阻塞,這個阻塞現象又叫死鎖現象。
那麼為了避免發生這種事,我們可以使用threading模塊下的RLOCK來創建重用鎖依此來避免這種現象
#!usr/bin/env python #-*- coding:utf-8 -*- # author:yangva import threading,time r = threading.RLock() #創建重用鎖對象 def demo1(): r.acquire() #加鎖 print('threading model test A....') r.acquire() time.sleep(0.2) print('threading model test B....') r.release() r.release() #釋放 def demo2(): r.acquire() #加鎖 print('threading model test B....') r.acquire() time.sleep(0.2) print('threading model test A....') r.release() r.release() #釋放 threads = [] for i in range(5): t1 = threading.Thread(target=demo1,args=[]) t2 = threading.Thread(target=demo2,args=[]) t1.start() t2.start() threads.append(t1) threads.append(t2) for i in threads: i.join()
運行結果:
這個Rlock其實就是Lock+計算器,計算器里的初始值為0,每嵌套一層鎖,計算器值加1,每釋放一層鎖,計算器值減1,和同步鎖一樣,只有當值為0時才算結束,讓其他線程接著搶著運行。而這個Rlock也有一個官方一點的名字,遞歸鎖
那麼估計有朋友會問了,為什麼會有死鎖現象呢?或者你應該問,是什麼生產環境導致有死鎖現象的,還是那句,為了保護數據同步性,防止多線程操作同一數據時發生衝突。這個說辭很籠統對吧,我說細點。比如前面的購物車系統,雖然我們在操作數據時又重新取了一遍數據來保證數據的真實性,如果多個用戶同時登錄購物車系統在操作的話,或者不同的操作但會涉及到同一個數據的時候,就會導致數據可能不同步了,那麼就可以在內部代碼裡加一次同步鎖,然後再在實際操作處再加一次同步鎖,這樣就出現多層同步鎖,那麼也就會出現死鎖現象了,而此時這個死鎖現象是我們開發中正好需要的。
我想,說了這個例子你應該可以理解為什麼lock里還要有lock,很容易導致死鎖現象我們還是要用它了,總之如果需要死鎖現象就用同步鎖,不需要就換成遞歸鎖。
9.信號量/綁定式信號量
信號量也是一個線程鎖
1)Semaphore
信號量感覺更有具有多線程的意義。先不急著說,看看例子就懂:
#!usr/bin/env python #-*- coding:utf-8 -*- # author:yangva import threading,time s = threading.Semaphore(3) #創建值為3的信號量對象 def demo(): s.acquire() #加鎖 print('threading model test A....') time.sleep(2) s.release() #釋放 threads = [] for i in range(10): t = threading.Thread(target=demo,args=[]) t.start() threads.append(t) for i in threads: i.join()
運行結果:
如果你親自測試這段代碼,你會發現,這個結果是3個一組出的,出了3次3個一組的,最後出了一個一組,3個一組都是並行的,中間停頓2秒。
這裡可以給很形象的例子,假如某個地方的停車位只能同時停3輛車,當停車位有空時其他的車才可以停進來。這裡的3個停車位就相當於信號量。
2)BoundedSemaphore
既然有信號量為我們完成這些一組一組的操作結果,但敢不敢保證這些線程就不會突然的越出這個設定好的車位呢?比如設定好的3個信號量一組,我們都知道線程是爭強著運行,萬一就有除了設定的3個線程外的一兩個線程搶到了運行權,誰也不讓誰,就是要一起運行呢?好比,這裡只有3個車位,已經停滿了,但有人就是要去擠一擠,出現第4輛或者第5輛車的情況,這個和現實生活中的例子簡直太貼切了對吧?
那麼我們怎麼辦?當然這個問題早就有人想好了,所以有了信號量的升級版——綁定式信號量(BoundedSemaphore)。既然是升級版,那麼同信號量一樣該有的都有的,用法也一樣,就是有個功能,在設定好的幾個線程一組運行時,如果有其他線程也搶到運行權,那麼就會報錯。
比如thread_lock = threading.BoundedSemaphore(5),那麼多線程同時運行的線程數就必須在5以內(包括5),不然就報錯。換句話,它擁有了實時監督的功能,好比停車位上的保全,如果發現車位滿了,就禁止放行車輛,直到有空位了再允許車輛進入停車。
因為這個很簡單,就多了個監督功能,其他和semaphore一樣的用法,我就不演示了,自己琢磨吧
10.條件變數同步鎖
不多說,它也是一個線程鎖,本質上是在Rlock基礎之上再添加下麵的三個方法
condition = threading.Condition([Lock/RLock]),預設裡面的參數是Rlock
wait():條件不滿足時調用,釋放線程併進入等待阻塞
notify():條件創造後調用,通知等待池激活一個線程
notifyall():條件創造後調用,通知等待池激活所有線程
直接上例子
#!usr/bin/env python #-*- coding:utf-8 -*- # author:yangva import threading,time from random import randint class producer(threading.Thread): ''' 生產者 ''' def run(self): global Li while True: value = randint(0,100) #創建一百以內隨機數 print('生產者',self.name,'Append:'+str(value),Li) if con.acquire(): #加鎖 Li.append(value) #把產品加入產品列表裡 con.notify() #通知等待池裡的消費者線程激活並運行 con.release() #釋放 time.sleep(3) #每3秒做一次產品 class consumer(threading.Thread): ''' 消費者 ''' def run(self): global Li while True: con.acquire() #獲取條件變數鎖,必須和生產者同一個鎖對象,生產者通知後在此處開始運行 if len(Li) == 0: #如果產品列表內沒數據,表示消費者先搶到線程運行權 con.wait() #阻塞狀態,等待生產者線程通知 print('消費者',self.name,'Delete:'+str(Li [0]),Li) Li.remove(Li[0]) #刪除被消費者用掉的產品 con.release() #釋放 time.sleep(0.5) #每0.5秒用掉一個產品 con = threading.Condition() #創建條件變數鎖對象 threads = [] #線程列表 Li = [] #產品列表 for i in range(5): threads.append(producer()) threads.append(consumer()) for i in threads: i.start() for i in threads: i.join()
運行結果:
圖片只截取了部分,因為它一直在無線迴圈著的。這個生產者和消費者的模型很經典,必須理解,每個步驟分別什麼意思我都註釋了,不再贅述了。
11.event事件
類似於condition,但它並不是一個線程鎖,並且沒有鎖的功能
event = threading.Event(),條件環境對象,初始值為False
event.isSet():返回event的狀態值
event.wait():如果event.isSet()的值為False將阻塞
event.set():設置event的狀態值為True,所有阻塞池的線程激活併進入就緒狀態,等待操作系統調度
event.clear():恢復event的狀態值False
不多說,看一個例子:
#!usr/bin/env python #-*- coding:utf-8 -*- # author:yangva import threading,time class boss(threading.Thread): def run(self): print('boss:今晚加班!') event.isSet() or event.set() #設置為True time.sleep(5) #切換到員工線程 print('boss:可以下班了') event.isSet() or event.set() #又設置為True class worker(threading.Thread): def run(self): event.wait() #等待老闆發話,只有值為True再往下走 print('worker:唉~~~,又加班') time.sleep(1) #開始加班 event.clear() #設置標誌為false event.wait() #等老闆發話 print('worker:oh yeah,終於可以回家了') event = threading.Event() threads = [] for i in range(5): threads.append(worker()) threads.append(boss()) for i in threads: i.start() for i in threads: i.join()
運行結果:
其實這個和condition的通信原理是一樣的,只是condition用的是notify,event用的set和isset
*12.隊列(queue)
本質上,隊列是一個數據結構。
1)創建一個“隊列”對象
import Queue
q = Queue.Queue(maxsize = 10)
Queue.Queue類即是一個隊列的同步實現。隊列長度可為無限或者有限。可通過Queue的構造函數的可選參數maxsize來設定隊列長度。如果maxsize小於1就表示隊列長度無限。
2)將一個值放入隊列中
q.put(obj)
調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item為必需的,為插入項目的值;第二個block為可選參數,預設為
1。如果隊列當前為空且block為1,put()方法就使調用線程暫停,直到空出一個數據單元。如果block為0,put方法將引發Full異常。
3)將一個值從隊列中取出
q.get()
調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數為block,預設為True。如果隊列為空且block為True,get()就使調用線程暫停,直至有項目可用。如果隊列為空且block為False,隊列將引發Empty異常。
例:
4)Python Queue模塊有三種隊列及構造函數:
- Python Queue模塊的FIFO隊列先進先出 class queue.Queue(maxsize)
- LIFO類似於堆,即先進後出 class queue.LifoQueue(maxsize)
- 還有一種是優先順序隊列級別越低越先出來 class queue.PriorityQueue(maxsize)
當maxsize值比put的數量少時就會阻塞住,當數據被get後留有空間才能接著put進去,類似於線程的信號量
5)queue中的常用方法(q = Queue.Queue()):
q.qsize():返回隊列的大小
q.empty():如果隊列為空,返回True,反之False
q.full():如果隊列滿了,返回True,反之False,q.full與 maxsize 大小對應
q.get([block[, timeout]]) 獲取隊列,timeout等待時間
q.get_nowait():相當q.get(False)
q.put_nowait(item):相當q.put(item, False)
q.task_done():在完成一項工作之後,q.task_done() 函數向任務已經完成的隊列發送一個信號
q.join():實際上意味著等到隊列為空,再執行別的操作
6)隊列有什麼好處,與列表區別
隊列本身就有一把鎖,內部已經維持一把鎖,如果你用列表的話,當環境是在多線程下,那麼列表數據就一定會有衝突,而隊列不會,因為此,隊列有個外號——多線程利器
例:
#!usr/bin/env python #-*- coding:utf-8 -*- # author:yangva import threading,time import queue from random import randint class productor(threading.Thread): def run(self): while True: r = randint(0,100) q.put(r) print('生產出來 %s 號產品'%r) time.sleep(1) class consumer(threading.Thread): def run(self): while True: result =q.get() print('用掉 %s 號產品'%result) time.sleep(1) q = queue.Queue(10) threads = [] for i in range(3): threads.append(productor()) threads.append(consumer()) for i in threads: i.start()
運行結果:
這裡根本不用加鎖就完成了前面的生產者消費者模型,因為queue裡面自帶了一把鎖。
好的,關於線程的知識點,講解完。
多線程式爬蟲
有的朋友學完線程還不知道線程到底能運用於哪些生活實際,好的,不多說,來,我們爬下堆糖網(https://www.duitang.com/)的校花照片。
import requests import urllib.parse import threading,time,os #設置照片存放路徑 os.mkdir('duitangpic') base_path = os.path.join(os.path.dirname(__file__),'duitangpic') #設置最大信號量線程鎖 thread_lock=threading.BoundedSemaphore(value=10) #通過url獲取數據 def get_page(url): header={'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36'} page=requests.get(url,headers=header) page=page.content #content是byte #轉為字元串 page=page.decode('utf-8') return page #label 即是搜索關鍵詞 def page_from_duitang(label): pages=[] url='https://www.duitang.com/napi/blog/list/by_search/?kw={}&start={}&limit=1000' label=urllib.parse.quote(label)#將中文轉成url(ASCII)編碼 for index in range(0,3600,100): u=url.format(label,index) #print(u) page=get_page(u) pages.append(page) return pages def findall_in_page(page,startpart,endpart): all_strings=[] end=0 while page.find(startpart,end) !=-1: start=page.find(startpart,end)+len(startpart) end=page.find(endpart,start) string=page[start:end] all_strings.append(string) return all_strings def pic_urls_from_pages(pages): pic_urls=[] for page in pages: urls=findall_in_page(page,'path":"','"') #print('urls',urls) pic_urls.extend(urls) return pic_urls def download_pics(url,n): header={'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36'} r=requests.get(url,headers=header) path=base_path+'/'+str(n)+'.jpg' with open(path,'wb') as f: f.write(r.content) #下載完,解鎖 thread_lock.release() def main(label): pages=page_from_duitang(label) pic_urls=pic_urls_from_pages(pages) n=0 for url in pic_urls: n+=1 print('正在下載第{}張圖片'.format(n)) #上鎖 thread_lock.acquire() t=threading.Thread(target=download_pics,args=(url,n)) t.start() main('校花')
運行結果:
在與本py文件相同的目錄下,有個duitangpic的文件夾,打開看看:
全是美女,而且不出意外又好幾千張呢,我這隻有一千多張是因為我手動結束了py程式運行,畢竟我這是演示,不需要真的等程式運行完。我大概估計,不出意外應該能爬到3000張左右的照片
怎麼樣,老鐵,得勁不?刺不刺激?感受到多線程的用處了不?而且這還是python下的偽多線程(IO密集型,但並不算是真正意義上的多線程),你用其他的語言來爬更帶勁。