python進程、進程池(二)代碼部分

来源:https://www.cnblogs.com/caesar-id/archive/2019/04/20/10743408.html
-Advertisement-
Play Games

創建進程,守護進程,進程鎖Lock,進程通信IPC,JoinableQueue模塊簡介,進程池 ...


第一種創建進程的方式:

from multiprocessing import Process
def f(name):
    print(name,"在子進程")
if __name__ == "__main__":
    p = Process(target=f,args=("aaa",))
    p.start()
    print("執行主進程內容")

# 列印內容如下
執行主進程內容
aaa 在子進程

從列印結果我們可以看出程式先執行了主進程的print之後才執行了子進程的print。這裡主要是因為操作系統在開闢進程時需要花費一定的時間,所以程式在這段時間里,先執行了主進程的print,然後才執行子進程print。

第二種方式創建進程:

from multiprocessing import Process

class MyProcess(Process):
   # 這裡必須要調用Process中的init初始化參數
   # 否則會因為無法傳參導致錯誤
   def __init__(self,name):
      super().__init__()  # 必須有
      self.name = name
   def run(self):
      print(f"我是子進程{self.name}")
if __name__ == "__main__":
   p = MyProcess("aaa")
   p.start()
   print("我是主進程")

# 列印內容如下
我是主進程
我是子進程aaa

可以在創建子進程後使用join的方法,使程式等待子進程結束後在執行join下麵的代碼。

from multiprocessing import Process
def f(name):
    print("我是子進程",name)
if __name__ == "__main__":
    p = Process(target=f,args=("aaa",))
    p.start()
    p.join()  # 子進程結束後在,執行下麵的代碼
    print("我是主進程")

# 列印內容如下
我是子進程 aaa
我是主進程

使用os模塊查看進程PID號。

from multiprocessing import Process
import os
def f():
    print(f"父進程PID:{os.getppid()},子進程PID:{os.getpid()}")
if __name__ == "__main__":
    p = Process(target=f,args=())
    p.start()
    p.join()  # 子進程結束後在,執行下麵的代碼
    print("主進程內容")

# 列印內容如下
父進程PID:1588,子進程PID:3292
主進程內容

執行多個進程:

from multiprocessing import Process
def f(process_name):
    print(f"{process_name}")
if __name__ == "__main__":
    for i in range(3):
        p = Process(target=f, args=("子進程-"+str(i),))
        p.start()
    print("主進程")

# 列印內容如下
主進程
子進程-0
子進程-1
子進程-2

我們會發現主進程比所有子進程都優先執行了,如果我們想要在執行完所有子進程在執行父進程該怎麼辦呢?沒錯是使用join

示例一:

from multiprocessing import Process
import time
def f(process_name):
    print(f"{process_name}")
if __name__ == "__main__":
    start_time = time.time()
    for i in range(3):
        p = Process(target=f, args=("子進程-"+str(i),))
        p.start()
        p.join()
    end_time = time.time()
    print(f"執行了{end_time - start_time}")

# 列印內容如下
子進程-0
子進程-1
子進程-2
執行了0.4480257034301758

從列印結果我們可以看出是所有子進程運行後才執行了主進程。但是發現會很慢,這是因為我們的join把原本應多進程同時運行的程式(非同步),變成了同步,必須等待一個子進程結束後才會執行下一個子進程。這樣就違背了我們多進程同時執行的初衷,所以我們的join不能放在那個位置。

下麵請看示例二:

from multiprocessing import Process
import time
def f(process_name):
    print(f"{process_name}")
if __name__ == "__main__":
    start_time = time.time()
    pro_list = []
    for i in range(3):
        p = Process(target=f, args=("子進程-"+str(i),))
        p.start()
        pro_list.append(p) # 將進程對象添加到一個列表中

    for i in pro_list: # 迴圈等待所有進程結束
        i.join()
    end_time = time.time()
    print(f"執行了{end_time - start_time}")

# 列印內容如下
子進程-1
子進程-2
子進程-0
執行了0.18201017379760742

對比示例一和示例二我們可以明顯發現示例二真正實現了多個進程的併發效果。

進程的創建(第二種方式創建,很少有人用)

import os
from multiprocessing import Process
class MyProcess(Process):
    def __init__(self,name):
        super().__init__()
        self.name = name
    def run(self):
        print(f"子進程-{self.name},PID:{os.getpid()}")

if __name__ == "__main__":
    p1 = MyProcess("aaa")
    p2 = MyProcess("bbb")
    p3 = MyProcess("ccc")
    p1.start()
    p2.start()
    p3.start()
    print("主線程")

# 列印內容如下
子進程-aaa,PID:7360
子進程-bbb,PID:6956
子進程-ccc,PID:4912

雖然不常用,但是最好知道有這種方式可以創建線程。

守護進程:

有兩個特性:

1、守護進程會在主進程代碼執行結束後就終止。

2、守護進程內無法再開啟子進程,否則拋出異常。

創建守護進程比較簡單如下:

from multiprocessing import Process
def f():
    print("守護進程")
if __name__ == "__main__":
    p = Process(target=f,args=())
    p.daemon=True  # 一定要在start前執行daemom=True
    p.start()
    print("主進程")

# 列印內容如下
主進程

我們發現守護進程並沒有被執行,或者說還沒來得及執行就結束了,我們知道操作系統在開啟進程時要花費一定時間,在這個時間內主進程代碼執行完了,所以守護進程還沒來得及執行就結束了。可以使用join來等待守護進程執行完畢後在結束主進程。

from multiprocessing import Process
def f():
    print("守護進程")
if __name__ == "__main__":
    p = Process(target=f,args=())
    p.daemon=True  # 一定要在start前執行daemom=True
    p.start()
    p.join()  # 等待守護進程結束
    print("主進程")

# 列印內容如下
守護進程
主進程

進程鎖:

為保證數據的安全性,在有些場合要使用進程鎖,進程鎖會使由原來的並行變成串列,程式效率會下降,但是卻保證了數據的安全性,在數據安全性和程式效率面前,數據的安全性是大於程式的效率的。

下麵以搶票為例,現在票數還有一張:

 

from multiprocessing import Process
import time,json
def search(name):  # 查票
    di = json.load(open("db"))
    print(f"{name}查票,剩餘票數{di['count']}")

def get(name):  # 購票
    di = json.load(open("db"))
    time.sleep(0.1)
    if di["count"] > 0:
        di["count"] -= 1
        time.sleep(0.2)
        json.dump(di,open("db","w"))
        print(f"{name}購票成功")
def task(name):
    search(name)
    get(name)
if __name__ == "__main__":
    for i in range(5):  # 只模擬5個人搶一張票
        p = Process(target=task,args=("游客-"+str(i),))
        p.start()

# 列印結果如下
游客-2查票,剩餘票數1
游客-1查票,剩餘票數1
游客-0查票,剩餘票數1
游客-4查票,剩餘票數1
游客-3查票,剩餘票數1
游客-2購票成功
游客-1購票成功
游客-0購票成功
游客-4購票成功
游客-3購票成功

所有人全部購票成功,這就對數據的安全性提出了挑戰。本來只有一張票,但是5個人都顯示購票成功,這當然不是我們想要的結果,問題的原因在於,所有的游客在差不多同一時間都進行了購票,大家看到的票數都是1張,第一個用戶購票後,將票數減1等於0還沒來得及將結果寫入文件,其它用戶也進行了購票的操作,在餘票0被寫入文件的過程中,其它用戶也購票成功,並將結果寫入文件,造成了數據的混亂。

這裡我們使用進程鎖Lock也叫互斥鎖,來解決問題。

from multiprocessing import Process,Lock
import time,json

def search(name):  # 查票
    di = json.load(open("db"))
    print(f"{name}查票,剩餘票數{di['count']}")

def get(name):  # 購票
    di = json.load(open("db"))
    time.sleep(0.1)
    if di["count"] > 0:
        di["count"] -= 1
        time.sleep(0.2)
        json.dump(di,open("db","w"))
        print(f"{name}購票成功")
def task(name,lock):
    search(name)  # 查票
    lock.acquire() # 加鎖
    get(name)   # 購票
    lock.release()  # 解鎖
if __name__ == "__main__":
    lock = Lock()  # 獲取鎖
    for i in range(5):  # 只模擬5個人搶一張票
        p = Process(target=task,args=("游客-"+str(i),lock))
        p.start()

# 列印內容如下
游客-0查票,剩餘票數1
游客-1查票,剩餘票數1
游客-2查票,剩餘票數1
游客-3查票,剩餘票數1
游客-4查票,剩餘票數1
游客-0購票成功

在購票時加一個互斥鎖,這樣一個進程在購票時,其它的進程只能查看就不能進行購票的操作了,保證了數據的安全性,最終結果是正確的,這就是為什麼我們明明看到有票,但是點擊購買後卻說沒票的原因,雖然加鎖後使原本並行的程式,變成了串列。但我們要知道在不能保證數據安全的情況下一切效率都是空談。

進程間的通信IPC(Inter-Process Communication)隊列

隊列:Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。

隊列的常用方法:

Queue([maxsize]) 
創建共用的進程隊列。maxsize是隊列中允許的最大數值。如果省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還需要運行支持線程以便隊列中的數據傳輸到底層管道中。 
Queue的實例q具有以下方法:
q.get( [ block [ ,timeout ] ] ) 
返回q中的一個項目。如果隊列為空,此方法將阻塞,直到隊列中有項目可用為止。block用於控制阻塞行為,預設為True. 如果設置為False,將引發Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有項目變為可用,將引發Queue.Empty異常。
q.get_nowait( ) 同q.get(False)方法。
q.put(item [, block [,timeout ] ] ) 
將item放入隊列。如果隊列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,預設為True。如果設置為False,將引發Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引發Queue.Full異常。
q.qsize() 
返回隊列中目前項目的正確數量。此函數的結果並不可靠,因為在返回結果和在稍後程式中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引發NotImplementedError異常。
q.empty() 
如果調用此方法時 q為空,返回True。如果其他進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。
q.full() 
如果q已滿,返回為True. 由於線程的存在,結果也可能是不可靠的(參考q.empty()方法)。。
q.close() 
關閉隊列,防止隊列中加入更多數據。調用此方法時,後臺線程將繼續寫入那些已入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,如果某個使用者正被阻塞在get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。
q.cancel_join_thread() 
不會再進程退出時自動連接後臺線程。這可以防止join_thread()方法阻塞。
q.join_thread() 
連接隊列的後臺線程。此方法用於在調用q.close()方法後,等待所有隊列項被消耗。預設情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread()方法可以禁止這種行為。

下麵我們已生產者消費者模型來進行演示:

 

from multiprocessing import Process,Queue
import time,random

def consumer(name,q):  # 消費者
    while True:
        task = q.get() # 從隊列中取出數據
        if task == None:break
        print(f"{name}獲取數據{task}")
        time.sleep(random.random())   # 消費者效率比生產者效率高

def producer(name,q):  # 生產者
    for i in range(3):
        q.put(i)  # 向對列中添加數據
        print(f"{name}生產數據{i}")
        time.sleep(random.uniform(1,2))  # 模擬生產者的效率沒有消費者效率高

if __name__ == "__main__":
    q = Queue()  # 獲取一個隊列
    pro = []
    for i in range(3):  # 開啟生產者進程
        p = Process(target=producer,args=("生產者"+str(i),q))
        p.start()
        pro.append(p)
    # 開啟消費者進程
    p1 = Process(target=consumer,args=("aaa",q))
    p2 = Process(target=consumer,args=("bbb",q))
    p1.start()
    p2.start()
    for i in pro:  # 等待生產者結束
        i.join()
  
    q.put(None)  # 有幾個消費者進程,就put幾次None
    q.put(None)

JoinableQueue([maxsize]) 模塊
創建可連接的共用進程隊列。這就像是一個Queue對象,但隊列允許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共用的信號和條件變數來實現的。 

JoinableQueue的實例p除了與Queue對象相同的方法之外,還具有以下方法:
q.task_done() 
使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。如果調用此方法的次數大於從隊列中刪除的項目數量,將引發ValueError異常。
q.join() 
生產者將使用此方法進行阻塞,直到隊列中所有項目均被處理。阻塞將持續到為隊列中的每個項目均調用q.task_done()方法為止。 
下麵的例子說明如何建立永遠運行的進程,使用和處理隊列上的項目。生產者將項目放入隊列,並等待它們被處理。

我們在來實現上述的生產者消費者模型。

from multiprocessing import Process,JoinableQueue
import time,random

def consumer(name,q):  # 消費者
    while True:
        task = q.get() # 從隊列中取出數據
        q.task_done()  # 通知生產者,我已經取完所有數據了
        print(f"{name}獲取數據{task}")
        time.sleep(random.random())   # 消費者效率比生產者效率高

def producer(name,q):  # 生產者
    for i in range(1):
        q.put(i)  # 向對列中添加數據
        print(f"{name}生產數據{i}")
        time.sleep(random.uniform(1,2))  # 模擬生產者的效率沒有消費者效率高
    q.join()  # 生產完畢,等待消費者通知數據已經獲取完了

if __name__ == "__main__":
    q = JoinableQueue()  # 獲取一個隊列
    pro = []
    for i in range(1):  # 開啟生產者進程
        p = Process(target=producer,args=("生產者"+str(i),q))
        p.start()
        pro.append(p)
    # 開啟消費者進程
    p1 = Process(target=consumer,args=("aaa",q))
    p2 = Process(target=consumer,args=("bbb",q))
    p1.daemon=True  # 如果不設置守護進程,這兩個進程就不會結束。
    p2.daemon=True  # 因為他們只是通知生產者我接收到所有數據了,並沒有終止迴圈。
    p1.start()
    p2.start()
    for i in pro:  # 等待生產者結束
        i.join()

這裡再次說明將消費者設置成守護進程的原因,q.task_done它只是通知生產者,我把數據已經都取完了,僅此而已,所以while迴圈並不會退出。如果不設置守護進程,程式會卡在while迴圈里。

 

進程池

進程池就是預先創建一個進程組,然後有任務時從池中分配一個進程去執行任務。當任務數量超過進程池的數量時,就必須等待進程池中有空閑的進程時,才能利用空閑的進程去執行任務。

進程池的優點:

1、充分利用CPU資源。

2、多個進程在同一時刻可以同時執行,達到了並行的效果。

進程池的缺點:進程的創建、銷毀需要耗費CPU的時間。多進程適用於需要複雜計算少I/O阻塞的情況。如果程式不涉及複雜運算,最好是使用線程池。

關於進程池multiprocessing.Pool的一些方法

apply(func [, args [, kwargs]]):
 func(*args,**kwargs),然後返回結果。
需要註意的是:apply屬於進程同步的操作,即必須等待一個進程結束後才能執行下一個進程。 
apply_async(func [, args [, kwargs]]):
func(*args,**kwargs),然後返回結果。
apply_async屬於進程的非同步操作,所有進程可以同時執行,此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變為可用時,將立即傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他非同步操作中的結果。
p.close():關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成
P.jion():等待所有工作進程退出。此方法只能在close()或teminate()之後調用

同步進程池的示例:

import os,time
from multiprocessing import Pool
def work(n):
    print("PID:%s run" %os.getpid())
    time.sleep(1)
    return n ** 2

if __name__ == "__main__":
    p = Pool(3)  # 開啟進程池
    res = []
    for i in range(3):
        res.append(p.apply(work,args=(i,)))  # 進程同步模式
    print(res)  # 列印返回結果

 # 列印內容如下
PID:6180 run
PID:9728 run
[0, 1, 4]

因為是進程池的同步,所以進程時的執行順序是有序的,並且必須一個進程執行後才執行下一個進程。

進程池的非同步示例: 

import os,time
from multiprocessing import Pool
def work(n):
    print("PID:%s run" %os.getpid())
    time.sleep(1)
    return n ** 2

if __name__ == "__main__":
    p = Pool(3)  # 開啟進程池
    res = []
    for i in range(5):
        # 進程非同步模式
        res.append(p.apply_async(work,args=(i,)))
    # 因為是非同步,所以開進程會很快
    # 所以我們所有進程結束後在列印結果
    p.close()  # 關閉進程池
    p.join()   # 等待進程池結束,
    for i in res:
        print(i.get(),end=" ")

# 列印內容如下
PID:7512 run
PID:10176 run
PID:7240 run
PID:10176 run
PID:7512 run
0 1 4 9 16

關於進程池有個問題需要註意,在子進程中不能使用input函數() 

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 依賴倒置原則DIP(Dependence Inversion Principle) 依賴倒置原則的含義 高層模塊不能依賴低層模塊,二者都應該依賴其抽象。 抽象不應該依賴於細節。 細節應該依賴抽象。 什麼是 高層模塊?低層模塊 ? 每一個原子邏輯就是低層模塊,原子邏輯再組就是高層模塊。 什麼是 抽象和 ...
  • //javascript代碼 $$(".bb").addEvent('change',function(e){ var order_item_id = this.get('order_item_id'); var product_name= this.value; new Request.JSON(... ...
  • AQS 概述 AQS(隊列同步器,AbstractQueuedSynchronizer),是用來構建鎖或其他同步組件的核心基礎框架(比如 ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch) AQS的底層結構是:一個整型變數st ...
  • 你是否想過生成一份屬於你的微信個人數據報告,瞭解你的微信社交歷史。現在,我們基於python對微信好友進行全方位數據分析,包括:昵稱、性別、年齡、地區、備註名、個性簽名、頭像、群聊、公眾號等。 其中,在分析好友類型方面,主要統計出你的陌生人、星標好友、不讓他看我的朋友圈的好友、不看他的朋友圈的好友... ...
  • go 調用windows dll 的方法 ,代碼如下: ...
  • 第一個自己手寫的代碼~ If 與 Elif #pass 用法 寫代碼前加的兩行首碼 第二行顯示的告訴python解釋器,用什麼編碼來執行源代碼,一般在python2使用,python3可用可不用。 ...
  • unsafe.Pointer其實就是類似C的void *,在golang中是用於各種指針相互轉換的橋梁。uintptr是golang的內置類型,是能存儲指針的整型,uintptr的底層類型是int,它和unsafe.Pointer可相互轉換。uintptr和unsafe.Pointer的區別就是:u ...
  • 時隔三個多月,我終於想起我還有個博客,其實也不是忘了我這個博客,只是平時工作繁忙沒時間去寫博客,故今晚騰出時間來記錄一下上次工作中遇到的一個問題,給園友們分享出來,以免入坑。 上個星期在工作中使用JdbcTemplate執行了一個select * from table where id in (?, ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...