前言 上篇博客的內容是守護進程,對於操作系統來說可以在後臺執行一些程式.這篇的內容是互斥鎖,在上上篇博客上說到進程記憶體空間互相隔離,所以可以通過共用文件來操作同一個文件,那麼這樣操作的話會發生什麼呢? 鎖 互斥鎖 多個進程需要共用數據時,先將其鎖定,此時資源狀態為'鎖定',其他進程不能更改;知道該進 ...
前言
上篇博客的內容是守護進程,對於操作系統來說可以在後臺執行一些程式.這篇的內容是互斥鎖,在上上篇博客上說到進程記憶體空間互相隔離,所以可以通過共用文件來操作同一個文件,那麼這樣操作的話會發生什麼呢?
鎖
互斥鎖
多個進程需要共用數據時,先將其鎖定,此時資源狀態為'鎖定',其他進程不能更改;知道該進程釋放資源,將資源的狀態變成非'鎖定',其他的線程才能再次鎖定該資源.互斥鎖保證了每次只有一個進程進入寫入操作,從而保證了多進程情況下數據的正確性.
我們使用一個demo 來模擬多個進程操作同一個文件:
import json
import time,random
from multiprocessing import Process
def show_tickets(name):
time.sleep(random.randint(1,3))
with open('ticket.json', 'rt', encoding='utf-8') as f:
data = json.load(f)
print('%s 查看 剩餘票數: %s' % (name, data['count']))
def buy_ticket(name):
with open('ticket.json', 'rt', encoding='utf-8') as f:
dic = json.load(f)
if dic['count'] > 0:
dic['count'] -= 1
time.sleep(random.randint(1,3))
with open('ticket.json', 'wt', encoding='utf-8') as f:
json.dump(dic, f)
print('%s: 購票成功' % name)
def task(name):
show_tickets(name)
buy_ticket(name)
if __name__ == '__main__':
for i in range(1,11):
p = Process(target=task, args=(i,))
p.start()
運行結果:
在 ticket.json 裡面只有一張票,結果卻造成多個用戶購買成功,這很顯然是不符合實際情況的.
那麼怎麼解決呢?如果多個進程對同一個文件進行讀操作可以不進行限制,但是對同一個文件進行寫操作就必要要進行限制,不可以同時多個人對同一個文件進行寫操作.python 在多進程模塊里提供一個類, Lock 類,當進程獲取到鎖的時候其他的進程就必須要等待鎖釋放才可以進行爭搶,在這個例子裡面就可以加上一把鎖來保護數據安全.
from multiprocessing import Process, Lock
import json,time,random
def show_tickets(name):
time.sleep(random.randint(1,3))
with open('ticket.json', 'rt', encoding='utf-8') as f:
data = json.load(f)
print('%s 查看 剩餘票數: %s' % (name, data['count']))
def buy_ticket(name):
time.sleep(random.randint(1,3))
with open('ticket.json', 'rt', encoding='utf-8') as f:
dic = json.load(f)
if dic['count'] > 0:
dic['count'] -= 1
time.sleep(random.randint(1,3))
with open('ticket.json', 'wt', encoding='utf-8') as f:
json.dump(dic, f)
print('%s: 購票成功' % name)
def task(name,lock):
show_tickets(name)
lock.acquire()
buy_ticket(name)
lock.release()
if __name__ == '__main__':
mutex = Lock()
for i in range(1,11):
p = Process(target=task, args=(i,mutex))
p.start()
運行結果:
這樣加了鎖(互斥鎖)就可以解決同時操作同一個文件造成的數據混亂問題了.
當使用多進程開發時,如果多個進程同時讀寫同一個資源,可能會造成數據的混亂,為了防止發生問題,使用鎖,或者使用 Process 的方法 join 將並行變為串列.
join 和鎖的區別
- join 人為控制進程的執行順序
- join 把整個進程全部串列,而鎖可以指定部分代碼串列
一旦串列,效率就會降低,一旦並行,數據就可能會出錯.
進程間通信
進程間通信( internal-process communication),我們在開啟子進程是希望子進程幫助完成任務,很多情況下需要將數據返回給父進程,然而進程間記憶體是物理隔離的.
解決辦法:
- 將共用數據放到文件中
- 管道 多進程模塊中的一個類,需要有父子關係
- 共用一快記憶體區域 需要操作系統分配
管道通信
Pipe類返回一個由管道連接的連接對象,預設情況下為雙工:
from multiprocessing import Process,Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv())
p.join()
運行結果:
[42, None, 'hello']
實例化 Pipe 類會返回兩個連接對象表示管道的兩端.每個連接對象都有 send() 和 recv() 方法(及其他).請註意,如果兩個進程同時嘗試讀寫管道的同一端,則管道中的數據可能會損壞.當然,同時使用管道的不同端部的過程不存在損壞的風險.
共用記憶體通信
Queue 通信
Queue類會生成一個先進先出的容器,通過往隊列中存取數據而進行進程間通信.
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print(q.get())
p.join()
運行結果:
[42, None, 'hello']
隊列其他特性
# 阻塞操作 必須掌握
q = Queue(3)
# # 存入數據
q.put("hello",block=False)
q.put(["1","2","3"],block=False)
q.put(1,block=False)
# 當容量滿的時候 再執行put 預設會阻塞直到執行力了get為止
# 如果修改block=False 直接報錯 因為沒地方放了
# q.put({},block=False)
# # # 取出數據
print(q.get(block=False))
print(q.get(block=False))
print(q.get(block=False))
# 對於get 當隊列中中沒有數據時預設是阻塞的 直達執行了put
# 如果修改block=False 直接報錯 因為沒數據可取了
print(q.get(block=False))
# 瞭解
q = Queue(3)
q.put("q",timeout=3)
q.put("q2",timeout=3)
q.put("q3",timeout=3)
# 如果滿了 願意等3秒 如果3秒後還存不進去 就炸
# q.put("q4",timeout=3)
print(q.get(timeout=3))
print(q.get(timeout=3))
print(q.get(timeout=3))
# 如果沒了 願意等3秒 如果3秒後還取不到數據 就炸
print(q.get(timeout=3))
Manager 通信
demo
from multiprocessing import Process,Manager
import time
def task(dic):
print("子進程xxxxx")
# li[0] = 1
# print(li[0])
dic["name"] = "xx"
if __name__ == '__main__':
m = Manager()
# li = m.list([100])
dic = m.dict({})
# 開啟子進程
p = Process(target=task,args=(dic,))
p.start()
time.sleep(3)
可以創建一片共用記憶體區域用來存取數據.
生產者消費者模型
什麼是生產者消費者模型
在軟體開發過程中,經常碰到這樣的場景:
某些模塊負責生產數據,這些數據由其他模塊來負責處理(此處的模塊可能是:函數,線程,進程等).生產數據的模塊稱為生產者,而處理數據的模塊稱為消費者.在生產者與消費者之間的緩衝區稱之為倉庫.生產者負責往倉庫運輸商品,而消費者負責從倉庫里取出商品,這就構成了生產者消費者模型.
結構圖如下:
為了便於理解,我們舉一個寄信的例子。假設你要寄一封信,大致過程如下:
- 你把信寫好——相當於生產者生產數據;
- 你把信放入郵箱——相當於生產者把數據放入緩衝區;
- 郵遞員把信從郵箱取出,做相應處理——相當於消費者把數據取出緩衝區,處理數據.
生產者消費者模型的優點
- 解耦
假設生產者和消費者分別是兩個線程.如果讓生產者直接調用消費者的某個方法,那麼生產者對於消費者就會產生依賴(耦合).如果未來消費者的代碼發生改變,可能會影響到生產者的代碼.而如果兩者都依賴於某個緩衝區,兩者之間不直接依賴,耦合也就相應降低了.
舉個例子,我們去郵局投遞信件,如果不使用郵箱(也就是緩衝區,你必須得把信直接交給郵遞員.有同學會說,直接給郵遞員不是挺簡單的嘛?其實不簡單,你必須 得認識誰是郵遞員,才能把信給他.這就產生了你和郵遞員之間的依賴(相當於生產者和消費者的強耦合).萬一哪天郵遞員換人了,你還要重新認識一下(相當於消費者變化導致修改生產者代碼).而郵箱相對來說比較固定,你依賴它的成本就比較低(相當於和緩衝區之間的弱耦合).
- 併發
由於生產者與消費者是兩個獨立的併發體,它們之間是使用緩衝區通信的,生產者只需要往緩衝區里丟數據,就可以接著生產下一個數據了,而消費者只需要從緩衝區拿數據即可,這樣就不會因為彼此的處理速度而發生阻塞.
繼續上面的例子,如果沒有郵箱,就得在郵局等郵遞員,知道他回來,把信交給他,這期間我們什麼事都幹不了(生產者阻塞).或者郵遞員挨家挨戶問,誰要寄信(消費者阻塞).
- 支持忙閑不均
當生產者製造數據快的時候,消費者來不及處理,為處理的數據可以暫時存在緩衝區中,慢慢處理,而不至於因為消費者的性能過慢造成數據丟失或影響生產者生產數據.
再拿寄信的例子,假設郵遞員一次只能帶走1000封信,萬一碰上情人節或者其他的緊急任務,需要寄出的信超過了1000封,這個時候郵箱作為緩衝區就派上用場了.郵遞員把來不及帶走的信暫存在郵箱中,等下次過來時在拿走.
使用
from multiprocessing import Process, Queue
import time, random
def producer(name, food, q):
for i in range(10):
res = '%s %s' % (food, i)
time.sleep(random.randint(1,3))
q.put(res)
print('%s 生產了 %s' % (name, res))
def consumer(name, q):
while True:
res = q.get()
time.sleep(random.randint(1,3))
print('%s 消費了 %s' % (name, res))
if __name__ == '__main__':
q = Queue()
p = Process(target=producer, args=('musibii', '