協程 Event事件 python 添加全局變數,修改全局變數,實現一個線程在某一個節點讓下一個線程繼續工作 import time from threading import Thread from threading import current_thread flag = False def ...
協程
什麼是協成?單個線程併發的處理多個任務,程式控制協成的切換+保持狀態,協成的切換速度非常快,矇蔽了操作系統的眼睛,讓操作系統認為CPU一直在運行
進程或線程都是由操作系統控制CPU來回切換,遇到阻塞就切換執行其他任務,協成是程式控制的,霸占CPU執行任務,會在操作系統控制CPU之前來回切換,操作系統就認為CPU一直在運作
協程的優點:
1.開銷小
2.運行速度快
3.協程會長期霸占CPU只執行我程式里的所有任務
協程的缺點:
1.協程屬於微併發,處理任務不易過多
2.協程的本質是單線程,無法利用多核,可以一個程式開啟多個進程,每個進程開啟多個線程.每個線程開啟協程
協程處理io密集型比較好
協程的特點:
1.必須在只有一個單線程里實現併發
2.修改共用數據不需要加鎖
3.保持狀態
4.一個協程遇到io操作就自動切換到其他協程
工作中:
一般在工作中我們都是進程+線程+協程的方式來實現併發,以達到最好的併發效果,如果是4核的cpu,一般起5個進程,每個進程中20個線程(5倍cpu數量),每個線程可以起500個協程,大規模爬取頁面的時候,等待網路延遲的時間的時候,我們就可以用協程去實現併發。 併發數量 = 5 * 20 * 500 = 50000個併發,這是一般一個4cpu的機器最大的併發數。nginx在負載均衡的時候最大承載量就是5w個
#當一個任務沒有阻塞的時候
import time
def task():
res = 1
for i in range(1,1000000):
res += 1
def task1():
res = 1
for i in range(1,1000000):
res -= 1
strt = time.time()
task()
task1()
end = time.time()
print(f"串列執行效率:{end - strt}") #串列執行效率:0.07783079147338867
#第一次接觸協程是yield
#沒有io阻塞
import time
def task():
res = 1
for i in range(1,1000000):
res += 1
yield res
def task1():
g = task()
res = 1
for i in range(1,1000000):
res -= 1
next(g)
strt = time.time()
task()
task1()
end = time.time()
print(f"協成執行效率:{end - strt}")#協成執行效率:0.21143341064453125
純計算密集型,沒有io情況下,串列的執行效率高於協程
# 並不是真正的阻塞
import gevent
def eat(name):
print(f"{name} eat 1") #1
gevent.sleep(2) #模擬的是gevent可以識別的阻塞
print(f"{name} eat 2") #4
def play(name):
print(f"{name} play 1") #2
gevent.sleep(1)
print(f"{name} play 2") #3
g1 = gevent.spawn(eat,'八戒')
g2 = gevent.spawn(play,name = '悟空')
g1.join()
g2.join()
print("主") #5
import threading
from gevent import monkey
import gevent
import time
monkey.patch_all() # 打補丁:將下麵的所有的任務的阻塞打上標記,遇到這個標記就切換
def eat():
print(f"線程1:{threading.current_thread().getName()}") #1
print('eat food 1') #2
time.sleep(2) #3阻塞,去執行別的任務
print('eat food 2') #8
def play():
print(f"線程2:{threading.current_thread().getName()}") #4
print('play 1') #5
time.sleep(1) #6阻塞,再去執行別的任務,上一個任務還在阻塞,繼續向下執行
print('play 2') #7
g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)
gevent.joinall([g1,g2])
print(f"{threading.current_thread().getName()}") #9
註意在各個任務中的阻塞時間
Event事件
添加全局變數,修改全局變數,實現一個線程在某一個節點讓下一個線程繼續工作
import time
from threading import Thread
from threading import current_thread
flag = False
def task():
print(f"{current_thread().name}檢測伺服器是否正常開啟.....")
time.sleep(2)
global flag
flag = True
def task1():
while 1:
time.sleep(1)
print(f"{current_thread().name}正在連接伺服器....")
if flag:
print(f"{current_thread().name}連接成功")
return
if __name__ == '__main__':
t1 = Thread(target=task1)
t2 = Thread(target=task1)
t3 = Thread(target=task1)
t = Thread(target=task)
t.start()
t1.start()
t2.start()
t3.start()
import time
from threading import Thread
from threading import current_thread
from threading import Event
import random
event = Event() #預設為False
def task1():
print(f"{current_thread().getName()} 檢測伺服器知否開啟...") #獲取線程名字
time.sleep(random.randint(1,3))
def task2():
print(f"{current_thread().getName()}正在嘗試連接伺服器...")
count = 1
while count < 4:
time.sleep(1)
print(f"{current_thread().getName()}連接第{count}次")
if count < 4:
time.sleep(0.5)
event.set() #將event狀態修改為True
print('連接成功')
count += 1
t1 = Thread(target=task1)
t2 = Thread(target=task2)
t1.start()
t2.start()
import time
from threading import Thread
from threading import current_thread
from threading import Event
import random
event = Event()
def check():
print(f"{current_thread().name}檢測伺服器是否開啟...")
time.sleep(random.randint(1,3))
event.set()
print("連接成功")
def connect():
count = 1
while not event.is_set(): #event.is_set()判斷狀態是True或False
if count == 4:
print('連接次數過多,已斷開')
break
event.wait(1) #輪詢檢測event狀態,如果為真,就向下執行,是個阻塞, # 只阻塞1秒,1秒之後如果還沒有進行set 直接進行下一步操作.
print(f"{current_thread().name}嘗試連接{count}次")
count += 1
else:
print(f"{current_thread().name}連接成功")
t1 = Thread(target=check)
t2 = Thread(target=connect)
t1.start()
t2.start()
線程隊列
import queue
# 先進先出原則
q = queue.Queue(3)
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
# print(q.get(block=False)) #設置多取會報錯
q.get(timeout=2)#阻塞兩秒,後報錯
# print(q.get())多去一個會阻塞
# 後進先出
q = queue.LifoQueue(4)
q.put(1)
q.put(2)
q.put('alex')
q.put('太白')
print(q.get())
print(q.get())
print(q.get())
print(q.get())
# 太白
# alex
# 2
# 1
優先順序隊列
最小的先取出
q = queue.PriorityQueue(4)
q.put((1,'qq'))
q.put((-2,'qq1'))
q.put((0,'qq2'))
print(q.get())
print(q.get())
print(q.get())
#(-2, 'qq1')
#(0, 'qq2')
#(1, 'qq')
同步
同步調用(提交完任務後,就在原地等待任務執行完後,拿到結果,再執行下一行代碼)
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time
import random
import os
def task(i):
print(f"{os.getpid()}開始任務")
time.sleep(random.randint(1,3))
print(f"{os.getpid()}結束任務")
return i
if __name__ == '__main__':
pool = ProcessPoolExecutor()
for i in range(10): #同時開啟10個進程
obj = pool.submit(task,i) #類似於發佈任務
print(f"任務結果:{obj.result()}") # 對象加result()就變成同步,獲取的是運行狀態.obj就為動態對象, (running,pending,finish) 動態對象.result()獲取結果
#obj.result() 必須等到這個任務完成後,返回了結果,在執行下一個任務
pool.shutdown(wait=True) #shutdown 讓我的主進程等待進程池中所有子進程結束後,在執行,類似於join
#在上一個進程池沒有完成任務之前,不允許添加新的任務,一個任務通過一個函數實現,任務完成了他的返回值就是函數的返回值
print("主")
非同步
非同步調用(提交完任務後,不再原地等待任務執行完)
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time
import random
import os
def task(i):
print(f"{os.getpid()}開始任務")
time.sleep(random.randint(1,3))
print(f"{os.getpid()}結束任務")
return i
if __name__ == '__main__':
pool = ProcessPoolExecutor()
for i in range(10):
pool.submit(task,i)
pool.shutdown(wait=True) #等待進程池中所有子進程結束後在向下執行,類似於join
print("主")
存在一個列表中,統一回收結果 ,把動態對象放在列表中,(容器)
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time
import random
import os
def task(i):
print(f"{os.getpid()}開始任務")
time.sleep(random.randint(1,3))
print(f"{os.getpid()}結束任務")
return i
if __name__ == '__main__':
pool = ProcessPoolExecutor()
l = []
for i in range(10):
obj = pool.submit(task,i)
l.append(obj)
pool.shutdown(wait=True)
for i in l:
print(i.result())
print("主")
雖然是同時發佈了任務,但是在回收結果的時候,不能馬上收到一個結束任務的返回值,只能等所有任務結束後統一收到結果
非同步調用的如何取值?
1.
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time
import random
import os
import requests
# 模擬的就是爬取多個源代碼 一定有IO操作
def task(url):
ret = requests.get(url)
if ret.status_code == 200:
return ret.text
def parse(content):
return len(content)
if __name__ == '__main__':
ret = task('http://www.baidu.com')
print(parse(ret))
ret = task('http://www.JD.com')
print(parse(ret))
ret = task('http://www.taobao.com')
print(parse(ret))
ret = task('https://www.cnblogs.com/jin-xin/articles/7459977.html')
print(parse(ret))
#這樣寫為執行一個task函數,在執行一個parse函數,執行效率低,一個任務執行2秒,20個任務就是40秒,耗時
#這兩個函數為一個任務,當一個任務執行完成後再執行下一個任務,串列
2.
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time
import random
import os
import requests
# 模擬的就是爬取多個源代碼 一定有IO操作
def task(url):
ret = requests.get(url)
if ret.status_code == 200:
return ret.text
def parse(content):
return len(content)
if __name__ == '__main__':
# 開啟一個線程池,併發執行
url_list = ['http://www.baidu.com',
'http://www.JD.com',
'http://www.taobao.com',
'https://www.cnblogs.com/jin-xin/articles/7459977.html'
]
pool = ThreadPoolExecutor(4)
obj_list = []
for url in url_list:
obj = pool.submit(task,url)
obj_list.append(obj)
pool.shutdown(wait=True)
#這個for迴圈把爬取的源碼添加到一個類表中,對象,執行task函數
for i in obj_list:
print(parse(i.result()))
迴圈取出類表中的元素,當做參數傳入parse函數中,進行數據分析
以上版本缺點:非同步的發出10個任務,併發的執行任務,但是分析結果的流程是串列,沒有做到結束一個任務就返回一個返回值,效率低
3.
def task(url):
ret = requests.get(url)
if ret.status_code == 200:
return parse(ret.text) #更改函數,直接返回並調用parse函數
def parse(content):
return len(content)
if __name__ == '__main__':
# 開啟一個線程池,併發執行
url_list = [
'http://www.baidu.com',
'http://www.JD.com',
'http://www.JD.com',
'http://www.JD.com',
'http://www.taobao.com',
'https://www.cnblogs.com/jin-xin/articles/7459977.html',
'https://www.luffycity.com/',
'https://www.cnblogs.com/jin-xin/articles/9811379.html',
'https://www.cnblogs.com/jin-xin/articles/11245654.html',
'https://www.sina.com.cn/'
]
pool = ThreadPoolExecutor(4)
obj_list = []
for url in url_list:
obj = pool.submit(task,url)
obj_list.append(obj)
pool.shutdown(wait=True)
for i in obj_list:
print(i.result())#直接返回結果,不用再調用函數,直接全部列印出來結果
4.非同步調用+回調函數**************************
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time
import random
import os
import requests
def task(url):
'''模擬的就是爬取多個源代碼 一定有IO操作'''
ret = requests.get(url)
if ret.status_code == 200:
return ret.text
def parse(obj):
'''模擬對數據進行分析 一般沒有IO'''
print(len(obj.result()))
if __name__ == '__main__':
# 開啟線程池,併發並行的執行
url_list = [
'http://www.baidu.com',
'http://www.JD.com',
'http://www.JD.com',
'http://www.JD.com',
'http://www.taobao.com',
'https://www.cnblogs.com/jin-xin/articles/7459977.html',
'https://www.luffycity.com/',
'https://www.cnblogs.com/jin-xin/articles/9811379.html',
'https://www.cnblogs.com/jin-xin/articles/11245654.html',
'https://www.sina.com.cn/'
]
pool = ThreadPoolExecutor(4)
for url in url_list:
obj = pool.submit(task, url)
obj.add_done_callback(parse)
#add_done_callback回調函數,執行一個任務就返回一個函數的返回值,增加回調函數,發佈完任務後直接執行本行,當同一進程執行完第一次任務後,主進程立即對其結果進行分析,不必等所有進程全部執行完再分析,提高了效率
線程池設置4個線程,非同步發起10個任務,每次執行4個任務,執行完任務的時間肯定有先後順序,先執行完的,將parse分析代碼的任務
交給空閑線程去執行,然後這個線程再去執行其他任務,比爬取源碼的任務就和分析數據的任務共同併發執行
非同步回收收取結果的兩種方式?
1.將所有任務的結果同意回收
2.完成一個任務,返回一個結果
非同步+回調函數
回調函數:按順序接收每個任務的結果,進行下一步處理
前提:
非同步處理的多個任務(io任務),回調函數處理的非io
區別:
多進程,由主進程執行回調函數的代碼
多線程,由空閑線程執行回調函數的代碼