第三十二天- 管道 進程池

来源:https://www.cnblogs.com/xi1419/archive/2018/11/29/10041082.html
-Advertisement-
Play Games

1.管道 進程間通信(IPC)方式二:管道(不推薦使用,瞭解即可),埠易導致數據不安全的情況出現。 2.共用數據 進程之間數據共用的模塊之一Manager模塊(少用): 進程間數據是獨立的,可以藉助於隊列或管道實現通信,二者都是基於消息傳遞的雖然進程間數據獨立,但可以通過Manager實現數據共用 ...


 

1.管道

  進程間通信(IPC)方式二:管道(不推薦使用,瞭解即可),埠易導致數據不安全的情況出現。

 1 from multiprocessing import Pipe,Process
 2 
 3 
 4 def func(conn1,conn2):
 5     msg = conn1.recv()  # 接收了conn2傳遞的
 6     # msg1 = conn2.recv()  # 接收了conn1傳遞的
 7     print('>>>',msg)
 8     # print('>>>',msg1)
 9 
10 
11 if __name__ == '__main__':
12     # 拿到管道的兩端,雙工通信方式,兩端都可以收發消息
13     conn1,conn2 = Pipe()  # 必須在Process之前產生管道
14     p = Process(target=func,args=(conn1,conn2,))  # 管道給子進程
15     p.start()
16     conn1.send('hello')
17     conn1.close()
18     conn2.send('小子')
19     conn2.close()
20 
21     print('進程結束')
22 
23 # 註意管道不用了就關閉防止異常

 

 

2.共用數據

  進程之間數據共用的模塊之一Manager模塊(少用):

  進程間數據是獨立的,可以藉助於隊列或管道實現通信,二者都是基於消息傳遞的雖然進程間數據獨立,但可以通過Manager實現數據共用:

 1 from multiprocessing import Manager,Process,Lock
 2 
 3 
 4 def func1(dic,loc):
 5     # loc.acquire()  # 不加鎖易出錯
 6     dic['num'] -= 1
 7     # loc.release()
 8 
 9 
10 if __name__ == '__main__':
11     m = Manager()
12     loc = Lock()
13     dic = m.dict({'num':100})
14     p_list = []
15     for i in range(100):
16         p = Process(target=func1, args=(dic,loc))
17         p_list.append(p)
18         p.start()
19 
20     [pp.join() for pp in p_list]
21 
22     print('>>>>>',dic['num'])
23 # 共用時不加鎖,很可能導致同一個數據被多個子進程取用,數據是不安全的,且超多進程消耗大量資源易導致卡死.
基於Manager的數據共用

  多進程共同去處理共用數據的時候,就和我們多進程同時去操作一個文件中的數據是一樣的,不加鎖就會出現錯誤的結果,進程不安全的,所以也需要加鎖

 

總結:進程間應該儘量避免通信,即便需要通信,也應該選擇進程安全的工具來避免加鎖帶來的問題。

 

 

3.進程池 Pool

  創建進程需要消耗時間,銷毀進程(空間,變數,文件信息等等的內容)也需要消耗時間。開啟成千上萬的進程,操作系統無法讓他們同時執行,維護一個很大的進程列表的同時,調度的時候,還需要進行切換並且記錄每個進程的執行節點,也就是記錄上下文(各種變數等等亂七八糟的東西,雖然你看不到,但是操作系統都要做),這樣反而會影響程式的效率。因此我們不能無限制的根據任務開啟或者結束進程,這就需要用到進程池:

  定義一個池子,在裡面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等到處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。如果有很多任務需要執行,池中的進程數量不夠,任務就要等待之前的進程執行任務完畢歸來,拿到空閑進程才能繼續執行。也就是說,池中進程的數量是固定的,那麼同一時間最多有固定數量的進程在運行。這樣不會增加操作系統的調度難度,還節省了開閉進程的時間,也一定程度上能夠實現併發效果。

 

創建方法:

 Pool([numprocess [,initializer [, initargs]]]):創建進程池

參數介紹:

1 numprocess:要創建的進程數,如果省略,將預設使用cpu_count()的值
2 initializer:是每個工作進程啟動時要執行的可調用對象,預設為None
3 initargs:是要傳給initializer的參數組

常用方法:

p.apply(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然後返回結果。
'''需要強調的是:此操作並不會在所有池工作進程中並執行func函數。如果要通過不同參數併發地執行func函數,必須從不同線程調用p.apply()函數或者使用p.apply_async()'''

p.apply_async(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然後返回結果。
'''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他非同步操作中的結果。'''
    
p.close():關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成

P.jion():等待所有工作進程退出。此方法只能在close()或teminate()之後調用

主要方法介紹

 

 1 import time
 2 from multiprocessing import Process,Pool
 3 
 4 
 5 def func1(i):
 6     num = 0
 7     for j in range(5):
 8         num += i
 9 
10 
11 if __name__ == '__main__':
12     pool = Pool(6)  # 創建進程池
13 
14     p_list = []
15     start_time = time.time()
16     for i in range(500):
17         p = Process(target=func1,args=(i,))
18         p_list.append(p)
19         p.start()
20 
21     [pp.join() for pp in p_list]
22     end_time = time.time()
23     print('耗時:',end_time-start_time)
24 
25     s_time = time.time()
26     pool.map(func1,range(500))  # map
27     e_time = time.time()
28     print('耗時:',e_time - s_time)  # 耗時遠遠小於直接開500進程
進程池 簡單應用

 

apply同步方法:

 1 from multiprocessing import Process,Pool
 2 import time
 3 
 4 
 5 def func1(i):
 6     num = 0
 7     for j in range(3):
 8         num += i
 9     time.sleep(1)
10     print(num)
11     return num
12 
13 
14 if __name__ == '__main__':
15     pool = Pool(6)
16 
17     for i in range(10):
18         res = pool.apply(func1,args=(i,))  # apply 進程同步/串列方法 效率低,不常用
19         # print(res)
apply 進程同步/串列方法

 

apply_async非同步方法:

 1 from multiprocessing import Process,Pool
 2 import time
 3 
 4 
 5 def func1(i):
 6     num = 0
 7     for j in range(5):
 8         num += i
 9     time.sleep(1)
10     # print('>>>>>',num)
11     return num
12 
13 
14 if __name__ == '__main__':
15     pool = Pool(6)
16 
17     red_list = []
18     for i in range(10):
19         res = pool.apply_async(func1,args=(i,))
20         red_list.append(res)
21 
22     pool.close()  # 不是關閉,只是鎖定進程池,告訴主進程不會再添加數據進去
23     pool.join()  # 等待子程式執行完
24 
25     for ress in red_list:
26         print(ress.get())  # get方法取出返回值num 按添加順序取出已保存在緩存區的結果 所以是順序列印出的
View Code

 

回調函數:運用時註意一點,回調函數的形參執行有一個,如果你的執行函數有多個返回值,那麼也可以被回調函數的這一個形參接收,接收的是一個元祖,包含著你執行函數的所有返回值。

 1 from multiprocessing import Pool,Process
 2 import time,os
 3 
 4 
 5 def func1(n):
 6     # print('子進程的pid:',os.getpid())
 7     return n*n
 8 
 9 
10 def func2(i):
11     res = i**2
12     # print('callback的pid:',os.getpid())
13     print(res)
14     return res
15 
16 
17 if __name__ == '__main__':
18     pool = Pool(4)
19     pool.apply_async(func1,args=(3,),callback=func2)  # callback把前面的返回值作參數傳給後面
20     # print('主進程的pid:',os.getpid())  # 主進程執行了callback
21     pool.close()
22     pool.join()
回調函數 callback

 

 

4.總結

  進程之間的通信:隊列、管道、數據共用也算

  信號量和事件也相當於鎖,也是全局的,所有進程都能拿到這些鎖的狀態,進程之間這些鎖啊信號量啊事件啊等等的通信,其實底層還是socekt,只不過是基於文件的socket通信,而不是跟上面的數據共用啊空間共用啊之類的機制,我們之前學的是基於網路的socket通信,socket的兩個家族,一個文件的一個網路的,所以如果說這些鎖之類的報錯,可能你看到的就是類似於socket的錯誤。工作中常用的是鎖,信號量和事件不常用,但是信號量和事件面試的時候會問到(做瞭解)

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 這是來自於leetcode的題目: 給定一個整數數組 nums 和一個目標值 target,請你在該數組中找出和為目標值的 兩個 整數。 你可以假設每種輸入只會對應一個答案。但是,你不能重覆利用這個數組中同樣的元素。這是示例: 給定 nums = [2, 7, 11, 15], target = 9 ...
  • #作業回顧s = 11 s1 = s.bit_length() print(s1) s = bool() print(s) s = 'bhsdjshfa' s3 = s[2::-2] print(s3) a = 'abisdugs' for i in a : print(i) #字元串的拼接可以通過... ...
  • 1 public boolean isPalindrome(int number) { 2 int s=number; 3 if(number0){ 8 re=re*10+number%10; 9 number/=10; 10 } 11 return re==s; 12 ... ...
  • <引言> Eclipse 中提供了一個非常人性化的功能,可以自動生成註釋為我們程式員做項目時提供便利,並且註釋內容還具有定製化 可以根據自己的喜好配置不同的樣式。 <正文> 首先我們需要找到,配置註釋的地方。 Window --> preferences --> java --> Code Styl ...
  • 前面介紹的Java編程,要麼是與數字有關的計算,要麼是與邏輯有關的推理,充其量只能實現計算器和狀態機。若想讓Java運用於更廣闊的業務領域,就得使其支撐更加血肉豐滿的業務場景,而豐滿的前提是能夠表達大眾熟知的人類語言和文字。對於英文世界來說,除了數字之外,編程語言起碼還要支持ABCD等大小寫字母,以 ...
  • 1, int 整數 2. str 字元串 3. bool 布爾值 4. list 列表. 一般存放大量的數據 ["門神xxxx", "風扇哥xxxx", 元素] 5. tuple 元組. 只讀列表, 只能看啥也不能幹. (元素, 元素) 6. dict 字典. {"風扇哥":"王偉哲", "wlh" ...
  • python元編程,使用__setatt__,__getattribute__,__delattr__等特殊方法實現定製類 ...
  • 摘要:本文主要對Java這門編程語言進行簡單的介紹。 Java簡介 說明 Java語言歷時十多年,已發展成為人類電腦史上影響深遠的編程語言,從某種程度上來看,它甚至超出了編程語言的範疇,成為一種開發平臺,一種開發規範。Java語言所崇尚的開源、自由等精神,吸引了全世界無數優秀的程式員。事實是,從人 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...