python day 20: 線程池與協程 2019/11/1 [TOC] 資料來自老男孩教育 2. 線程 線程適用於IO密集流操作,線程是最小的執行單元 線程之間數據是共用的,共用一塊記憶體 import threading :導入線程模塊 t = threading.Thread(target=f ...
目錄
python day 20: 線程池與協程
2019/11/1
資料來自老男孩教育
2. 線程
線程適用於IO密集流操作,線程是最小的執行單元
線程之間數據是共用的,共用一塊記憶體
import threading :導入線程模塊
t = threading.Thread(target=func,args=())
t.start()
t.join([timeout]),此處join是wait的意思,即主線程等待子線程結束,最多等2秒
主進程是否等待子線程
t.setDaemon(True/False)
線程鎖:使用RLOCK
mutex = threading.RLock()創建鎖
mutex.acquire()獲得鎖
mutex.release()釋放鎖
Event事件
event_obj = threading.Event()創建事件對象
event_obj.wait()等待狀態,flag為False則阻塞所有的線程
event_obj.clear()將flag設置為False
event_obj.set()將flal設置為True
3. 進程
創建進程
import multiprocessing
if name=="main":
p = multiprocessing.Process(target=func,args=(),)
p.deamon = True/False(主進程是否等待子進程結束)
p.start()
p.join([timeout])主進程等待子進程結束,最多等待多少秒
進程之間數據不共用,每個進程使用獨立記憶體
進程間數據共用的實現方式Manage或Array
Array
數組必須一開始就定義好數組的長度
數組的元素必須是統一的數據類型
Manage()
manage = Manage()
manage.dict()無長度限制,比Array方便,進程間數據通信使用manage.dict是最優選擇。
進程池Pool
pool = multiprocessing.Pool(5),創建一個最多支持5個進程的進程池。
pool.apply(func,(1,))申請一個進程執行某個函數。每個任務是排隊執行。每個進程有join執行。
pool.apply_async(func=Foo,args=(1,),callback=Bar),申請一個進程去執行Foo方法,將Foo方法的返回值作為實參賦值給Bar函數。每個任務是併發執行。更多是使用apply_async。每個進程沒有join執行. 進程的deamon=True。
pool.close()關閉進程池,不再接收新請求。或pool.terminate()
pool.join(),進程池中進程執行完畢後主進程再關閉。
4. 協程:gevent模塊,又叫微線程
必須安裝greenlet模塊與gevent模塊
gevent代表高性能,當發IO請求時,尤其是網路IO請求時,使用協程
gevent.sleep(0)
gevent.joinall()
from gevent import monkey; monkey.patch_all()
import requests
import gevent
def f(url):
print("url: %s"%url)
ret = requests.get(url)
data = ret.text
print(url,len(data))
gevent.joinall([
gevent.spawn(f,"https://juejin.im/post/5aa7314e6fb9a028d936d2a4"),
gevent.spawn(f,"https://www.cnblogs.com/lanxing0422/p/pythonday19.html"),
gevent.spawn(f,"http://www.baidu.com"),
])
# # 用於IO密集流
# def foo():
# print("foo")
# # 執行gevent的sleep方法
# gevent.sleep(0)
# print("foo again")
#
# def bar():
# print("bar")
# gevent.sleep(0)
# print("bar again")
#
# gevent.joinall([
# gevent.spawn(foo),
# gevent.spawn(bar),
# ])
5. 擴展
生產者消費者模型
隊列:Queue
隊列的特性:先進先出
q = queue.Queue(max)
q.put()
q.get()
q.get_nowait
q.put_nowait()
rabbit_m_queue開源的,特別牛逼的
6. 自定義線程池
python內部沒有提供,需要自定義。
實際使用中,更多是基於線程池來使用,而不是創建一個單獨的線程。
非常重要,以後會經常使用。
import threading
import queue
import time
import contextlib
Stopevent = object()
class ThreadPool(object):
'''
進程池
'''
def __init__(self, max_num):
self.max_num = max_num
self.q = queue.Queue()
self.terminate_flag = False
# 真實創建的線程列表
self.generate_list = []
# 空閑中的線程列表
self.free_list = []
def generate_thread(self):
'''
創建一個線程執行任務
:return:
'''
t = threading.Thread(target=self.call)
t.start()
def run(self, func, args, callback=None):
'''
線程池執行一個任務
:param func: 任務函數名
:param args: 任務函數所需參數元組
:param callback: 任務執行失敗或成功後執行的回調函數
:return: 如果線程池已經終止,則返回True否則None
'''
if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
self.generate_thread()
tuple_obj = (func, args, callback)
self.q.put(tuple_obj)
def call(self):
'''
迴圈獲取任務並執行
:return:
'''
# 獲取當前線程並添加到列表中
current_thread = threading.currentThread()
self.generate_list.append(current_thread)
# 從隊列中取任務
event = self.q.get()
# 當該任務的類型不是Stopevent時,死迴圈
while event != Stopevent:
func, args, callback = event
status = True
try:
ret = func(*args)
except Exception as e:
status = False
ret = e
# 當回調函數不為空時,就執行回調函數
if callback:
try:
callback(status, ret)
except Exception as e:
pass
# self.free_list.append(current_thread)
# event = self.q.get()
# self.free_list.remove(current_thread)
with self.work_state(self.free_list,current_thread):
if not self.terminate_flag:
event = self.q.get()
else:
event = Stopevent
else:
self.generate_list.remove(current_thread)
def close(self):
num = len(self.generate_list)
while num:
self.q.put(Stopevent)
num -= 1
def terminate(self):
self.terminate_flag = True
# 終止線程,不清空隊列
# max = len(self.generate_list)
# while max:
# self.q.put(Stopevent)
# max -= 1
# self.q.empty()
# 終止線程且清空隊列
while self.generate_list:
self.q.put(Stopevent)
self.q.empty()
@contextlib.contextmanager
def work_state(self,state_list,work_thread):
'''用於記錄線程池中正在等待的線程數'''
state_list.append(work_thread)
try:
yield
finally:
state_list.remove(work_thread)
def foo(i):
time.sleep(0.1)
print("當前i是>>>",i)
return i + 100
def call_back(status, ret):
print("ret:>>>", ret)
pool = ThreadPool(10)
for i in range(50):
'''
將任務放進隊列中:
創建線程:
只有空閑線程列表為空且創建的線程數量小於最大線程數時才會創建線程。
從隊列中取到任務,線程執行任務
'''
pool.run(func=foo, args=(i,), callback=call_back)
pool.close()
print("gene_list",len(pool.generate_list))
7. 實現多進程TCP伺服器
serSocket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
重新設置套接字選項,重覆使用綁定的信息
當有一個有相同本地地址和埠的socket1處於TIME_WAIT狀態時,而你啟動的程式的socket2要占用該地址和埠,你的程式就要用到SO_REUSEADDR選項。
from socket import *
from multiprocessing import *
# 處理客戶端的請求併為其服務
def dealwithClient(newSocket):
while True:
recvData = newSocket.recv(1024)
if len(recvData)==0:
break
else:
print(recvData)
newSocket.close()
def main():
serSock = socket(AF_INET,SOCK_STREAM)
# 設置套接字選項,使其可以重覆使用綁定的信息
serSock.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
bindAddr= ('',9091)
serSock.bind(bindAddr)
serSock.listen(5)
try:
while True:
newSocket,clientAddr = serSock.accept()
p1 = Process(target= dealwithClient,args=(newSocket,))
p1.start()
# 因為已經向⼦進程中copy了⼀份(引⽤) ,
# 並且⽗進程中這個套接字也沒有用處了
# 所以關閉
newSocket.close()
finally:
serSock.close()
if __name__=='__main__':
main()
8. 實現多線程TCP伺服器
from socket import *
from threading import *
# 處理客戶端的請求併為其服務
def dealwithClient(newSocket):
while True:
recvData = newSocket.recv(1024)
if len(recvData)==0:
break
else:
print(recvData)
newSocket.close()
def main():
serSock = socket(AF_INET,SOCK_STREAM)
# 設置套接字選項,使其可以重覆使用綁定的信息
serSock.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
bindAddr= ('',9091)
serSock.bind(bindAddr)
serSock.listen(5)
try:
while True:
newSocket,clientAddr = serSock.accept()
p1 = Thread(target= dealwithClient,args=(newSocket,))
p1.start()
# 因為線程中共用這個套接字, 如果關閉了會導致這個套接字不可⽤,
# 但是此時線上程中這個套接字可能還在收數據, 因此不能關閉
# newSocket.close()
finally:
serSock.close()
if __name__=='__main__':
main()
9. 協程greenlet和gevent
⽐線程更⼩的執⾏單元(微線程)
⼀個線程作為⼀個容器⾥⾯可以放置多個協程
只切換函數調用即可實現多任務,可以減少CPU的切換
協程⾃⼰主動讓出CPU
使用生成器,只切換函數調用即可完成多任務切換
import time
def A():
while True:
print(“----A---”)
yield
time.sleep(0.5)
def B(c):
while True:
print(“----B---”)
c.next()
time.sleep(0.5)
if __name__==‘__main__’:
a = A() # 如果一個函數中有yield,返回值就是一個生成器
B(a)
# python中的greenlet模塊對協程進行了封裝(底層相當於yield)
# 安裝模塊: pip3 install greenlet
from greenlet import greenlet
import time
def t1():
while True:
print("........A........")
gr2.switch()
time.sleep(1)
def t2():
while True:
print("........b........")
gr1.switch()#調到上次執行的地方繼續執行
time.sleep(1)
gr1 = greenlet(t1)#創建一個greenlet對象
gr2 = greenlet(t2)
gr1.switch()#此時會執行1函數
python還有⼀個⽐greenlet更強⼤的並且能夠⾃動切換任務的模塊 gevent.
原理是當⼀個greenlet遇到IO(指的是input output 輸⼊輸出)操作時, ⽐如訪問⽹絡, 就⾃動切換到其他的greenlet, 等到IO操作完成, 再在適當的時候切換回來繼續執⾏.
io密集型和cpu密集型:一些進程絕大多數時間在計算上,稱為計算密集型(CPU密集型),此時用多進程.
有一些進程則在input 和output上花費了大多時間,稱為I/O密集型。比如搜索引擎大多時間是在等待(耗時操作),相應這種就屬於I/O密集型。此時用多線程.
import gevent
def A():
while True:
print(".........A.........")
gevent.sleep(1)#用來模擬一個耗時操作
#gevent中:當一個協程遇到耗時操作會自動交出控制權給其他協程
def B():
while True:
print(".........B.........")
gevent.sleep(1)#每當遇到耗時操作,會自用轉到其他協程
g1 = gevent.spawn(A) # 創建一個gevent對象(創建了一個協程),此時就已經開始執行A
g2 = gevent.spawn(B)
g1.join() #等待協程執行結束
g2.join() #會等待協程運行結束後再退出