多任務進程與線程 一、多任務介紹 我們生活中有很多事情是同時進行的,比如開車的時候 手和腳共同來駕駛汽車,再比如唱歌跳舞也是同時進行的;用程式來模擬: from time import sleep def sing(): for i in range(3): print("正在唱歌...%d"% ...
多任務進程與線程
一、多任務介紹
我們生活中有很多事情是同時進行的,比如開車的時候 手和腳共同來駕駛汽車,再比如唱歌跳舞也是同時進行的;用程式來模擬:
from time import sleep
def sing():
for i in range(3):
print("正在唱歌...%d"%i)
sleep(1)
def dance():
for i in range(3):
print("正在跳舞...%d"%i)
sleep(1)
if __name__ == '__main__':
sing()
dance()
總結:
-
很顯然剛剛的程式並沒有完成唱歌和跳舞同時進行的要求,我們稱之為單進程
-
如果想要實現“唱歌跳舞”同時進行,那麼就需要一個新的方法,叫做:多任務
-
那什麼是多任務
簡單地說,就是操作系統可以同時運行多個任務。打個比方,你一邊在用瀏覽器上網,一邊在聽
MP3
,一邊在用Word趕作業,這就是多任務 現在,多核CPU已經非常普及了,但是,即使過去的單核CPU,也可以執行多任務。由於CPU執行代碼都是順序執行的,那麼,單核CPU是怎麼執行多任務的呢?
答案就是操作系統輪流讓各個任務交替執行,任務1執行0.01秒,切換到任務2,任務2執行0.01秒,再切換到任務3,執行0.01秒……這樣反覆執行下去。錶面上看,每個任務都是交替執行的,但是,由於CPU的執行速度實在是太快了,我們感覺就像所有任務都在同時執行一樣。
-
真正的並行執行多任務只能在多核CPU上實現,但是,由於任務數量遠遠多於CPU的核心數量,所以,操作系統也會自動把很多任務輪流調度到每個核心上執行。所以以上只能算是併發。
-
併發與並行
- 併發:指的是任務數多餘CPU核數,通過操作系統的各種任務調度演算法,實現用多個任務“一起”執行(實際上總有一些任務不在執行,因為切換任務的速度相當快,看上去一起執行而已)
- 並行:指的是任務數小於等於CPU核數,即任務真的是一起執行的
二、進程
1> 基本概念
- 程式:例如
xxx.py
這是程式,是一個靜態的 - 進程:一個程式運行起來後,代碼+用到的資源 稱之為進程,它是操作系統分配資源的基本單元。
- 工作中,任務數往往大於CPU的核數,即一定有一些任務正在執行,而另外一些任務在等待CPU進行執行,因此導致了有了不同的狀態
- 就緒態:運行的條件都已經慢去,正在等在CPU執行
- 執行態:CPU正在執行其功能
- 等待態:等待某些條件滿足,例如一個程式sleep了,此時就處於等待態
2> 進程使用和特性
multiprocessing模塊就是跨平臺版本的多進程模塊,提供了一個Process類來代表一個進程對象,這個對象可以理解為是一個獨立的進程,可以執行另外的事情
-
進程的簡單實現
from multiprocessing import Process import time def func(): while True: print("【子進程】") time.sleep(3) if __name__ == '__main__': p = Process(target=func) p.start() while True: print("【主進程】") time.sleep(3)
總結:創建子進程時,只需要傳入一個執行函數和函數的參數,創建一個Process實例,用start()方法啟動
-
進程傳遞參數與進程對象和進程ID
import multiprocessing import os from time import sleep def func(name, age, **kwargs): print(f"【子進程】({multiprocessing.current_process()}) 的進程號為:{os.getpid()}") print(f"【子進程】 name={name}, age={age}, kwargs={kwargs}") for i in range(10): print(f"【子進程】 ---{i}---") sleep(0.2) if __name__ == '__main__': p = multiprocessing.Process(target=func, args=('test', 18), kwargs={"m": 20}) p.start() sleep(1) # 主進程會等待所有的子進程執行結束再結束 print("【主進程】--- 結束 ---")
-
使用多進程實現
UDP
通信同時收發數據import socket from multiprocessing import Process def recv_data(udp_socket): while True: recv_msg = udp_socket.recvfrom(1024) data = recv_msg[0].decode("gbk") source = recv_msg[1] print(f"{source}: {data}") def main(): udp_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) udp_client.bind(("", 9000)) # 子進程接收數據 Process(target=recv_data, args=(udp_client,)).start() # 主進程發送數據 (子進程中無法使用input) while True: msg = input("請輸入發送的數據: ").encode("gbk") dest_addr = ("127.0.0.1", 8000) udp_client.sendto(msg, dest_addr) if __name__ == "__main__": main()
-
進程間不共用全局變數
from multiprocessing import Process import os import time nums = [11, 22] def work1(): print(f"【子進程work1】 pid={os.getpid()} ,nums={nums}") for i in range(3): nums.append(i) time.sleep(1) print(f"【子進程work1】 pid={os.getpid()} ,nums={nums}") print("【子進程work1】", id(nums), nums) def work2(): print(f"【子進程work2】 pid={os.getpid()} ,nums={nums}") print("【子進程work2】", id(nums), nums) print(f"當前進程為{os.getpid()}, 進程中nums的id為{id(nums)}") if __name__ == '__main__': p1 = Process(target=work1) p1.start() time.sleep(5) p2 = Process(target=work2) p2.start() print(f"【主進程】 pid={os.getpid()} ,nums={nums}")
-
進程間通信
在多進程編程中,不同的進程之間需要進行通信。multiprocessing模塊提供了多種進程間通信的方式,例如使用隊列、管道、共用記憶體等 進程通信
-
生產者消費者模型
生產者消費者模式就是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。這個阻塞隊列就是用來給生產者和消費者解耦的。
-
隊列
-
multiprocessing.Queue()
和queue.Queue()
的區別queue.Queue
是進程內非阻塞隊列,multiprocess.Queue
是跨進程通信隊列。queue.Queue
是進程內的用的隊列,也就是多線程,multiprocessing.Queue
是跨進程通信隊列,也就是多進程
-
multiprocessing.Queue()
和queue.Queue()
隊列使用from multiprocessing import Queue # 初始化隊列;若括弧中沒有指定最大可接收的消息數量,或數量為負值,那麼就代表可接受的消息數量沒有上限(直到記憶體的盡頭) q=Queue() # 返回當前隊列包含的消息數量 q.qsize() # 如果隊列為空,返回True,反之False q.empty() # 如果隊列滿了,返回True,反之False; q.full() # 獲取隊列中的一條消息,然後將其從列隊中移除,block預設值為True # Queue.get([block[, timeout]]) q.get() # 相當Queue.get(False); q.get_nowait() # 將item消息寫入隊列,block預設值為True; # Queue.put(item,[block[, timeout]]) q.put() # 相當Queue.put(item, False); q.put_nowait(item)
-
queue.Queue()
其他功能# 與multiprocessing.Queue()基本一致 # 增加了隊列計數器來實現隊列的阻塞控制 from queue import Queue # 隊列阻塞,直到隊列中的【所有項目】都已經被獲取並【處理】才會解堵塞, # 如果線程里每從隊列里取一次,但沒有執行task_done(),則join無法判斷隊列到底有沒有結束 q.join() # 隊列解堵塞,指示以前已排隊的任務已完成,一般搭配.join使用 q.task_done()
總結:
隊列內部有一個計數器來實現隊列的阻塞控制; 當調用 q.join()會開啟隊列阻塞,直到計數器計數為0則解堵塞。 往隊列q.put一個數據計數器加一, 每執行一次q.task_done()隊列計數器減一(q.get不影響計數器計數值);q.size()=0或q.empty()=True,只能表示隊列中沒有任務了,不能保證任務已經執行完成 。
-
-
使用隊列和多進程實現生產者消費者模型
from multiprocessing import Process, Queue import time import random def producer(q): for value in ['A', 'B', 'C', 'D', 'E', 'F']: print(f'【Producer】 put {value} to queue...') q.put(value) time.sleep(random.random()) # 發送結束信號 q.put(None) def consumer(q): while True: if not q.empty(): data = q.get() # 接收到結束信號退出程式 if data is None: return print(f"【Consumer】 get {data} from queue") time.sleep(random.random()) q = Queue() if __name__ == "__main__": pw = Process(target=producer, args=(q,)) pr = Process(target=consumer, args=(q,)) pw.start() pr.start()
-
-
進程池Pool
當需要創建的子進程數量不多時,可以直接利用multiprocessing中的Process動態成生多個進程,但如果是上百甚至上千個目標,手動的去創建/銷毀進程的工作量巨大,此時就可以用到multiprocessing模塊提供的Pool方法
初始化Pool時,可以指定一個最大進程數,當有新的請求提交到Pool中時,如果池還沒有滿,那麼就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到指定的最大值,那麼該請求就會等待,直到池中有進程結束,才會用之前的進程來執行新的任務
from multiprocessing import Pool import time import random import os def work(msg): start_time = time.time() print("任務{msg} 開始執行,進程號 {os.getpid()}") time.sleep(random.random() * 2) end_time = time.time() print("任務{msg} 結束執行,運行時間{end_time - start_time}") if __name__ == "__main__": p = Pool(3) for i in range(10): p.apply_async(work, args=(i,)) # 觀察進程池任務什麼時候開始執行 time.sleep(1) print("------start--------") # 關閉Pool,使其不再接受新的任務; p.close() # 主進程阻塞,等待子進程的退出, 必須在close或terminate之後使用; # p.join() print("--------end--------")
總結:
- 主進程不會主動等待進程池任務執行,如果主進程執行完畢,進程池任務立即結束
- 程池在定義的時候沒有指定最大進程數,系統會按當前運行電腦的CPU核心數決定進程池內運行的最大進程數,如電腦為雙核,則進程池內最大進程數為2
參數說明(
multiprocessing.Pool
):apply_async(func[, args[, kwds]])
:使用非阻塞方式調用func
(並行執行,堵塞方式必須等待上一個進程退出才能執行下一個進程),args
為傳遞給func
的參數列表,kwds
為傳遞給func
的關鍵字參數列表;- close():關閉Pool,使其不再接受新的任務;
- terminate():不管任務是否完成,立即終止;
- join():主進程阻塞,等待子進程的退出, 必須在close或terminate之後使用;
-
進程池中的Queue
如果要使用Pool創建進程,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue()
-
使用隊列和進程池實現生產者消費者模型
from multiprocessing import Manager, Pool import time import random import os def producer(q): print("producer啟動({os.getpid()}),父進程為({os.getppid()})") for value in ['A', 'B', 'C', 'D', 'E', 'F']: print(f'Put {value} to queue...') q.put(value) time.sleep(random.random()) # 發送結束信號 q.put(None) def consumer(q): print("consumer啟動({os.getpid()}),父進程為({os.getppid()})") while True: if not q.empty(): data = q.get() # 接收到結束信號退出程式 if data is None: return print(f"get {data} from queue") time.sleep(random.random()) if __name__ == "__main__": q = Manager().Queue() p = Pool() p.apply_async(producer, (q,)) p.apply_async(consumer, (q,)) p.close() p.join() print(f"主進程{os.getpid()} 結束")
-
三、線程
1> 基本概念
- 線程:線程是進程的一個實體,是CPU調度和分派的基本單位,它是比進程更小的能獨立運行的基本單位.線程自己基本上不擁有系統資源,只擁有一點在運行中必不可少的資源(如程式計數器,一組寄存器和棧),但是它可與同屬一個進程的其他的線程共用進程所擁有的全部資源.
2> 線程使用和特性
-
線程的簡單實現與線程標識符
import threading from time import sleep, ctime def sing(): print(f"\n【子線程sing】({threading.current_thread()})的標識符為: {threading.get_ident()}") for i in range(3): print(f"正在唱歌...{i}") sleep(3) print(f"【子線程sing】--- 結束 ---") def dance(): print(f"\n【子線程dance】({threading.current_thread()})的標識符為: {threading.get_ident()}") for i in range(3): print(f"正在跳舞...{i}") sleep(3) print(f"【子線程dance】--- 結束 ---") if __name__ == '__main__': print(f"【主線程】---開始---:{ctime()}") t1 = threading.Thread(target=sing) t2 = threading.Thread(target=dance) t1.start() t2.start() print(f"【主線程】子線程sing的標識符: {t1.ident}") print(f"【主線程】子線程dance的標識符: {t2.ident}") length = len(threading.enumerate()) print(f'【主線程】當前運行的線程數為:{length}') print(f"【主線程】--- 結束 ---:{ctime()}")
總結:
- 多線程併發的操作比單線程效率更高
-
線程間共用全局變數
from threading import Thread import time nums = [11, 22] def work1(): for i in range(3): nums.append(i) print(f"【子線程work1】 g_num={nums}") def work2(): print(f"【子線程work2】 g_num={nums}") print(f"【主線程】 子線程創建之前g_num={nums}") t1 = Thread(target=work1) t1.start() # 延時一會,保證t1線程中的任務做完 time.sleep(1) t2 = Thread(target=work2) t2.start()
總結:
- 一個進程內的所有線程共用全局變數,很方便在多個線程間共用數據
- 缺點就是,線程是對全局變數隨意遂改可能造成多線程之間對全局變數的混亂
-
多線程資源競爭問題
import threading import time g_num = 0 def work1(num): global g_num for i in range(num): g_num += 1 print(f"【子線程work1】 g_num={g_num}") def work2(num): global g_num for i in range(num): g_num += 1 print(f"【子線程work2】 g_num={g_num}") print(f"【主線程】 子線程創建之前g_num={g_num}") t1 = threading.Thread(target=work1, args=(1000000,)) t2 = threading.Thread(target=work2, args=(1000000,)) t1.start() t2.start() while len(threading.enumerate()) != 1: time.sleep(1) print(f"【主線程】2個線程對同一個全局變數操作之後的最終結果是:{g_num}")
-
結果分析:
假設兩個線程t1和t2都要對全局變數g_num(預設是0)進行加1運算,t1和t2都各對`g_num`加10次,g_num的最終的結果應該為20。 但是由於是多線程同時操作,有可能出現下麵情況: 1> 在g_num=0時,t1取得g_num=0。此時系統把t1調度為”sleeping”狀態,把t2轉換為”running”狀態,t2也獲得g_num=0 2> 然後t2對得到的值進行加1並賦給g_num,使得g_num=1 3> 然後系統又把t2調度為”sleeping”,把t1轉為”running”。線程t1又把它之前得到的0加1後賦值給g_num。 4> 這樣導致雖然t1和t2都對g_num加1,但結果仍然是g_num=1
-
-
解決多線程資源競爭問題 - 互斥鎖
-
互斥鎖
解決多線程資源競爭問題,最簡單的機制就是引入互斥鎖,互斥鎖為資源引入一個狀態:鎖定/非鎖定
-
互斥鎖工作原理
某個線程要更改共用數據時,先將其鎖定,此時資源的狀態為“鎖定”,其他線程不能更改;直到該線程釋放資源,將資源的狀態變成“非鎖定”,其他的線程才能再次鎖定該資源。互斥鎖保證了每次只有一個線程進行寫入操作,從而保證了多線程情況下數據的正確性。
-
互斥鎖使用
# 創建鎖 mutex = threading.Lock() # 鎖定 mutex.acquire() # 釋放 mutex.release()
說明:
- 如果這個鎖之前是沒有上鎖的,那麼acquire不會堵塞
- 如果在調用acquire對這個鎖上鎖之前 它已經被 其他線程上了鎖,那麼此時acquire會堵塞,直到這個鎖被解鎖為止
-
為全局變數加入互斥鎖
import threading import time g_num = 0 def work1(num): global g_num for i in range(num): mutex.acquire() # 上鎖 g_num += 1 mutex.release() # 解鎖 print(f"【子線程work1】 g_num={g_num}") def work2(num): global g_num for i in range(num): mutex.acquire() # 上鎖 g_num += 1 mutex.release() # 解鎖 print(f"【子線程work2】 g_num={g_num}") # 創建一個互斥鎖 # 預設是未上鎖的狀態 mutex = threading.Lock() # 創建2個線程,讓他們各自對g_num加1000000次 p1 = threading.Thread(target=work1, args=(1000000,)) p2 = threading.Thread(target=work2, args=(1000000,)) p1.start() p2.start() # 等待計算完成 while len(threading.enumerate()) != 1: time.sleep(1) print("【主線程】 2個線程對同一個全局變數操作之後的最終結果是:%s" % g_num)
總結:
- 鎖的好處:確保了某段尖鍵代碼只能由一個線程從頭到尾完整地執行
- 鎖的壞處:
- 阻止了多線程併發執行﹐包含鎖的某段代碼實際上只能以單線程模式執行﹐效率就大大地下降了
- 由於可以存在多個鎖﹐不同的線程持有不同的鎖﹐並試圖獲取對方持有的鎖時﹐可能會造成死鎖
- 死鎖: 線上程間共用多個資源的時候,如果兩個線程分別占有一部分資源並且同時等待對方的資源,就會造成死鎖。
- 避免死鎖:
- 程式設計時要儘量避免(銀行家演算法)
- 添加超時時間等
-
-
線程間通信
-
使用隊列和多線程實現生產者消費者模型
from queue import Queue import random import time import threading class Producer(threading.Thread): def run(self): for value in ['A', 'B', 'C', 'D', 'E', 'F']: print(f'【Producer】 put {value} to queue...') q.put(value) # 設置隊列堵塞,只有消費者完成任務並執行task_done 計算器計數為0才會解堵塞 # q.join() time.sleep(random.random()) # 發送結束信號 q.put(None) print("生產者任務結束!") class Consumer(threading.Thread): def run(self): while True: if not q.empty(): data = q.get() # 任務完成 隊列計數器減一 # q.task_done() # 接收到結束信號退出程式 if data is None: return print(f"【Consumer】 get {data} from queue") time.sleep(random.random()) q = Queue() Producer().start() Consumer().start()
-
-
線程池
ThreadPool
線程池是一個線程管理技術,創建一個或者多個線程進行管理,避免線程的創建和銷毀帶來的開銷線程過多會帶來調度開銷,進而影響緩存局部性和整體性能。而線程池維護著多個線程,等待著監督管理者分配可併發執行的任務。這避免了在處理短時間任務時創建與銷毀線程的代價。線程池不僅能夠保證內核的充分利用,還能防止過分調度.
-
線程池的優點
- 降低資源消耗;通過重覆利用已創建的線程降低創建和銷毀造成的消耗。
- 提高響應速度,不必等待線程的創建,正常情況下(沒有任務進入隊列的情況)不需要等待。
- 線程管理,線程統一由線程池管理,隨取隨用。
-
自定義線程池實現
from threading import Thread from queue import Queue import time import random import threading # 自定義線程池 class MyThreadPool: def __init__(self, thread_num): self.thread_num = thread_num self.task_queue = Queue() # 初始化的時候啟動線程池 self.__start() # 依次創建並啟動線程 def __start(self): for _ in range(self.thread_num): # 每個線程的執行相同的方法,在方法中讀取隊列的任務 # 這裡daemon=True 表示當主線程運行結束時不對這個子線程進行檢查而直接退出(實現主線程退出時,關閉線程池中子線程的任務執行) # 參考daemon在多線程中的作用: https://blog.51cto.com/u_9653244/6450770 Thread(target=self._target, daemon=True).start() # 為線程分配任務 def _target(self): while True: target, args, kwargs = self.task_queue.get() target(*args, **kwargs) # 隊列計數器減一 self.task_queue.task_done() # 任務隊列執行完成之前一直堵塞 def join(self): self.task_queue.join() # 往隊列中添加任務 def submit_task(self, target, args=(), kwargs=None): if kwargs is None: kwargs = {} self.task_queue.put((target, args, kwargs)) def work(name, no, **kwargs): thread_id = threading.get_ident() print(f"【子線程{thread_id}-開始】({threading.current_thread()}) ") print(f"【子線程{thread_id}-data】 name={name}, no={no}, kwargs={kwargs}") time.sleep(random.randint(1, 10)) print(f"【子線程{thread_id}-結束】") t_pool = MyThreadPool(3) for i in range(5): t_pool.submit_task(work, (f"任務{i}", i), {"test": 1}) t_pool.join()
-
內置線程池模塊
ThreadPool
-
模塊說明
from multiprocessing.pool import ThreadPool # multiprocessing.dummy.Pool為一個函數,本質是使用ThreadPool創建線程池 from multiprocessing.dummy import Pool as TPool print(ThreadPool, TPool) print(ThreadPool().__class__, TPool().__class__) print(ThreadPool().__class__ is TPool().__class__)
-
使用內置線程池
ThreadPool
模塊實現線程復用from multiprocessing.pool import ThreadPool import time import random import threading def work(name, no, **kwargs): thread_id = threading.get_ident() print(f"【子線程{thread_id}-開始】({threading.current_thread()}) ") print(f"【子線程{thread_id}-data】 name={name}, no={no}, kwargs={kwargs}") time.sleep(random.randint(1, 10)) print(f"【子線程{thread_id}-結束】") t_pool = ThreadPool(3) for i in range(5): t_pool.apply_async(work, (f"任務{i}", i), {"test": 1}) # 關閉ThreadPool,使其不再接受新的任務; t_pool.close() t_pool.join()
-
-
四、進程線程的等待/終止 和 守護模式
-
進程的等待和終止
from multiprocessing import Process import time def func(): while True: print("【子進程】") time.sleep(3) if __name__ == '__main__': print("【主進程】") p = Process(target=func) p.start() # 主進程等待子進程完成 # p.join() # 立即結束子進程,不推薦使用,會導致子進程的資源無法被釋放 # p.terminate() print("【主進程】 結束")
-
線程的等待和終止
-
線程的等待
from threading import Thread import time def func(): while True: print("【子線程】") time.sleep(3) if __name__ == '__main__': print("【主線程】") t = Thread(target=func) t.start() # 主線程等待子線程完成再繼續往後執行 # t.join() print("【主線程】 結束")
總結:
- 主線程代碼執行完成以後預設等待子線程;所有子線程結束以後程式才會結束
t.join()
主線程在某個位置等待子線程執行完成, 再繼續執行
-
線程的終止
-
一. 使用
t1.stop()
方法強行終止線程跟進程的terminate一樣,不推薦使用;目前該方法已被棄用,會導致被終止的線程所擁有的資源如打開的文件、資料庫事務等不能被正確釋放;造成數據泄漏或死鎖,除非可以肯定數據安全,否則不建議強行殺死線程
-
二. 通過拋出異常來終止線程
比較複雜一般不用
-
三.通過一個終止標誌來終止線程
-
實現方式1:
# 方式一: 使用全局變數stop_threads控制線程終止; # 這裡子線程直接訪問主線程中的全局變數會導致數據混亂,不推薦 from threading import Thread import time def func(): while True: print("【子線程】") time.sleep(3) if stop_threads: return if __name__ == '__main__': print("【主線程】") stop_threads = False t = Thread(target=func) t.start() time.sleep(10) stop_threads = True print("【主線程】 結束")
-
實現方式2:
# 方式二:通過函數間接訪問局部變數stop_threads, 推薦使用 # 好處:通過在子線程中指定位置設置檢測點,可以在主線程中任何時候終止子線程,並且子線程所擁有的資源也能被正確釋放 from threading import Thread import time def func(get_stop_flag): while True: print("【子線程】") time.sleep(3) if get_stop_flag(): return if __name__ == '__main__': print("【主線程】") stop_threads = False t = Thread(target=func, args=(lambda: stop_threads,)) t.start() time.sleep(10) stop_threads = True print("【主線程】 結束")
-
-
四. 將子線程設置為守護線程
不能指定子線程終止時間, 接下來我們就說下守護模式
-
-
-
守護模式
-
守護進程概念
隨著主進程代碼執行結束,守護進程結束, 可以理解為子進程開啟守護進程以後,主進程為子進程的運行保駕護航;當主進程結束,沒有守護以後,子進程立刻就會結束;
-
守護進程代碼示例
from multiprocessing import Process import time def func(): while True: print("【子進程】") time.sleep(3) if __name__ == '__main__': print("【主進程】") # 設置p為守護進程, 創建時設置 # p = Process(target=func, daemon=True) p = Process(target=func) print(f"【主進程-創建進程後】子進程是否正在運行: {p.is_alive()}") # 設置p為守護進程, 啟動進程前設置(預設p.daemon = False) p.daemon = True p.start() print(f"【主進程-啟動進程後】子進程是否正在運行: {p.is_alive()}") print("【主進程】 結束")
-
守護線程概念
在主線程代碼執行結束後,等待其它非守護子線程執行結束,守護線程立即結束;即主線程只會等待非守護線程結束,不會等待守護線程執行完畢,只要主線程代碼執行結束,守護線程就會結束。
-
守護線程代碼示例
from threading import Thread import time def func(): while True: print("【子線程】") time.sleep(3) if __name__ == '__main__': print("【主線程】") # 設置t為守護線程, 創建時設置 # t = Thread(target=func, daemon=True) t = Thread(target=func) print(f"【主線程-創建線程後】子線程是否正在運行: {t.is_alive()}") # 設置t為守護線程, 啟動線程前設置(預設t.daemon = False) t.daemon = True # 等效於 t.daemon = True # t.setDaemon(True) t.start() print(f"【主線程-創建線程後】子線程是否正在運行: {t.is_alive()}") print("【主線程】 結束")
-
五、多進程多線程隊列綜合演練
-
進程池線程池高性能併發通信
# 【本機環境運行】 # 導入進程池 from multiprocessing import Pool, cpu_count # 導入線程池 from multiprocessing.pool import ThreadPool from socket import * from queue import Queue import os # 從隊列讀取數據並返回給客戶端 def send_data(client, addr, q): # 子進程中無法使用input,而且子進程錯誤不會展示 print(f"【send_data】準備向客戶{addr}發送數據...") while True: msg = q.get() if not msg: print(f"【send_data】收到關閉通知, 發送功能關閉!") return client.send(f"您的消息 【{msg}】 已收到, over !".encode("gbk")) # 收到客戶發來的數據存儲到隊列 def recv_data(client, addr, q): print(f"【recv_data】準備接收客戶{addr}的數據...") while True: data = client.recv(1024).decode('gbk') q.put(data) # 客戶端調用close; data為 '' (網路調試助手需要關閉通訊視窗才會調用close) if not data: # 往隊列寫入None,通知發送消息的子線程關閉,並關閉服務套接字 q.put('') client.close() print(f"【recv_data】客戶{addr}關閉連接, 接收功能關閉!") return print(f"【recv_data】 {addr} 發來消息 : {data}\n") # 進程負責處理連接請求(一個進程跟進一個客戶) def process_connect(client, addr): print(f"由進程 {os.getpid()} 為新客戶 {addr} 服務!") # 線程負責處理數據請求(一個線程處理客戶的一個需求) t_pool = ThreadPool(2) # 創建一個隊列,為接收和發送之間傳遞消息 q = Queue() t_pool.apply_async(send_data, (client, addr, q)) t_pool.apply_async(recv_data, (client, addr, q)) def main(): # 創建tcp監聽套接字 tcp_server_socket = socket(AF_INET, SOCK_STREAM) tcp_server_socket.bind(("127.0.0.1", 9000)) tcp_server_socket.listen(128) # 進程池負責接收連接請求(進程池數與cpu處理器數量一致) pool = Pool(cpu_count()) while True: # 等待連接請求,獲取服務套接字 client_socket, client_addr = tcp_server_socket.accept() pool.apply_async(process_connect, (client_socket, client_addr)) if __name__ == '__main__': main()
六、GIL全局解釋器鎖
基本概念
GIL 是python的全局解釋器鎖,同一進程中假如有多個線程運行,一個線程在運行python程式的時候會霸占python解釋器(加了一把鎖即GIL),使該進程內的其他線程無法運行,等該線程運行完後其他線程才能運行。如果線程運行過程中遇到耗時操作,則解釋器鎖解開,使其他線程運行。所以在多線程中,線程的運行仍是有先後順序的,並不是同時進行。
我們可以把GIL看作是“通行證”,並且在一個python進程中,GIL只有一個。拿不到通行證的線程,就不允許進入CPU執行。GIL只在cpython中才有,即同一個進程下的多個線程無法利用多核優勢。
-
互斥鎖和GIL全局解釋器鎖的區別
-
互斥鎖就是對共用數據進行鎖定,保證同一時刻只有一個線程操作數據,是數據級別的鎖。
-
GIL鎖是解釋器級別的鎖,保證同一時刻下同一個進程中只有一個線程拿到GIL鎖,擁有執行
許可權。
-
-
關於GIL全局解釋器鎖的說明
- Python語言和GIL沒有半毛錢關係。僅僅是由於歷史原因在
Cpython
虛擬機(解釋器),難以移除GIL。 更換其他解釋器就不會存在GIL - Python使用多進程是可以利用多核的CPU資源。
- 為什麼不刪除GIL-Guido的聲明
- Python語言和GIL沒有半毛錢關係。僅僅是由於歷史原因在
七、進程線程對比
-
多進程和多線程的關係 進程與線程的一個簡單解釋
-
定義不同
- 進程是系統進行資源分配和調度的一個獨立單位.
- 線程CPU調度和分派的基本單位
-
功能對比
- 進程,能夠完成多任務,比如 在一臺電腦上能夠同時運行多個QQ
- 線程,能夠完成多任務,比如 一個QQ中的多個聊天視窗
-
區別
- 一個程式至少有一個進程,一個進程至少有一個線程.
- 線程的劃分尺度小於進程(資源比進程少),使得多線程程式的併發性高。
- 進程在執行過程中擁有獨立的記憶體單元,而多個線程共用記憶體,從而極大地提高了程式的運行效率
- 線線程不能夠獨立執行,必須依存在進程中
- 可以將進程理解為工廠中的一條流水線,而其中的線程就是這個流水線上的工人
-
優缺點
- 線程和進程在使用上各有優缺點:線程執行開銷小,但不利於資源的管理和保護;而進程正相反。
-
進程線程選擇
- 計算密集型: 多進程;
- IO密集型: 多線程、協程
- 計算密集型: 多進程;
-
其他問題
-
IO密集型中多線程與協程的執行速度
IO密集型執行時間主要在IO讀寫,python中由於GIL鎖的原因,多線程其實還是使用的單核在進行cpu計算,如果計算任務加鎖了,cpu時間片調度機制會在一個cpu時間片(python預設是處理完1000個位元組碼)結束後,去釋放GIL鎖,並查看其他線程是否可以執行,由於任務被加鎖,會在第二個cpu時間片繼續把時間片分給第一個線程,這會讓cpu調度時間白白浪費,反而導致多線程比協程(遇到耗時操作自動切換任務)耗時更久
-
計算密集型中多線程與單線程的執行速度
計算量小的情況下單線程快,因為多線程切換需要時間 計算量大的情況下多線程快,多線程會獲得更多的CPU執行時間
-