多進程(multiprocessing module)

来源:https://www.cnblogs.com/horror/archive/2018/07/21/9348285.html
-Advertisement-
Play Games

一、多進程 1.1 多進程的概念 由於GIL的存在,python中的多線程其實並不是真正的多線程,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多進程。Python提供了非常好用的多進程包multiprocessing,只需要定義一個函數,Python會完成其他所有事情。藉助 ...


一、多進程

1.1 多進程的概念

  由於GIL的存在,python中的多線程其實並不是真正的多線程,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多進程。Python提供了非常好用的多進程包multiprocessing,只需要定義一個函數,Python會完成其他所有事情。藉助這個包,可以輕鬆完成從單進程到併發執行的轉換。multiprocessing支持子進程、通信和共用數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。

  multiprocessing包是Python中的多進程管理包。與threading.Thread類似,它可以利用multiprocessing.Process對象來創建一個進程。該進程可以運行在Python程式內部編寫的函數。該Process對象與Thread對象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類 (這些對象可以像多線程那樣,通過參數傳遞給各個進程),用以同步進程,其用法與threading包中的同名類一致。所以,multiprocessing的很大一部份與threading使用同一套API,只不過換到了多進程的情境。

但在使用這些共用API的時候,我們要註意以下幾點:

  a、在UNIX平臺上,當某個進程終結之後,該進程需要被其父進程調用wait,否則進程成為僵屍進程(Zombie)。所以,有必要對每個Process對象調用join()方法 (實際上等同於wait)。對於多線程來說,由於只有一個進程,所以不存在此必要性。

  b、Windows系統下,需要註意的是要想啟動一個子進程,必須加上 if __name__ == "__main__",進程相關的要寫在這句下麵。

1.2 創建進程的兩種方式

直接創建:

 1 from multiprocessing import Process
 2 import time
 3 def f(name):
 4     time.sleep(1)
 5     print('hello', name,time.ctime())
 6 
 7 if __name__ == '__main__':
 8     p_list=[]
 9     for i in range(3):
10         p = Process(target=f, args=('alvin',))
11         p_list.append(p)
12         p.start()
13     for i in p_list:
14         p.join()
15     print('end')
Demo1

類式調用:

 1 from multiprocessing import Process
 2 import time
 3 
 4 class MyProcess(Process):
 5     def __init__(self):
 6         super(MyProcess, self).__init__()
 7         #self.name = name
 8 
 9     def run(self):
10         time.sleep(1)
11         print ('hello', self.name,time.ctime())
12 
13 
14 if __name__ == '__main__':
15     p_list=[]
16     for i in range(3):
17         p = MyProcess()
18         p.start()
19         p_list.append(p)
20 
21     for p in p_list:
22         p.join()
23 
24     print('end')
Demo2

二、Process類

2.1 構造方法

   Process([group [, target [, name [, args [, kwargs]]]]])

  group: 線程組,目前還沒有實現,庫引用中提示必須是None; 
  target: 要執行的方法; 
  name: 進程名; 
  args/kwargs: 要傳入方法的參數。

2.2 實例方法

   is_alive():返回進程是否在運行。

  join([timeout]):阻塞當前上下文環境的進程程,直到調用此方法的進程終止或到達指定的timeout(可選參數)。

  start():進程準備就緒,等待CPU調度

  run():strat()調用run方法,如果實例進程時未制定傳入target,這star執行t預設run()方法。

  terminate():不管任務是否完成,立即停止工作進程

2.3 屬性

    authkey

  daemon:和線程的setDeamon功能一樣

  exitcode(進程在運行時為None、如果為–N,表示被信號N結束)

  name:進程名字。

  pid:進程號。

 1 import time
 2 from  multiprocessing import Process
 3 
 4 def foo(i):
 5     time.sleep(1)
 6     print (p.is_alive(),i,p.pid)
 7     time.sleep(1)
 8 
 9 if __name__ == '__main__':
10     p_list=[]
11     for i in range(10):
12         p = Process(target=foo, args=(i,))
13         #p.daemon=True
14         p_list.append(p)
15 
16     for p in p_list:
17         p.start()
18     # for p in p_list:
19     #     p.join()
20 
21     print('main process end')
Demo

三、進程間通信

不同進程間記憶體是不共用的,要想實現兩個進程間的數據交換,可以用以下方法:

3.1 Queues 使用方法跟threading里的queue類似 --> 將q作為參數傳遞給子進程。

 1 from multiprocessing import Process, Queue
 2 
 3 def f(q,n):
 4     q.put([42, n, 'hello'])
 5 
 6 if __name__ == '__main__':
 7     q = Queue()
 8     p_list=[]
 9     for i in range(3):
10         p = Process(target=f, args=(q,i))
11         p_list.append(p)
12         p.start()
13     print(q.get())
14     print(q.get())
15     print(q.get())
16     for i in p_list:
17             i.join()
Demo1

3.2 Pipes --> 通過管道Pipe實現。

 1 from multiprocessing import Process, Pipe
 2  
 3 def f(conn):
 4     conn.send([42, None, 'hello'])
 5     conn.close()
 6  
 7 if __name__ == '__main__':
 8     parent_conn, child_conn = Pipe()
 9     p = Process(target=f, args=(child_conn,))
10     p.start()
11     print(parent_conn.recv())   # prints "[42, None, 'hello']"
12     p.join()
Demo2

3.3 數據共用(Manager)

Manager()返回的管理器對象控制一個伺服器進程,該進程保存Python對象並允許其他進程使用代理操作它們。

 1 from multiprocessing import Process, Manager
 2 
 3 def f(d, l,n):
 4     d[n] = '1'
 5     d['2'] = 2
 6     d[0.25] = None
 7     l.append(n)
 8     print(l)
 9 
10 if __name__ == '__main__':
11     with Manager() as manager:
12         d = manager.dict()
13 
14         l = manager.list(range(5))
15         p_list = []
16         for i in range(10):
17             p = Process(target=f, args=(d, l,i))
18             p.start()
19             p_list.append(p)
20         for res in p_list:
21             res.join()
22 
23         print(d)
24         print(l)
Demo

3.4 進程同步

Without using the lock output from the different processes is liable to get all mixed up.  如果不使用來自不同進程的鎖定輸出,則可能會混淆不清。

 1 from multiprocessing import Process, Lock
 2 
 3 def f(l, i):
 4     l.acquire()
 5     try:
 6         print('hello world', i)
 7     finally:
 8         l.release()
 9 
10 if __name__ == '__main__':
11     lock = Lock()
12 
13     for num in range(10):
14         Process(target=f, args=(lock, num)).start()

3.5 進程池

進程池內部維護一個進程式列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進程,那麼程式就會等待,直到進程池中有可用進程為止。

進程池中有兩個方法:

  • apply
  • apply_async
 1 from  multiprocessing import Process,Pool
 2 import time
 3  
 4 def Foo(i):
 5     time.sleep(2)
 6     return i+100
 7  
 8 def Bar(arg):
 9     print('-->exec done:',arg)
10  
11 pool = Pool(5)
12  
13 for i in range(10):
14     pool.apply_async(func=Foo, args=(i,),callback=Bar)
15     #pool.apply(func=Foo, args=(i,))
16  
17 print('end')
18 pool.close()
19 pool.join()

四、協程

協程,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是線程:協程是一種用戶態的輕量級線程

協程擁有自己的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其他地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。因此:

協程能保留上一次調用時的狀態(即所有局部狀態的一個特定組合),每次過程重入時,就相當於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。

協程的好處:

  • 無需線程上下文切換的開銷
  • 無需原子操作鎖定及同步的開銷
  • "原子操作(atomic operation)是不需要synchronized",所謂原子操作是指不會被線程調度機制打斷的操作;這種操作一旦開始,就一直運行到結束,中間不會有任何 context switch (切換到另一個線程)。原子操作可以是一個步驟,也可以是多個操作步驟,但是其順序是不可以被打亂,或者切割掉只執行部分。視作整體是原子性的核心。
  • 方便切換控制流,簡化編程模型
  • 高併發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。所以很適合用於高併發處理。

 

缺點:

  • 無法利用多核資源:協程的本質是個單線程,它不能同時將 單個CPU 的多個核用上,協程需要和進程配合才能運行在多CPU上.當然我們日常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。
  • 進行阻塞(Blocking)操作(如IO時)會阻塞掉整個程式

4.1 使用yield實現協程操作

 1 import time 
2 import queue
3
def consumer(name): 4 print("--->starting eating baozi...") 5 while True: 6 new_baozi = yield 7 print("[%s] is eating baozi %s" % (name,new_baozi)) 8 #time.sleep(1) 9 10 def producer(): 11 12 r = con.__next__() 13 r = con2.__next__() 14 n = 0 15 while n < 5: 16 n +=1 17 con.send(n) 18 con2.send(n) 19 print("\033[32;1m[producer]\033[0m is making baozi %s" %n ) 20 21 22 if __name__ == '__main__': 23 con = consumer("c1")    # 創建生成器對象 24 con2 = consumer("c2")    # 創建生成器對象 25 p = producer()        # 執行producer函數

協程標准定義,即符合什麼條件就能稱之為協程:

  1. 必須在只有一個單線程里實現併發
  2. 修改共用數據不需加鎖
  3. 用戶程式里自己保存多個控制流的上下文棧
  4. 一個協程遇到IO操作自動切換到其它協程

基於上面這4點定義,我們剛纔用yield實現的程並不能算是合格的線程,因為它有一點功能沒實現。

4.2 Greenlet

greenlet是一個用C實現的協程模塊,相比與python自帶的yield,它可以使你在任意函數之間隨意切換,而不需把這個函數先聲明為generator

 1 # -*- coding:utf-8 -*-
 2  
 3  
 4 from greenlet import greenlet
 5  
 6 def test1():
 7     print(12)
 8     gr2.switch()
 9     print(34)
10     gr2.switch()
11  
12 def test2():
13     print(56)
14     gr1.switch()
15     print(78)
16  
17 gr1 = greenlet(test1)
18 gr2 = greenlet(test2)
19 gr1.switch()

感覺確實用著比generator還簡單了,但好像還沒有解決一個問題,就是遇到IO操作,自動切換,並不對。

4.3 Gevent 

Gevent 是一個第三方庫,可以輕鬆通過gevent實現併發同步或非同步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet全部運行在主程式操作系統進程的內部,但它們被協作式地調度。

 1 import gevent
 2  
 3 def func1():
 4     print('\033[31;1m李闖在跟海濤搞...\033[0m')
 5     gevent.sleep(2)
 6     print('\033[31;1m李闖又回去跟繼續跟海濤搞...\033[0m')
 7  
 8 def func2():
 9     print('\033[32;1m李闖切換到了跟海龍搞...\033[0m')
10     gevent.sleep(1)
11     print('\033[32;1m李闖搞完了海濤,回來繼續跟海龍搞...\033[0m')
12  
13  
14 gevent.joinall([
15     gevent.spawn(func1),
16     gevent.spawn(func2),
17     #gevent.spawn(func3),
18 ])

輸出:

李闖在跟海濤搞...
李闖切換到了跟海龍搞...
李闖搞完了海濤,回來繼續跟海龍搞...
李闖又回去跟繼續跟海濤搞...

五、補充

5.1 同步與非同步的性能區別

 1 import gevent
 2  
 3 def task(pid):
 4     """
 5     Some non-deterministic task
 6     """
 7     gevent.sleep(0.5)
 8     print('Task %s done' % pid)
 9  
10 def synchronous():
11     for i in range(1,10):
12         task(i)
13  
14 def asynchronous():
15     threads = [gevent.spawn(task, i) for i in range(10)]
16     gevent.joinall(threads)
17  
18 print('Synchronous:')
19 synchronous()
20  
21 print('Asynchronous:')
22 asynchronous()

上面程式的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn。 初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall 函數,後者阻塞當前流程,並執行所有給定的greenlet。執行流程只會在 所有greenlet執行完後才會繼續向下走。

5.2 遇到IO阻塞時會自動切換任務(gevent 庫中的 monkey 方法)--> 補丁

  ------->能夠最大程度監聽IO阻塞,提高效率。

 1 from gevent import monkey
 2 monkey.patch_all()
 3 
 4 import gevent
 5 from  urllib.request import urlopen
 6  
 7 def f(url):
 8     print('GET: %s' % url)
 9     resp = urlopen(url)
10     data = resp.read()
11     print('%d bytes received from %s.' % (len(data), url))
12  
13 gevent.joinall([
14         gevent.spawn(f, 'https://www.python.org/'),
15         gevent.spawn(f, 'https://www.yahoo.com/'),
16         gevent.spawn(f, 'https://github.com/'),
17 ])

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1 // C++函數和類 05-返回類型.cpp: 定義控制台應用程式的入口點。 2 // 3 4 #include "stdafx.h" 5 #include 6 #include 7 #include 8 #include 9 #include 10 using namespace std; 1... ...
  • 1 // C++函數和類 01-函數.cpp: 定義控制台應用程式的入口點。 2 // 3 4 #include "stdafx.h" 5 #include 6 #include 7 #include 8 #include 9 #include 10 using namespace std; 11 ... ...
  • c/c++ 用前序和中序,或者中序和後序,創建二叉樹 用前序和中序創建二叉樹 用後序和中序創建二叉樹 bintreemain.c "完整代碼" 編譯方法:g++ g nodestack.c nodequeue.c bintree.c bintreemain.c ...
  • static 用法 1.static 變數 static變數又稱為靜態變數,靜態變數保存在方法區靜態域中,一個類的靜態變數被其所有實例共用。 2.static方法 靜態方法不與包含它的任何對象關聯,即使沒有創建對象,也可使用,例: 1 public class StaticTest { 2 3 pu ...
  • ng-app="angular_app" 範圍 ng-controller="angular_controller" 控制器 ng-init="findAll()" 初始化方法 雙向數據綁定 就是angular的ng-model屬性同時具備了表單元素中的name與value兩個屬性,既可以提交表單數 ...
  • static 和 final 關鍵字 對實例變數賦初始值的影響 最近一直在看《深入理解Java虛擬機》,在看完了對象記憶體分配、Class文件格式之後,想深扒一下實例變數是如何被賦上初始值的這個問題的細節。 在2.3.1小節中講對象創建的時候,講到記憶體分配有兩種方式:一種是指針碰撞;另一種是空閑列表。 ...
  • python語言程式設計基礎 習題2.5 ​​​ ...
  • 大道至簡這本書總體來說比較通俗易懂,同時在說明自己觀點的時候引用了許多古代的例子,更加的形象生動有趣,可讀性很強。 前幾章的主要思想如下: 程式=演算法+結構+方法;編程的第一要務是先把事情分析清楚,把事件先後的邏輯關係和依賴關係搞清楚,然後再去寫代碼實現。代碼是不存在的,存在的只是思想。其實演算法是對 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...