python進程池:multiprocessing.pool

来源:http://www.cnblogs.com/kevinchou/archive/2016/06/16/5591130.html
-Advertisement-
Play Games

本文轉至http://www.cnblogs.com/kaituorensheng/p/4465768.html,在其基礎上進行了一些小小改動。 在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程式控制制多台主機,並行操作可以節約大量的時間。當被操作對象數目不大時,可以直接利用m ...


本文轉至http://www.cnblogs.com/kaituorensheng/p/4465768.html,在其基礎上進行了一些小小改動。

在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程式控制制多台主機,並行操作可以節約大量的時間。當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但如果是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,此時可以發揮進程池的功效。
Pool可以提供指定數量的進程供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那麼就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束,才會創建新的進程來它。

例1:使用進程池

from multiprocessing import freeze_support,Pool
import time

def Foo(i):
    time.sleep(2)
    print('___time---',time.ctime())
    return i+100

def Bar(arg):
    print('----exec done:',arg,time.ctime())

if __name__ == '__main__':
    freeze_support()
    pool = Pool(3) #線程池中的同時執行的進程數為3

    for i in range(4):
        pool.apply_async(func=Foo,args=(i,),callback=Bar) #線程池中的同時執行的進程數為3,當一個進程執行完畢後,如果還有新進程等待執行,則會將其添加進去
        # pool.apply(func=Foo,args=(i,))

    print('end')
    pool.close()
    pool.join()#調用join之前,先調用close函數,否則會出錯。執行完close後不會有新的進程加入到pool,join函數等待所有子進程結束
View Code

執行結果:

end
___time--- Thu Jun 16 15:11:45 2016
----exec done: 100 Thu Jun 16 15:11:45 2016
___time--- Thu Jun 16 15:11:45 2016
----exec done: 101 Thu Jun 16 15:11:45 2016
___time--- Thu Jun 16 15:11:45 2016
----exec done: 102 Thu Jun 16 15:11:45 2016
___time--- Thu Jun 16 15:11:47 2016
----exec done: 103 Thu Jun 16 15:11:47 2016

函數解釋

  • apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的(理解區別,看例1例2結果區別)
  • close()    關閉pool,使其不在接受新的任務。
  • terminate()    結束工作進程,不在處理未完成的任務。
  • join()    主進程阻塞,等待子進程的退出, join方法要在close或terminate之後使用。

執行說明:創建一個進程池pool,並設定進程的數量為3,xrange(4)會相繼產生四個對象[0, 1, 2, 4],四個對象被提交到pool中,因pool指定進程數為3,所以0、1、2會直接送到進程中執行,當其中一個執行完事後才空出一個進程處理對象3,所以會出現輸出“msg: hello 3”出現在"end"後。因為為非阻塞,主函數會自己執行自個的,不搭理進程的執行,所以運行完for迴圈後直接輸出“mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”,主程式在pool.join()處等待各個進程的結束。

例2:使用進程池(阻塞)

from multiprocessing import freeze_support,Pool
import time

def Foo(i):
    time.sleep(2)
    print('___time---',time.ctime())
    return i+100

def Bar(arg):
    print('----exec done:',arg,time.ctime())

if __name__ == '__main__':
    freeze_support()
    pool = Pool(3) #線程池中的同時執行的進程數為3

    for i in range(4):
        pool.apply(func=Foo,args=(i,))

    print('end')
    pool.close()
    pool.join()#調用join之前,先調用close函數,否則會出錯。執行完close後不會有新的進程加入到pool,join函數等待所有子進程結束
View Code

執行結果

___time--- Thu Jun 16 15:15:16 2016
___time--- Thu Jun 16 15:15:18 2016
___time--- Thu Jun 16 15:15:20 2016
___time--- Thu Jun 16 15:15:22 2016
end

例3:使用進程池,並關註結果

import multiprocessing
import time

def func(msg):
    print('hello :',msg,time.ctime())
    time.sleep(2)
    print('end',time.ctime())
    return 'done' + msg

if __name__=='__main__':
    pool = multiprocessing.Pool(2)
    result = []
    for i in range(3):
        msg = 'hello %s' %i
        result.append(pool.apply_async(func=func,args=(msg,)))

    pool.close()
    pool.join()

    for res in result:
        print('***:',res.get())

    print('AAAAAAAAll end--')
View Code

執行結果


hello : hello 0 Thu Jun 16 15:26:33 2016
hello : hello 1 Thu Jun 16 15:26:33 2016
end Thu Jun 16 15:26:35 2016
hello : hello 2 Thu Jun 16 15:26:35 2016
end Thu Jun 16 15:26:35 2016
end Thu Jun 16 15:26:37 2016
***: donehello 0
***: donehello 1
***: donehello 2
AAAAAAAAll end--

 :get()函數得出每個返回結果的值

例4:使用多個進程池

import multiprocessing
import time,os,random

def Lee():
    print('\nRun task Lee--%s******ppid:%s'%(os.getpid(),os.getppid()),'~~~~',time.ctime())
    start = time.time()
    time.sleep(random.randrange(10))
    end = time.time()
    print('Task Lee,runs %0.2f seconds.'%(end-start),'~~~~',time.ctime())

def Marlon():
    print("\nRun task Marlon-%s******ppid:%s"%(os.getpid(),os.getppid()),'~~~~',time.ctime())
    start = time.time()
    time.sleep(random.random() * 40)
    end=time.time()
    print( 'Task Marlon runs %0.2f seconds.' %(end - start),'~~~~',time.ctime())

def Allen():
    print( "\nRun task Allen-%s******ppid:%s"%(os.getpid(),os.getppid()),'~~~~',time.ctime())
    start = time.time()
    time.sleep(random.random() * 30)
    end = time.time()
    print( 'Task Allen runs %0.2f seconds.' %(end - start),'~~~~',time.ctime())

def Frank():
    print( "\nRun task Frank-%s******ppid:%s"%(os.getpid(),os.getppid()),'~~~~',time.ctime())
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print( 'Task Frank runs %0.2f seconds.' %(end - start),'~~~~',time.ctime())

if __name__ == '__main__':
    func_list = [Lee,Marlon,Allen,Frank]
    print('parent process id %s'%os.getpid())

    pool = multiprocessing.Pool(4)
    for func in func_list:
        pool.apply_async(func)  #Pool執行函數,apply執行函數,當有一個進程執行完畢後,會添加一個新的進程到pool中

    print( 'Waiting for all subprocesses done...')
    pool.close()
    pool.join()    #調用join之前,一定要先調用close() 函數,否則會出錯, close()執行後不會有新的進程加入到pool,join函數等待素有子進程結束
    print ('All subprocesses done.')
View Code

執行結果

parent process id 98552
Waiting for all subprocesses done...

Run task Lee--97316******ppid:98552 ~~~~ Thu Jun 16 15:20:50 2016

Run task Marlon-95536******ppid:98552 ~~~~ Thu Jun 16 15:20:50 2016

Run task Allen-95720******ppid:98552 ~~~~ Thu Jun 16 15:20:50 2016

Run task Frank-98784******ppid:98552 ~~~~ Thu Jun 16 15:20:50 2016
Task Allen runs 0.31 seconds. ~~~~ Thu Jun 16 15:20:51 2016
Task Lee,runs 7.00 seconds. ~~~~ Thu Jun 16 15:20:57 2016
Task Frank runs 14.48 seconds. ~~~~ Thu Jun 16 15:21:05 2016
Task Marlon runs 31.72 seconds. ~~~~ Thu Jun 16 15:21:22 2016
All subprocesses done.

multiprocessing pool map

複製代碼
#coding: utf-8
import multiprocessing 

def m1(x): 
    print x * x 

if __name__ == '__main__': 
    pool = multiprocessing.Pool(multiprocessing.cpu_count()) 
    i_list = range(8)
    pool.map(m1, i_list)
複製代碼

一次執行結果

0
1
4
9
16
25
36
49

 參考:http://www.dotblogs.com.tw/rickyteng/archive/2012/02/20/69635.aspx 

 

問題:http://bbs.chinaunix.net/thread-4111379-1-1.html

複製代碼
#coding: utf-8
import multiprocessing
import logging

def create_logger(i):
    print i

class CreateLogger(object):
    def __init__(self, func):
        self.func = func

if __name__ == '__main__':
    ilist = range(10)

    cl = CreateLogger(create_logger)
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    pool.map(cl.func, ilist)

    print "hello------------>"
複製代碼

一次執行結果

0
1
2
3
4
5
6
7
8
9
hello------------>


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 安裝篇 第一步:配置防火牆(預設情況下,埠80和3306是拒絕訪問的,在防火牆上進行配置): vi /etc/sysconfig/iptables(在"COMMIT"的上一行加上如下兩句) -A INPUT -m state --state NEW -m tcp -p tcp --dport 80 ...
  • 目錄 1 如何更新權值向量?2 最小均方法(LMS)與感知機:低效的民主3 最小二乘法:完美的民主4 支持向量機:現實的民主5 總結6 參考資料 1 如何更新權值向量? 在關於線性模型你可能還不知道的二三事(一、樣本)中我已提到如何由線性模型產生樣本,在此前提下,使用不同機器學習演算法來解決回歸問題的 ...
  • 1、下載php安裝包 http://cn2.php.net/get/php-5.5.36.tar.gz/from/this/mirror 預設情況下Nginx和PHP他倆之間是一點感覺沒有的。Apache+PHP編譯後生成的是模塊文件,而Nginx+PHP需要PHP生成可執行文件才可以,所以要利用f ...
  • 如何解決PHP中文亂碼問題 如何解決PHP中文亂碼問題 一、解決HTML中中文亂碼問題方法 1、在head標簽裡面加入UTF8編碼(國際化編碼):UTF-8是沒有國家的編碼,也就是獨立於任何一種語言,任何語言都可以使用的。 <meta http-equiv="Content-Type" conten ...
  • 都說我們要做模塊化設計,而不要做功能化設計 什麼是模塊化設計,就是可插拔性高,組件化,想要就用,不要用拉倒,直接刪除就行 什麼是功能化設計,就是一個簡單的功能,實現想要的效果,但是不夠通用化,別人要用的話需要讀懂你的代碼,還需要複製黏貼很多代碼這樣效率不高 今天寫了一個省市區三級聯動的模塊,寫完後使 ...
  • 最近開發一個項目,用到了單例模式,頭文件大概如下 class CRecGuard{public: CRecGuard(){::InitializeCriticalSection(&cs);Guard();} ~CRecGuard(){UnGuard();::DeleteCriticalSection ...
  • 1. 相關內容介紹 1>互聯網開發 互聯網:傳統互聯網、移動互聯網 互聯網開發:前端開發(前臺)、後臺開發(後端、服務端) 前端開發:視覺展示(用戶界面)、用戶交互、採集輸入信息 後臺開發:管理和處理數據、開發對客戶端的介面、控制輸出 互聯網:傳統互聯網、移動互聯網 互聯網開發:前端開發(前臺)、後 ...
  • 在php中,有兩種基本的輸出方法:echo 和 print echo 和 print 之間的差異: echo——能夠輸出一個以上的字元串,無返回值 print——只能輸出一個字元串,並始終返回值為1 echo的語句: echo是一個語言結構,有無括弧均可使用:echo 或echo(); 例如: 顯示 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...