[TOC] Event事件 用來控制線程的執行 出現 ,就會把這個線程設置為False,就不能執行這個任務; 只要有一個線程出現 ,就會告訴Event對象,把有 的用戶全部改為True,剩餘的任務就會立馬去執行。由一些線程去控制另一些線程,中間通過Event。 進程池與線程池 1. 進程池與線程池是 ...
目錄
Event事件
用來控制線程的執行
出現e.wait()
,就會把這個線程設置為False,就不能執行這個任務;
只要有一個線程出現e.set()
,就會告訴Event對象,把有e.wait
的用戶全部改為True,剩餘的任務就會立馬去執行。由一些線程去控制另一些線程,中間通過Event。
from threading import Event
from threading import Thread
import time
# 調用Event實例化出對象
e = Event()
#
# # 若該方法出現在任務中,則為False,阻塞
# e.wait() # False
# # 若該方法出現在任務中,則將其他線程的False改為True,進入就緒態和運行態
# e.set() # True
def light():
print('紅燈亮...')
time.sleep(5)
# 應該發出信號,告訴其他線程準備執行
e.set() # 將car中的False變為True
print('綠燈亮...')
def car(name):
print('正在等紅燈...')
# 讓所有汽車任務進入阻塞態
e.wait() # False
print(f'{name}正在加速飄逸...')
# 讓一個light線程式控制制多個car線程
t = Thread(target=light)
t.start()
for i in range(10):
t = Thread(target=car, args=(f'汽車{i}號', ))
t.start()
進程池與線程池
進程池與線程池是用來控制當前程式允許創建(進程/線程)的數量
作用:保證在硬體允許的範圍內創建(進程/線程)的數量
線程池使用一:
from concurrent.futures import ThreadPoolExecutor
import time
pool = ThreadPoolExecutor(5) # 5代表只能開啟5個進程, 不加預設使用cpu的進程數
# ThreadPoolExecutor(5) # 5代表只能開啟5個線程
# pool.submit() #非同步提交任務, 括弧里傳函數地址
def task():
print('線程任務開始了...')
time.sleep(1)
print('線程任務結束了...')
for line in range(5):
pool.submit(task)
使用二:
from concurrent.futures import ThreadPoolExecutor
import time
pool = ThreadPoolExecutor(5) # 5代表只能開啟5個進程, 不加預設使用cpu的進程數
# ThreadPoolExecutor(5) # 5代表只能開啟5個線程
# pool.submit() #非同步提交任務, 括弧里傳函數地址
def task():
print('線程任務開始了...')
time.sleep(1)
print('線程任務結束了...')
return 123
# 回調函數
def call_back(res):
print(type(res))
res2 = res.result() # 註意:賦值操作不要與接收的res同名
print(res2)
for line in range(5):
pool.submit(task).add_done_callback(call_back)
pool.shutdown()
會讓所有線程池的任務結束後,才往下執行代碼
多線程爬取梨視頻
利用requests模塊,封裝底層socket套接字
- 主頁中獲取所有視頻id號,拼接視頻詳情頁url
- 在視頻詳情頁中獲取真實視頻url srcUrl=
- 往真實視頻url地址發送請求獲取 視頻 二進位數據
- 最後把視頻二進位數據保存到本地
協程
- 進程: 資源單位
- 線程: 執行單位
- 協程: 在單線程下實現併發
註意: 協程不是操作系統資源,目的是讓單線程實現併發
協程目的
- 操作系統:使用多道技術,切換 + 保存狀態,一個是遇到IO, 另一個是CPU執行時間過長
- 協程:通過手動模擬操作系統 “多道計數”, 實現 切換 + 保存狀態
- 手動實現,遇到IO切換,欺騙操作系統誤以為沒有IO操作
- 單線程時,遇到IO,就切換 + 保存狀態
- 單線程時,對於計算密集型,來回切換 + 保存狀態反而效率更低
優點:在IO密集型的情況下,會提高效率
缺點:若在計算密集型的情況下,來回切換,反而效率更低
import time
def func1():
for i in range(10000000):
i+1
def func2():
for i in range(10000000):
i+1
start = time.time()
func1()
func2()
stop = time.time()
print(stop - start) # 1.0312113761901855
# 基於yield實現併發 在計算密集型的情況下效率更低
def func1():
while True:
10000000+1
yield
def func2():
g = func1()
for i in range(10000000):
i+1
next(g) # 每次執行next相當於切換到func1下麵
start = time.time()
func2()
stop = time.time()
print(stop - start) # 1.3294126987457275
gevent
gevent是一個第三方模塊,可以幫你監聽IO操作,並切換
使用gevent的目的:在單線程下實現,遇到IO就會 保存狀態 + 切換
import time
from gevent import monkey
monkey.patch_all() # 可以監聽該程式下所有的IO操作
from gevent import spawn, joinall # 用於做切換 + 保存狀態
def func1():
print('1')
time.sleep(1) # IO操作
def func2():
print('2')
time.sleep(3)
def func3():
print('3')
time.sleep(5)
start = time.time()
s1 = spawn(func1)
s2 = spawn(func2)
s3 = spawn(func3)
s1.join() # 發送信號,相當於等待自己(在單線程的情況下)
s2.join()
s3.join()
# joinall((s1, s2, s3)) # 一個個執行很麻煩,可以用joinall把這些全部裝進去
end = time.time()
print(end - start) # 5.006161451339722
TCP服務端socket套接字實現協程
服務端:
from gevent import monkey
from gevent import spawn
import socket
monkey.patch_all()
server = socket.socket()
server.bind(('127.0.0.1', 9999))
server.listen(5)
def task(conn):
while True:
try:
data = conn.recv(1024)
if len(data) == 0:
break
print(data.decode('utf-8'))
send_data = data.upper()
conn.send(send_data)
except Exception:
break
conn.close()
def server2():
while True:
conn, addr = server.accept()
print(addr)
spawn(task, conn)
if __name__ == '__main__':
s = spawn(server2)
s.join()
客戶端:
import socket
from threading import Thread, current_thread
def client():
client = socket.socket()
client.connect(('127.0.0.1', 9999))
number = 0
while True:
send_data = f'{current_thread().name} {number}'
client.send(send_data.encode('utf-8'))
data = client.recv(1024)
print(data.decode('utf-8'))
number += 1
for i in range(400):
t = Thread(target=client)
t.start()