多任務之進程與線程

来源:https://www.cnblogs.com/cs-songbai/p/18095578
-Advertisement-
Play Games

多任務進程與線程 一、多任務介紹 ​ 我們生活中有很多事情是同時進行的,比如開車的時候 手和腳共同來駕駛汽車,再比如唱歌跳舞也是同時進行的;用程式來模擬: from time import sleep def sing(): for i in range(3): print("正在唱歌...%d"% ...


多任務進程與線程

一、多任務介紹

​ 我們生活中有很多事情是同時進行的,比如開車的時候 手和腳共同來駕駛汽車,再比如唱歌跳舞也是同時進行的;用程式來模擬:

from time import sleep

def sing():
    for i in range(3):
        print("正在唱歌...%d"%i)
        sleep(1)

def dance():
    for i in range(3):
        print("正在跳舞...%d"%i)
        sleep(1)

if __name__ == '__main__':
    sing() 
    dance() 

總結

  • 很顯然剛剛的程式並沒有完成唱歌和跳舞同時進行的要求,我們稱之為單進程

  • 如果想要實現“唱歌跳舞”同時進行,那麼就需要一個新的方法,叫做:多任務

  • 那什麼是多任務

    ​ 簡單地說,就是操作系統可以同時運行多個任務。打個比方,你一邊在用瀏覽器上網,一邊在聽MP3,一邊在用Word趕作業,這就是多任務

    ​ 現在,多核CPU已經非常普及了,但是,即使過去的單核CPU,也可以執行多任務。由於CPU執行代碼都是順序執行的,那麼,單核CPU是怎麼執行多任務的呢?

    ​ 答案就是操作系統輪流讓各個任務交替執行,任務1執行0.01秒,切換到任務2,任務2執行0.01秒,再切換到任務3,執行0.01秒……這樣反覆執行下去。錶面上看,每個任務都是交替執行的,但是,由於CPU的執行速度實在是太快了,我們感覺就像所有任務都在同時執行一樣。

    多任務

  • 真正的並行執行多任務只能在多核CPU上實現,但是,由於任務數量遠遠多於CPU的核心數量,所以,操作系統也會自動把很多任務輪流調度到每個核心上執行。所以以上只能算是併發。

  • 併發與並行

    • 併發:指的是任務數多餘CPU核數,通過操作系統的各種任務調度演算法,實現用多個任務“一起”執行(實際上總有一些任務不在執行,因為切換任務的速度相當快,看上去一起執行而已)
    • 並行:指的是任務數小於等於CPU核數,即任務真的是一起執行的

二、進程

1> 基本概念

  • 程式:例如xxx.py這是程式,是一個靜態的
  • 進程:一個程式運行起來後,代碼+用到的資源 稱之為進程,它是操作系統分配資源的基本單元
  • 工作中,任務數往往大於CPU的核數,即一定有一些任務正在執行,而另外一些任務在等待CPU進行執行,因此導致了有了不同的狀態
    • 就緒態:運行的條件都已經慢去,正在等在CPU執行
    • 執行態:CPU正在執行其功能
    • 等待態:等待某些條件滿足,例如一個程式sleep了,此時就處於等待態

2> 進程使用和特性

multiprocessing模塊就是跨平臺版本的多進程模塊,提供了一個Process類來代表一個進程對象,這個對象可以理解為是一個獨立的進程,可以執行另外的事情

  • 進程的簡單實現

    from multiprocessing import Process
    import time
    
    
    def func():
        while True:
            print("【子進程】")
            time.sleep(3)
    
    
    if __name__ == '__main__':
        p = Process(target=func)
        p.start()
        while True:
            print("【主進程】")
            time.sleep(3)
    

    總結:創建子進程時,只需要傳入一個執行函數和函數的參數,創建一個Process實例,用start()方法啟動

  • 進程傳遞參數與進程對象和進程ID

    import multiprocessing
    import os
    from time import sleep
    
    
    def func(name, age, **kwargs):
        print(f"【子進程】({multiprocessing.current_process()}) 的進程號為:{os.getpid()}")
        print(f"【子進程】 name={name}, age={age}, kwargs={kwargs}")
        for i in range(10):
            print(f"【子進程】 ---{i}---")
            sleep(0.2)
    
    
    if __name__ == '__main__':
        p = multiprocessing.Process(target=func, args=('test', 18), kwargs={"m": 20})
        p.start()
        sleep(1)
        # 主進程會等待所有的子進程執行結束再結束
        print("【主進程】--- 結束 ---")
    
  • 使用多進程實現UDP通信同時收發數據

    import socket
    from multiprocessing import Process
    
    
    def recv_data(udp_socket):
        while True:
            recv_msg = udp_socket.recvfrom(1024)
            data = recv_msg[0].decode("gbk")
            source = recv_msg[1]
            print(f"{source}: {data}")
    
    
    def main():
        udp_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        udp_client.bind(("", 9000))
    
        # 子進程接收數據
        Process(target=recv_data, args=(udp_client,)).start()
        # 主進程發送數據 (子進程中無法使用input)
        while True:
            msg = input("請輸入發送的數據: ").encode("gbk")
            dest_addr = ("127.0.0.1", 8000)
            udp_client.sendto(msg, dest_addr)
    
    
    if __name__ == "__main__":
        main()
    
    
  • 進程間不共用全局變數

    from multiprocessing import Process
    import os
    import time
    
    nums = [11, 22]
    
    
    def work1():
        print(f"【子進程work1】 pid={os.getpid()} ,nums={nums}")
        for i in range(3):
            nums.append(i)
            time.sleep(1)
            print(f"【子進程work1】 pid={os.getpid()} ,nums={nums}")
         print("【子進程work1】", id(nums), nums)
    
    
    def work2():
        print(f"【子進程work2】 pid={os.getpid()} ,nums={nums}")
        print("【子進程work2】", id(nums), nums)
    
    
    print(f"當前進程為{os.getpid()}, 進程中nums的id為{id(nums)}")
    if __name__ == '__main__':
        p1 = Process(target=work1)
        p1.start()
        time.sleep(5)
    
        p2 = Process(target=work2)
        p2.start()
        print(f"【主進程】 pid={os.getpid()} ,nums={nums}")
    
    
  • 進程間通信

    在多進程編程中,不同的進程之間需要進行通信。multiprocessing模塊提供了多種進程間通信的方式,例如使用隊列、管道、共用記憶體等 進程通信

    • 生產者消費者模型

      生產者消費者模式就是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。這個阻塞隊列就是用來給生產者和消費者解耦的。

      生產者消費者模型

    • 隊列

      • multiprocessing.Queue()queue.Queue()的區別

        • queue.Queue是進程內非阻塞隊列,multiprocess.Queue是跨進程通信隊列。
        • queue.Queue是進程內的用的隊列,也就是多線程,multiprocessing.Queue是跨進程通信隊列,也就是多進程
      • multiprocessing.Queue()queue.Queue()隊列使用

         from multiprocessing import Queue
        # 初始化隊列;若括弧中沒有指定最大可接收的消息數量,或數量為負值,那麼就代表可接受的消息數量沒有上限(直到記憶體的盡頭)
        q=Queue()
        
        # 返回當前隊列包含的消息數量
        q.qsize()
        
        # 如果隊列為空,返回True,反之False
        q.empty()
        
        # 如果隊列滿了,返回True,反之False;
        q.full()
        
        # 獲取隊列中的一條消息,然後將其從列隊中移除,block預設值為True
        # Queue.get([block[, timeout]])
        q.get()
        # 相當Queue.get(False);
        q.get_nowait()
        
        # 將item消息寫入隊列,block預設值為True;
        # Queue.put(item,[block[, timeout]])
        q.put()
        # 相當Queue.put(item, False);
        q.put_nowait(item)
        
      • queue.Queue() 其他功能

        # 與multiprocessing.Queue()基本一致
        # 增加了隊列計數器來實現隊列的阻塞控制
        from queue import Queue
        
        # 隊列阻塞,直到隊列中的【所有項目】都已經被獲取並【處理】才會解堵塞,
        # 如果線程里每從隊列里取一次,但沒有執行task_done(),則join無法判斷隊列到底有沒有結束
        q.join()
        
        # 隊列解堵塞,指示以前已排隊的任務已完成,一般搭配.join使用
        q.task_done()
        

        總結:

        隊列內部有一個計數器來實現隊列的阻塞控制; 當調用 q.join()會開啟隊列阻塞,直到計數器計數為0則解堵塞。 往隊列q.put一個數據計數器加一, 每執行一次q.task_done()隊列計數器減一(q.get不影響計數器計數值);q.size()=0或q.empty()=True,只能表示隊列中沒有任務了,不能保證任務已經執行完成 。
        
    • 使用隊列和多進程實現生產者消費者模型

      from multiprocessing import Process, Queue
      import time
      import random
      
      
      def producer(q):
          for value in ['A', 'B', 'C', 'D', 'E', 'F']:
              print(f'【Producer】 put {value} to queue...')
              q.put(value)
              time.sleep(random.random())
          # 發送結束信號
          q.put(None)
      
      
      def consumer(q):
          while True:
              if not q.empty():
                  data = q.get()
                  # 接收到結束信號退出程式
                  if data is None:
                      return
                  print(f"【Consumer】 get {data} from queue")
                  time.sleep(random.random())
      
      
      q = Queue()
      if __name__ == "__main__":
          pw = Process(target=producer, args=(q,))
          pr = Process(target=consumer, args=(q,))
      
          pw.start()
          pr.start()
      
  • 進程池Pool

    ​ 當需要創建的子進程數量不多時,可以直接利用multiprocessing中的Process動態成生多個進程,但如果是上百甚至上千個目標,手動的去創建/銷毀進程的工作量巨大,此時就可以用到multiprocessing模塊提供的Pool方法

    	初始化Pool時,可以指定一個最大進程數,當有新的請求提交到Pool中時,如果池還沒有滿,那麼就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到指定的最大值,那麼該請求就會等待,直到池中有進程結束,才會用之前的進程來執行新的任務
    
    from multiprocessing import Pool
    import time
    import random
    import os
    
    
    def work(msg):
        start_time = time.time()
        print("任務{msg} 開始執行,進程號 {os.getpid()}")
        time.sleep(random.random() * 2)
        end_time = time.time()
        print("任務{msg} 結束執行,運行時間{end_time - start_time}")
    
    
    if __name__ == "__main__":
        p = Pool(3)
        for i in range(10):
            p.apply_async(work, args=(i,))
        
        # 觀察進程池任務什麼時候開始執行
        time.sleep(1)
        print("------start--------")
        # 關閉Pool,使其不再接受新的任務;
        p.close()
        # 主進程阻塞,等待子進程的退出, 必須在close或terminate之後使用;
        # p.join()
        print("--------end--------")
    

    總結:

    • 主進程不會主動等待進程池任務執行,如果主進程執行完畢,進程池任務立即結束
    • 程池在定義的時候沒有指定最大進程數,系統會按當前運行電腦的CPU核心數決定進程池內運行的最大進程數,如電腦為雙核,則進程池內最大進程數為2

    參數說明(multiprocessing.Pool):

    • apply_async(func[, args[, kwds]]) :使用非阻塞方式調用func(並行執行,堵塞方式必須等待上一個進程退出才能執行下一個進程),args為傳遞給func的參數列表,kwds為傳遞給func的關鍵字參數列表;
    • close():關閉Pool,使其不再接受新的任務;
    • terminate():不管任務是否完成,立即終止;
    • join():主進程阻塞,等待子進程的退出, 必須在close或terminate之後使用;
  • 進程池中的Queue

    	如果要使用Pool創建進程,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue()
    
    • 使用隊列和進程池實現生產者消費者模型

      from multiprocessing import Manager, Pool
      import time
      import random
      import os
      
      
      def producer(q):
          print("producer啟動({os.getpid()}),父進程為({os.getppid()})")
          for value in ['A', 'B', 'C', 'D', 'E', 'F']:
              print(f'Put {value} to queue...')
              q.put(value)
              time.sleep(random.random())
          # 發送結束信號
          q.put(None)
      
      
      def consumer(q):
          print("consumer啟動({os.getpid()}),父進程為({os.getppid()})")
          while True:
              if not q.empty():
                  data = q.get()
                  # 接收到結束信號退出程式
                  if data is None:
                      return
                  print(f"get {data} from queue")
                  time.sleep(random.random())
      
      
      if __name__ == "__main__":
          q = Manager().Queue()
          p = Pool()
          p.apply_async(producer, (q,))
          p.apply_async(consumer, (q,))
      
          p.close()
          p.join()
          print(f"主進程{os.getpid()} 結束")
      

三、線程

1> 基本概念

  • 線程:線程是進程的一個實體,是CPU調度和分派的基本單位,它是比進程更小的能獨立運行的基本單位.線程自己基本上不擁有系統資源,只擁有一點在運行中必不可少的資源(如程式計數器,一組寄存器和棧),但是它可與同屬一個進程的其他的線程共用進程所擁有的全部資源.

2> 線程使用和特性

  • 線程的簡單實現與線程標識符

    import threading
    from time import sleep, ctime
    
    
    def sing():
        print(f"\n【子線程sing】({threading.current_thread()})的標識符為: {threading.get_ident()}")
        for i in range(3):
            print(f"正在唱歌...{i}")
            sleep(3)
        print(f"【子線程sing】--- 結束 ---")
    
    
    def dance():
        print(f"\n【子線程dance】({threading.current_thread()})的標識符為: {threading.get_ident()}")
        for i in range(3):
            print(f"正在跳舞...{i}")
            sleep(3)
        print(f"【子線程dance】--- 結束 ---")
    
    
    if __name__ == '__main__':
        print(f"【主線程】---開始---:{ctime()}")
    
        t1 = threading.Thread(target=sing)
        t2 = threading.Thread(target=dance)
    
        t1.start()
        t2.start()
        print(f"【主線程】子線程sing的標識符: {t1.ident}")
        print(f"【主線程】子線程dance的標識符: {t2.ident}")
    
        length = len(threading.enumerate())
        print(f'【主線程】當前運行的線程數為:{length}')
        
        print(f"【主線程】--- 結束 ---:{ctime()}")
    

    總結:

    • 多線程併發的操作比單線程效率更高

  • 線程間共用全局變數

    from threading import Thread
    import time
    
    nums = [11, 22]
    
    
    def work1():
        for i in range(3):
            nums.append(i)
    
        print(f"【子線程work1】 g_num={nums}")
    
    
    def work2():
        print(f"【子線程work2】 g_num={nums}")
    
    
    print(f"【主線程】 子線程創建之前g_num={nums}")
    t1 = Thread(target=work1)
    t1.start()
    
    # 延時一會,保證t1線程中的任務做完
    time.sleep(1)
    
    t2 = Thread(target=work2)
    t2.start()
    

    總結:

    • 一個進程內的所有線程共用全局變數,很方便在多個線程間共用數據
    • 缺點就是,線程是對全局變數隨意遂改可能造成多線程之間對全局變數的混亂
  • 多線程資源競爭問題

    import threading
    import time
    
    g_num = 0
    
    
    def work1(num):
        global g_num
        for i in range(num):
            g_num += 1
        print(f"【子線程work1】 g_num={g_num}")
    
    
    def work2(num):
        global g_num
        for i in range(num):
            g_num += 1
        print(f"【子線程work2】 g_num={g_num}")
    
    
    print(f"【主線程】 子線程創建之前g_num={g_num}")
    
    t1 = threading.Thread(target=work1, args=(1000000,))
    t2 = threading.Thread(target=work2, args=(1000000,))
    t1.start()
    t2.start()
    
    while len(threading.enumerate()) != 1:
        time.sleep(1)
    
    print(f"【主線程】2個線程對同一個全局變數操作之後的最終結果是:{g_num}")
    
    
    • 結果分析:

      	假設兩個線程t1和t2都要對全局變數g_num(預設是0)進行加1運算,t1和t2都各對`g_num`加10次,g_num的最終的結果應該為20。 但是由於是多線程同時操作,有可能出現下麵情況:
      
      1> 在g_num=0時,t1取得g_num=0。此時系統把t1調度為”sleeping”狀態,把t2轉換為”running”狀態,t2也獲得g_num=0
      2> 然後t2對得到的值進行加1並賦給g_num,使得g_num=1
      3> 然後系統又把t2調度為”sleeping”,把t1轉為”running”。線程t1又把它之前得到的0加1後賦值給g_num。
      4> 這樣導致雖然t1和t2都對g_num加1,但結果仍然是g_num=1
      
  • 解決多線程資源競爭問題 - 互斥鎖

    • 互斥鎖

      解決多線程資源競爭問題,最簡單的機制就是引入互斥鎖,互斥鎖為資源引入一個狀態:鎖定/非鎖定

    • 互斥鎖工作原理

      某個線程要更改共用數據時,先將其鎖定,此時資源的狀態為“鎖定”,其他線程不能更改;直到該線程釋放資源,將資源的狀態變成“非鎖定”,其他的線程才能再次鎖定該資源。互斥鎖保證了每次只有一個線程進行寫入操作,從而保證了多線程情況下數據的正確性。

    • 互斥鎖使用

      # 創建鎖
      mutex = threading.Lock()
      
      # 鎖定
      mutex.acquire()
      
      # 釋放
      mutex.release()
      

      說明:

      • 如果這個鎖之前是沒有上鎖的,那麼acquire不會堵塞
      • 如果在調用acquire對這個鎖上鎖之前 它已經被 其他線程上了鎖,那麼此時acquire會堵塞,直到這個鎖被解鎖為止
    • 為全局變數加入互斥鎖

      import threading
      import time
      
      g_num = 0
      
      
      def work1(num):
          global g_num
          for i in range(num):
              mutex.acquire()  # 上鎖
              g_num += 1
              mutex.release()  # 解鎖
      
          print(f"【子線程work1】 g_num={g_num}")
      
      
      def work2(num):
          global g_num
          for i in range(num):
              mutex.acquire()  # 上鎖
              g_num += 1
              mutex.release()  # 解鎖
      
          print(f"【子線程work2】 g_num={g_num}")
      
      
      # 創建一個互斥鎖
      # 預設是未上鎖的狀態
      mutex = threading.Lock()
      
      # 創建2個線程,讓他們各自對g_num加1000000次
      p1 = threading.Thread(target=work1, args=(1000000,))
      p2 = threading.Thread(target=work2, args=(1000000,))
      p1.start()
      p2.start()
      
      # 等待計算完成
      while len(threading.enumerate()) != 1:
          time.sleep(1)
      
      print("【主線程】 2個線程對同一個全局變數操作之後的最終結果是:%s" % g_num)
      

      總結:

      • 鎖的好處:確保了某段尖鍵代碼只能由一個線程從頭到尾完整地執行
      • 鎖的壞處
        • 阻止了多線程併發執行﹐包含鎖的某段代碼實際上只能以單線程模式執行﹐效率就大大地下降了
        • 由於可以存在多個鎖﹐不同的線程持有不同的鎖﹐並試圖獲取對方持有的鎖時﹐可能會造成死鎖
      • 死鎖: 線上程間共用多個資源的時候,如果兩個線程分別占有一部分資源並且同時等待對方的資源,就會造成死鎖。
      • 避免死鎖:
        • 程式設計時要儘量避免(銀行家演算法)
        • 添加超時時間等
  • 線程間通信

    • 使用隊列和多線程實現生產者消費者模型

      from queue import Queue
      import random
      import time
      import threading
      
      
      class Producer(threading.Thread):
      
          def run(self):
              for value in ['A', 'B', 'C', 'D', 'E', 'F']:
                  print(f'【Producer】 put {value} to queue...')
                  q.put(value)
                  # 設置隊列堵塞,只有消費者完成任務並執行task_done 計算器計數為0才會解堵塞
                  # q.join()
                  time.sleep(random.random())
                  # 發送結束信號
              q.put(None)
              print("生產者任務結束!")
      
      
      class Consumer(threading.Thread):
      
          def run(self):
              while True:
                  if not q.empty():
                      data = q.get()
                      # 任務完成 隊列計數器減一
                      # q.task_done()
                      # 接收到結束信號退出程式
                      if data is None:
                          return
                      print(f"【Consumer】 get {data} from queue")
                      time.sleep(random.random())
      
      
      q = Queue()
      Producer().start()
      Consumer().start()
      
  • 線程池ThreadPool

    ​ 線程池是一個線程管理技術,創建一個或者多個線程進行管理,避免線程的創建和銷毀帶來的開銷線程過多會帶來調度開銷,進而影響緩存局部性和整體性能。而線程池維護著多個線程,等待著監督管理者分配可併發執行的任務。這避免了在處理短時間任務時創建與銷毀線程的代價。線程池不僅能夠保證內核的充分利用,還能防止過分調度.

    • 線程池的優點

      • 降低資源消耗;通過重覆利用已創建的線程降低創建和銷毀造成的消耗。
      • 提高響應速度,不必等待線程的創建,正常情況下(沒有任務進入隊列的情況)不需要等待。
      • 線程管理,線程統一由線程池管理,隨取隨用。
    • 自定義線程池實現

      from threading import Thread
      from queue import Queue
      import time
      import random
      import threading
      
      
      # 自定義線程池
      class MyThreadPool:
      
          def __init__(self, thread_num):
              self.thread_num = thread_num
              self.task_queue = Queue()
      
              # 初始化的時候啟動線程池
              self.__start()
      
          # 依次創建並啟動線程
          def __start(self):
              for _ in range(self.thread_num):
                  # 每個線程的執行相同的方法,在方法中讀取隊列的任務
                  # 這裡daemon=True 表示當主線程運行結束時不對這個子線程進行檢查而直接退出(實現主線程退出時,關閉線程池中子線程的任務執行)
                  # 參考daemon在多線程中的作用: https://blog.51cto.com/u_9653244/6450770
                  Thread(target=self._target, daemon=True).start()
      
          # 為線程分配任務
          def _target(self):
              while True:
                  target, args, kwargs = self.task_queue.get()
                  target(*args, **kwargs)
                  # 隊列計數器減一
                  self.task_queue.task_done()
      
          # 任務隊列執行完成之前一直堵塞
          def join(self):
              self.task_queue.join()
      
          # 往隊列中添加任務
          def submit_task(self, target, args=(), kwargs=None):
              if kwargs is None:
                  kwargs = {}
              self.task_queue.put((target, args, kwargs))
      
      
      def work(name, no, **kwargs):
          thread_id = threading.get_ident()
          print(f"【子線程{thread_id}-開始】({threading.current_thread()}) ")
          print(f"【子線程{thread_id}-data】 name={name}, no={no}, kwargs={kwargs}")
          time.sleep(random.randint(1, 10))
          print(f"【子線程{thread_id}-結束】")
      
      
      t_pool = MyThreadPool(3)
      for i in range(5):
          t_pool.submit_task(work, (f"任務{i}", i), {"test": 1})
      
      t_pool.join()
      
    • 內置線程池模塊 ThreadPool

      • 模塊說明

        from multiprocessing.pool import ThreadPool
        # multiprocessing.dummy.Pool為一個函數,本質是使用ThreadPool創建線程池
        from multiprocessing.dummy import Pool as TPool
        
        print(ThreadPool, TPool)
        print(ThreadPool().__class__, TPool().__class__)
        print(ThreadPool().__class__ is TPool().__class__)
        
      • 使用內置線程池ThreadPool 模塊實現線程復用

        from multiprocessing.pool import ThreadPool
        import time
        import random
        import threading
        
        
        def work(name, no, **kwargs):
            thread_id = threading.get_ident()
            print(f"【子線程{thread_id}-開始】({threading.current_thread()}) ")
            print(f"【子線程{thread_id}-data】 name={name}, no={no}, kwargs={kwargs}")
            time.sleep(random.randint(1, 10))
            print(f"【子線程{thread_id}-結束】")
        
        
        t_pool = ThreadPool(3)
        for i in range(5):
            t_pool.apply_async(work, (f"任務{i}", i), {"test": 1})
        
        # 關閉ThreadPool,使其不再接受新的任務;
        t_pool.close()
        t_pool.join()
        

四、進程線程的等待/終止 和 守護模式

  • 進程的等待和終止

    from multiprocessing import Process
    import time
    
    
    def func():
        while True:
            print("【子進程】")
            time.sleep(3)
    
    
    if __name__ == '__main__':
        print("【主進程】")
        p = Process(target=func)
        p.start()
    
        # 主進程等待子進程完成
        # p.join()
        # 立即結束子進程,不推薦使用,會導致子進程的資源無法被釋放
        # p.terminate()
        print("【主進程】 結束")
    
  • 線程的等待和終止

    • 線程的等待

      from threading import Thread
      import time
      
      
      def func():
          while True:
              print("【子線程】")
              time.sleep(3)
      
      
      if __name__ == '__main__':
          print("【主線程】")
          t = Thread(target=func)
          t.start()
          
          # 主線程等待子線程完成再繼續往後執行
          # t.join()
      
          print("【主線程】 結束")
      

      總結:

      • 主線程代碼執行完成以後預設等待子線程;所有子線程結束以後程式才會結束
      • t.join() 主線程在某個位置等待子線程執行完成, 再繼續執行
    • 線程的終止

      • 一. 使用 t1.stop()方法強行終止線程

        跟進程的terminate一樣,不推薦使用;目前該方法已被棄用,會導致被終止的線程所擁有的資源如打開的文件、資料庫事務等不能被正確釋放;造成數據泄漏或死鎖,除非可以肯定數據安全,否則不建議強行殺死線程
        
      • 二. 通過拋出異常來終止線程

        比較複雜一般不用
        
      • 三.通過一個終止標誌來終止線程

        • 實現方式1:

          # 方式一: 使用全局變數stop_threads控制線程終止; 
          # 這裡子線程直接訪問主線程中的全局變數會導致數據混亂,不推薦
          
          from threading import Thread
          import time
          
          
          def func():
              while True:
                  print("【子線程】")
                  time.sleep(3)
                  if stop_threads:
                      return
          
          
          if __name__ == '__main__':
              print("【主線程】")
              stop_threads = False
          
              t = Thread(target=func)
              t.start()
          
              time.sleep(10)
              stop_threads = True
          
              print("【主線程】 結束")
          
        • 實現方式2:

          # 方式二:通過函數間接訪問局部變數stop_threads, 推薦使用
          # 好處:通過在子線程中指定位置設置檢測點,可以在主線程中任何時候終止子線程,並且子線程所擁有的資源也能被正確釋放
          
          from threading import Thread
          import time
          
          
          def func(get_stop_flag):
              while True:
                  print("【子線程】")
                  time.sleep(3)
                  if get_stop_flag():
                      return
          
          
          if __name__ == '__main__':
              print("【主線程】")
              stop_threads = False
          
              t = Thread(target=func, args=(lambda: stop_threads,))
              t.start()
          
              time.sleep(10)
              stop_threads = True
          
              print("【主線程】 結束")
          
      • 四. 將子線程設置為守護線程

        不能指定子線程終止時間, 接下來我們就說下守護模式
        
  • 守護模式

    • 守護進程概念

      隨著主進程代碼執行結束,守護進程結束, 可以理解為子進程開啟守護進程以後,主進程為子進程的運行保駕護航;當主進程結束,沒有守護以後,子進程立刻就會結束;

    • 守護進程代碼示例

        from multiprocessing import Process
        import time
        
        
        def func():
            while True:
                print("【子進程】")
                time.sleep(3)
        
        
        if __name__ == '__main__':
            print("【主進程】")
            # 設置p為守護進程, 創建時設置
            # p = Process(target=func, daemon=True)
            p = Process(target=func)
            print(f"【主進程-創建進程後】子進程是否正在運行: {p.is_alive()}")
        
            # 設置p為守護進程, 啟動進程前設置(預設p.daemon = False)
            p.daemon = True
            p.start()
            print(f"【主進程-啟動進程後】子進程是否正在運行: {p.is_alive()}")
        
          print("【主進程】 結束")
      
    • 守護線程概念

      在主線程代碼執行結束後,等待其它非守護子線程執行結束,守護線程立即結束;即主線程只會等待非守護線程結束,不會等待守護線程執行完畢,只要主線程代碼執行結束,守護線程就會結束。

    • 守護線程代碼示例

      from threading import Thread
      import time
      
      
      def func():
          while True:
              print("【子線程】")
              time.sleep(3)
      
      
      if __name__ == '__main__':
          print("【主線程】")
          # 設置t為守護線程, 創建時設置
          # t = Thread(target=func, daemon=True)
          t = Thread(target=func)
          print(f"【主線程-創建線程後】子線程是否正在運行: {t.is_alive()}")
      
          # 設置t為守護線程, 啟動線程前設置(預設t.daemon = False)
          t.daemon = True
          # 等效於 t.daemon = True
          # t.setDaemon(True)
      
          t.start()
          print(f"【主線程-創建線程後】子線程是否正在運行: {t.is_alive()}")
      
          print("【主線程】 結束")
      

五、多進程多線程隊列綜合演練

  • 進程池線程池高性能併發通信

    # 【本機環境運行】
    # 導入進程池
    from multiprocessing import Pool, cpu_count
    # 導入線程池
    from multiprocessing.pool import ThreadPool
    from socket import *
    from queue import Queue
    import os
    
    
    # 從隊列讀取數據並返回給客戶端
    def send_data(client, addr, q):
        # 子進程中無法使用input,而且子進程錯誤不會展示
        print(f"【send_data】準備向客戶{addr}發送數據...")
        while True:
            msg = q.get()
            if not msg:
                print(f"【send_data】收到關閉通知, 發送功能關閉!")
                return
    
            client.send(f"您的消息 【{msg}】 已收到, over !".encode("gbk"))
    
            
    # 收到客戶發來的數據存儲到隊列
    def recv_data(client, addr, q):
        print(f"【recv_data】準備接收客戶{addr}的數據...")
        while True:
            data = client.recv(1024).decode('gbk')
            q.put(data)
            # 客戶端調用close; data為 ''  (網路調試助手需要關閉通訊視窗才會調用close)
            if not data:
                # 往隊列寫入None,通知發送消息的子線程關閉,並關閉服務套接字
                q.put('')
                client.close()
                print(f"【recv_data】客戶{addr}關閉連接, 接收功能關閉!")
                return
    
            print(f"【recv_data】 {addr} 發來消息 : {data}\n")
    
    
    # 進程負責處理連接請求(一個進程跟進一個客戶)
    def process_connect(client, addr):
        print(f"由進程 {os.getpid()} 為新客戶 {addr} 服務!")
        # 線程負責處理數據請求(一個線程處理客戶的一個需求)
        t_pool = ThreadPool(2)
        # 創建一個隊列,為接收和發送之間傳遞消息
        q = Queue()
        t_pool.apply_async(send_data, (client, addr, q))
        t_pool.apply_async(recv_data, (client, addr, q))
    
    
    def main():
        # 創建tcp監聽套接字
        tcp_server_socket = socket(AF_INET, SOCK_STREAM)
        tcp_server_socket.bind(("127.0.0.1", 9000))
        tcp_server_socket.listen(128)
    
        # 進程池負責接收連接請求(進程池數與cpu處理器數量一致)
        pool = Pool(cpu_count())
        while True:
            # 等待連接請求,獲取服務套接字
            client_socket, client_addr = tcp_server_socket.accept()
            pool.apply_async(process_connect, (client_socket, client_addr))
    
    if __name__ == '__main__':
       main() 
    

六、GIL全局解釋器鎖

基本概念

  GIL 是python的全局解釋器鎖,同一進程中假如有多個線程運行,一個線程在運行python程式的時候會霸占python解釋器(加了一把鎖即GIL),使該進程內的其他線程無法運行,等該線程運行完後其他線程才能運行。如果線程運行過程中遇到耗時操作,則解釋器鎖解開,使其他線程運行。所以在多線程中,線程的運行仍是有先後順序的,並不是同時進行。
  我們可以把GIL看作是“通行證”,並且在一個python進程中,GIL只有一個。拿不到通行證的線程,就不允許進入CPU執行。GIL只在cpython中才有,即同一個進程下的多個線程無法利用多核優勢。
  • 互斥鎖和GIL全局解釋器鎖的區別

    • 互斥鎖就是對共用數據進行鎖定,保證同一時刻只有一個線程操作數據,是數據級別的鎖。

    • GIL鎖是解釋器級別的鎖,保證同一時刻下同一個進程中只有一個線程拿到GIL鎖,擁有執行

      許可權。

  • 關於GIL全局解釋器鎖的說明

    • Python語言和GIL沒有半毛錢關係。僅僅是由於歷史原因在Cpython虛擬機(解釋器),難以移除GIL。 更換其他解釋器就不會存在GIL
    • Python使用多進程是可以利用多核的CPU資源。
    • 為什麼不刪除GIL-Guido的聲明

七、進程線程對比

  • 多進程和多線程的關係 進程與線程的一個簡單解釋

  • 定義不同

    • 進程是系統進行資源分配和調度的一個獨立單位.
    • 線程CPU調度和分派的基本單位
  • 功能對比

    • 進程,能夠完成多任務,比如 在一臺電腦上能夠同時運行多個QQ
    • 線程,能夠完成多任務,比如 一個QQ中的多個聊天視窗
  • 區別

    • 一個程式至少有一個進程,一個進程至少有一個線程.
    • 線程的劃分尺度小於進程(資源比進程少),使得多線程程式的併發性高。
      • 進程在執行過程中擁有獨立的記憶體單元,而多個線程共用記憶體,從而極大地提高了程式的運行效率
      • 線線程不能夠獨立執行,必須依存在進程中
      • 可以將進程理解為工廠中的一條流水線,而其中的線程就是這個流水線上的工人
  • 優缺點

    • 線程和進程在使用上各有優缺點:線程執行開銷小,但不利於資源的管理和保護;而進程正相反。
  • 進程線程選擇

    • 計算密集型: 多進程;
      • IO密集型: 多線程、協程
  • 其他問題

    • IO密集型中多線程與協程的執行速度

      IO密集型執行時間主要在IO讀寫,python中由於GIL鎖的原因,多線程其實還是使用的單核在進行cpu計算,如果計算任務加鎖了,cpu時間片調度機制會在一個cpu時間片(python預設是處理完1000個位元組碼)結束後,去釋放GIL鎖,並查看其他線程是否可以執行,由於任務被加鎖,會在第二個cpu時間片繼續把時間片分給第一個線程,這會讓cpu調度時間白白浪費,反而導致多線程比協程(遇到耗時操作自動切換任務)耗時更久
      
    • 計算密集型中多線程與單線程的執行速度

      計算量小的情況下單線程快,因為多線程切換需要時間
      計算量大的情況下多線程快,多線程會獲得更多的CPU執行時間
      

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一、是什麼 TCP/IP,傳輸控制協議/網際協議,是指能夠在多個不同網路間實現信息傳輸的協議簇 TCP(傳輸控制協議) 一種面向連接的、可靠的、基於位元組流的傳輸層通信協議 IP(網際協議) 用於封包交換數據網路的協議 TCP/IP協議不僅僅指的是TCP和IP兩個協議,而是指一個由FTP、SMTP、T ...
  • 前言 習慣了在 css 文件裡面編寫樣式,其實JavaScript 的 CSS對象模型也提供了強大的樣式操作能力, 那就隨文章一起看看,有多少能力是你不知道的吧。 樣式來源 客從八方來, 樣式呢, 樣式五方來。 chrome舊版本用戶自定義樣式目錄: %LocalAppData%/Google/Ch ...
  • VUE 腳手架 腳手架文件結構 ├── node_modules ├── public │ ├── favicon.ico: 頁簽圖標 │ └── index.html: 主頁面 ├── src │ ├── assets: 存放靜態資源 │ │ └── logo.png │ │── componen ...
  • 最近看到了許多關於 :has() 選擇器的知識點,在此總結下來。 MDN 對 :has() 選擇器 的解釋是這樣的: CSS 函數式偽類 :has() 表示一個元素,如果作為參數傳遞的任何相對選擇器在錨定到該元素時,至少匹配一個元素。這個偽類通過把可容錯相對選擇器列表作為參數,提供了一種針對引用元素 ...
  • 系統功能文檔是一種描述軟體系統功能和操作方式的文檔。它讓開發團隊、測試人員、項目管理者、客戶和最終用戶對系統行為有清晰、全面的瞭解。 通過ChatGPT,我們能讓編寫系統功能文檔的效率提升10倍以上。 用ChatGPT生成系統功能文檔 我們以線上商城系統為例,介紹如何使用ChatGPT幫我們完成系統 ...
  • isa 走點陣圖 在講 OC->Class 底層類結構之前,先看下下麵這張圖: 通過isa走點陣圖 得出的結論是: 1,類,父類,元類都包含了 isa, superclass 2,對象isa指向類對象,類對象的isa指向了元類,元類的 isa 指向了根元類,根元類 isa 指向自己 3,類的 super ...
  • 零售商家為什麼要建設線上商城? 傳統的實體門店服務範圍有限,只能吸引周邊500米以內的消費者。因此,如何拓展服務範圍,吸引更多的消費者到店,成為了店家迫切需要解決的問題。 缺乏忠實顧客,客戶基礎不穩,往往是一次性購物,門店無法形成有效的顧客迴流。在當前的市場環境下,構建並維護粉絲群體,成為了商家的核 ...
  • C-04.邏輯架構 1.邏輯架構剖析 1.1 伺服器處理客戶端請求 首先MySQL是典型的C/S架構,即Client/Server架構,客戶端使用的是mysql,伺服器端程式使用的mysqld。 不論客戶端進程和伺服器進程是採用那種方式進行通信,最後實現的效果都是:客戶端進程向伺服器進程發送一段文本 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...