引子 上一節中我們知道GIL鎖將導致CPython中多線程無法並行執行,只能併發的執行。 而併發實現的原理是切換+保存,那就意味著使用多線程實現併發,就需要為每一個任務創建一個線程,必然增加了線程創建銷毀與切換的帶來的開銷 明顯的問題就是,高併發情況下,由於任務數量太多導致無法開啟新的線程,使得即沒 ...
引子
上一節中我們知道GIL鎖將導致CPython中多線程無法並行執行,只能併發的執行。
而併發實現的原理是切換+保存,那就意味著使用多線程實現併發,就需要為每一個任務創建一個線程,必然增加了線程創建銷毀與切換的帶來的開銷
明顯的問題就是,高併發情況下,由於任務數量太多導致無法開啟新的線程,使得即沒有實際任務要執行,也無法創建新線程來處理新任務的情況
如何解決上述問題呢,首先要保證併發效果,然後來想辦法避免創建線程帶來的開銷問題;
協程既是因此而出現的,其原理是使用單線程來實現多任務併發,那麼如何能實現單線程併發呢?
一、單線程實現併發
是否可行
單線程實現併發這句話乍一聽好像在瞎說
首先需要明確併發的定義
併發:指的是多個任務同時發生,看起來好像是同時都在進行
並行:指的是多個任務真正的同時進行
早期的電腦只有一個CPU,既然CPU可以切換線程來實現併發,那麼為何不能線上程中切換任務來併發呢?
所以線程實現併發理論上是可行的
如何夠實現
併發 = 切換任務+保存狀態,只要找到一種方案,能夠在兩個任務之間切換執行並且保存狀態,那就可以實現單線程併發
python中的生成器就具備這樣一個特點,每次調用next都會回到生成器函數中執行代碼,這意味著任務之間可以切換,並且是基於上一次運行的結果,這意味著生成器會自動保存執行狀態!
於是乎我們可以利用生成器來實現併發執行:
def task1():
while True:
yield
print("task1 run")
def task2():
g = task1()
while True:
next(g)
print("task2 run")
task2()
併發雖然實現了,但是這對效率的影響是好是壞呢?來測試一下
# 兩個計算任務一個採用生成器切換併發執行 一個直接串列調用
import time
def task1():
a = 0
for i in range(10000000):
a += i
yield
def task2():
g = task1()
b = 0
for i in range(10000000):
b += 1
next(g)
s = time.time()
task2()
print("併發執行時間",time.time()-s)
# 單線程下串列執行兩個計算任務 效率反而比併發高 因為併發需要切換和保存
def task1():
a = 0
for i in range(10000000):
a += i
def task2():
b = 0
for i in range(10000000):
b += 1
s = time.time()
task1()
task2()
print("串列執行時間",time.time()-s)
可以看到對於純計算任務而言,單線程併發反而使執行效率下降了一半左右,所以這樣的方案對於純計算任務而言是沒有必要的
greenlet模塊實現併發
我們暫且不考慮這樣的併發對程式的好處是什麼,在上述代碼中,使用yield來切換是的代碼結構非常混亂,如果十個任務需要切換呢,不敢想象!因此就有人專門對yield進行了封裝,這便有了greenlet模塊
from greenlet import greenlet
def eat(name):
print('%s eat 1' %name)
g2.switch('jack')
print('%s eat 2' %name)
g2.switch()
def play(name):
print('%s play 1' %name)
g1.switch()
print('%s play 2' %name)
g1=greenlet(eat)
g2=greenlet(play)
g1.switch('rose')#可以在第一次switch時傳入參數,以後都不需要再次傳
該模塊簡化了yield複雜的代碼結構,實現了單線程下多任務併發,但是無論直接使用yield還是greenlet都不能檢測IO操作,遇到IO時同樣進入阻塞狀態,同樣的對於純計算任務而言效率也是沒有任何提升的。
測試:
#切換
from greenlet import greenlet
import time
def f1():
res=1
for i in range(100000000):
res+=i
g2.switch()
def f2():
res=1
for i in range(100000000):
res*=i
g1.switch()
start=time.time()
g1=greenlet(f1)
g2=greenlet(f2)
g1.switch()
stop=time.time()
print('run time is %s' %(stop-start)) # 52.763017892837524
greenlet只是提供了一種比generator更加便捷的切換方式,當切到一個任務執行時如果遇到io,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提升效率的問題,
任務的代碼通常會既有計算操作又有阻塞操作,我們完全可以在執行任務1時遇到阻塞,就利用阻塞的時間去執行任務2。。。。如此,才能提高效率,這就用到了Gevent模塊。
二、協程
協程:是單線程下的併發,又稱微線程,纖程。英文名Coroutine。是一種用戶態的輕量級線程,即協程是由用戶程式自己控制調度的。
需要強調的是:
#1. python的線程屬於內核級別的,即由操作系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行許可權,切換其他線程運行)
#2. 單線程內開啟協程,一旦遇到io,就會從應用程式級別(而非操作系統)控制切換,以此來提升效率(!!!非io操作的切換與效率無關)
對比操作系統控制線程的切換,用戶在單線程內控制協程的切換
優點如下:
#1. 協程的切換開銷更小,屬於程式級別的切換,操作系統完全感知不到,因而更加輕量級
#2. 單線程內就可以實現併發的效果,最大限度地利用cpu
缺點如下:
#1. 協程的本質是單線程下,無法利用多核,可以是一個程式開啟多個進程,每個進程內開啟多個線程,每個線程內開啟協程來儘可能提高效率
#2. 協程本質是單個線程,因而一旦協程出現阻塞,將會阻塞整個線程
gevent模塊
Gevent 是一個第三方庫,可以輕鬆通過gevent實現併發編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet全部運行在主程式操作系統進程的內部,但它們被協作式地調度。
常用方法:
#用法
#創建一個協程對象g1,
g1=gevent.spawn(func,1,,2,3,x=4,y=5)
#spawn括弧內第一個參數是函數名,如eat,後面可以有多個參數,可以是位置實參或關鍵字實參,都是傳給函數eat的
g2=gevent.spawn(func2)
g1.join() #等待g1結束
g2.join() #等待g2結束
#或者上述兩步合作一步:gevent.joinall([g1,g2])
g1.value#拿到func1的返回值
遇到IO阻塞時會自動切換任務
import gevent,sys
from gevent import monkey # 導入monkey補丁
monkey.patch_all() # 打補丁
import time
print(sys.path)
def task1():
print("task1 run")
# gevent.sleep(3)
time.sleep(3)
print("task1 over")
def task2():
print("task2 run")
# gevent.sleep(1)
time.sleep(1)
print("task2 over")
g1 = gevent.spawn(task1)
g2 = gevent.spawn(task2)
#gevent.joinall([g1,g2])
g1.join()
g2.join()
# 執行以上代碼會發現不會輸出任何消息
# 這是因為協程任務都是以非同步方式提交,所以主線程會繼續往下執行,而一旦執行完最後一行主線程也就結束了,
# 導致了協程任務沒有來的及執行,所以這時候必須join來讓主線程等待協程任務執行完畢 也就是讓主線程保持存活
# 後續在使用協程時也需要保證主線程一直存活,如果主線程不會結束也就意味著不需要調用join
需要註意:
1.如果主線程結束了 協程任務也會立即結束。
2.monkey補丁的原理是把原始的阻塞方法替換為修改後的非阻塞方法,即偷梁換柱,來實現IO自動切換
必須在打補丁後再使用相應的功能,避免忘記,建議寫在最上方
我們可以用threading.current_thread().getName()來查看每個g1和g2,查看的結果為DummyThread-n,即假線程
monke補丁原理
#myjson.py
def dump():
print("一個被替換的 dump函數")
def load():
print("一個被替換的 load函數")
# test.py
import myjson
import json
# 補丁函數
def monkey_pacth_json():
json.dump = myjson.dump
json.load = myjson.load
# 打補丁
monkey_pacth_json()
# 測試
json.dump()
json.load()
# 輸出:
# 一個被替換的 dump函數
# 一個被替換的 load函數
使用Gevent案例一 爬蟲:
from gevent import monkey;monkey.patch_all()
import gevent
import requests
import time
def get_page(url):
print('GET: %s' %url)
response=requests.get(url)
if response.status_code == 200:
print('%d bytes received from %s' %(len(response.text),url))
start_time=time.time()
gevent.joinall([
gevent.spawn(get_page,'https://www.python.org/'),
gevent.spawn(get_page,'https://www.yahoo.com/'),
gevent.spawn(get_page,'https://github.com/'),
])
stop_time=time.time()
print('run time is %s' %(stop_time-start_time))
使用Gevent案例二 TCP:
伺服器
#=====================================服務端
from gevent import monkey;monkey.patch_all()
from socket import *
import gevent
#如果不想用money.patch_all()打補丁,可以用gevent自帶的socket
# from gevent import socket
# s=socket.socket()
def server(server_ip,port):
s=socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind((server_ip,port))
s.listen(5)
while True:
conn,addr=s.accept()
gevent.spawn(talk,conn,addr)
def talk(conn,addr):
try:
while True:
res=conn.recv(1024)
print('client %s:%s msg: %s' %(addr[0],addr[1],res))
conn.send(res.upper())
except Exception as e:
print(e)
finally:
conn.close()
if __name__ == '__main__':
server('127.0.0.1',8080)
客戶端
#=====================================多線程模擬多個客戶端併發訪問
from threading import Thread
from socket import *
import threading
def client(server_ip,port):
c=socket(AF_INET,SOCK_STREAM) #套接字對象一定要加到函數內,即局部名稱空間內,放在函數外則被所有線程共用,則大家公用一個套接字對象,那麼客戶端埠永遠一樣了
c.connect((server_ip,port))
count=0
while True:
c.send(('%s say hello %s' %(threading.current_thread().getName(),count)).encode('utf-8'))
msg=c.recv(1024)
print(msg.decode('utf-8'))
count+=1
if __name__ == '__main__':
for i in range(500):
t=Thread(target=client,args=('127.0.0.1',8080))
t.start()