python多進程通信實例分析

来源:https://www.cnblogs.com/yssjun/archive/2019/08/31/11438850.html
-Advertisement-
Play Games

操作系統會為每一個創建的進程分配一個獨立的地址空間,不同進程的地址空間是完全隔離的,因此如果不加其他的措施,他們完全感覺不到彼此的存在。那麼進程之間怎麼進行通信?他們之間的關聯是怎樣的?實現原理是什麼?本文就來藉助Python簡單的聊一下進程之間的通信?還是那句話,原理是相同的,希望能透過具體的例子 ...


操作系統會為每一個創建的進程分配一個獨立的地址空間,不同進程的地址空間是完全隔離的,因此如果不加其他的措施,他們完全感覺不到彼此的存在。那麼進程之間怎麼進行通信?他們之間的關聯是怎樣的?實現原理是什麼?本文就來藉助Python簡單的聊一下進程之間的通信?還是那句話,原理是相同的,希望能透過具體的例子來體會一下本質的東西。

 下麵儘量以簡單的方式介紹一下每一類通信方式,具體的細節可以參照文檔使用;

1. 管道

先來看一下最簡單、古老的一種IPC:管道。通常指的是無名管道,本質上可以看做一種文件,只存在於記憶體當中,不會存檔。不同進程通過系統提供的介面來向管道中讀取或者寫入數據。

也就是說我們通過這樣一個中間介質為進程提供交流的方式。無名管道的局限在於一般只用於有直接關聯關係的父子進程。下麵通過一個簡單的例子來看一下其用法。

from multiprocessing import Process, Pipe

def pstart(pname, conn):
    conn.send("Data@subprocess")
    print(conn.recv())          # Data@parentprocess

if __name__ == '__main__':
    conn1, conn2 = Pipe(True)
    sub_proc = Process(target=pstart, args=('subprocess', conn2,))
    sub_proc.start()
    print (conn1.recv())        # Data@subprocess
    conn1.send("Data@parentprocess")
    sub_proc.join()

管道通信三步曲:

  1. 創建Pipe,得到兩個connection對象conn1和conn2;
  2. 父進程持有conn1,將conn2傳遞給子進程;
  3. 父子進程通過對持有的connection對象進行send和recv操作以進行數據傳遞和接受;

上面我們創建的是全雙工管道,也可以創建半雙工管道,具體使用可以參照官網描述:

Returns a pair (conn1, conn2) of Connection objects representing the ends of a pipe.

If duplex is True (the default) then the pipe is bidirectional. If duplex is False then the pipe is unidirectional: conn1 can only be used for receiving messages and conn2 can only be used for sending messages.

2. 具名管道(FIFO)

上面介紹的管道主要用於有直接關係的進程,局限性比較大。下麵來看一下可以在任意進程間進行通信的具名管道。

由於window平臺上os模塊沒有mkfifo屬性,因此這個例子只能在linux上運行(測試環境 CentOS 7, Python 2.7.5):

#!/usr/bin/python
import os, time
from multiprocessing import Process

input_pipe = "./pipe.in"
output_pipe = "./pipe.out"

def consumer():
    if os.path.exists(input_pipe):
        os.remove(input_pipe)
    if os.path.exists(output_pipe):
        os.remove(output_pipe)

    os.mkfifo(output_pipe)
    os.mkfifo(input_pipe)
    in1 = os.open(input_pipe, os.O_RDONLY)        # read from pipe.in
    out1 = os.open(output_pipe, os.O_SYNC | os.O_CREAT | os.O_RDWR)
    while True:
        read_data = os.read(in1, 1024)
        print("received data from pipe.in: %s @consumer" % read_data)
        if len(read_data) == 0:
            time.sleep(1)
            continue

        if "exit" in read_data:
            break
        os.write(out1, read_data)
    os.close(in1)
    os.close(out1)

def producer():
    in2 = None
    out2 = os.open(input_pipe, os.O_SYNC | os.O_CREAT | os.O_RDWR)

    for i in range(1, 4):
        msg = "msg " + str(i)
        len_send = os.write(out2, msg)
        print("------product msg: %s by producer------" % msg)
        if in2 is None:
            in2 = os.open(output_pipe, os.O_RDONLY)        # read from pipe.out
        data = os.read(in2, 1024)
        if len(data) == 0:
            break
        print("received data from pipe.out: %s @producer" % data)
        time.sleep(1)

    os.write(out2, 'exit')
    os.close(in2)
    os.close(out2)

if __name__ == '__main__':
    pconsumer = Process(target=consumer, args=())
    pproducer = Process(target=producer, args=())
    pconsumer.start()
    time.sleep(0.5)
    pproducer.start()
    pconsumer.join()
    pproducer.join()

運行流程如下:

 每一輪的過程如下:

  1. producer進程往pipe.in文件中寫入消息數據;
  2. consumer進程從pipe.in文件中讀入消息數據;
  3. consumer進程往pipe.out文件中寫入回執消息數據;
  4. producer進程從pipe.out文件中讀出回執消息數據;

結果如下:

[shijun@localhost python]$ python main.py
------product msg: msg 1 by producer------
received data from pipe.in: msg 1 @consumer
received data from pipe.out: msg 1 @producer
------product msg: msg 2 by producer------
received data from pipe.in: msg 2 @consumer
received data from pipe.out: msg 2 @producer
------product msg: msg 3 by producer------
received data from pipe.in: msg 3 @consumer
received data from pipe.out: msg 3 @producer
received data from pipe.in: exit @consumer
View Code

兩個進程沒有直接的關係,每個進程有一個讀文件和寫文件,如果兩個進程的讀寫文件是關聯的,就可以進行通信。

 3. 消息隊列(Queue)

進程之間通過向隊列中添加數據或者從隊列中獲取數據來進行消息數據的傳遞。下麵是一個簡單的例子。

from multiprocessing import Process, Queue
import time

def producer(que):
    for product in ('Orange', 'Apple', ''):
        print('put product: %s to queue' % product)
        que.put(product)
        time.sleep(0.5)
        res = que.get()
        print('consumer result: %s' % res)

def consumer(que):
    while True:
        product = que.get()
        print('get product:%s from queue' % product)
        que.put('suc!')
        time.sleep(0.5)
        if not product:
            break

if __name__ == '__main__':
    que = Queue(1)
    p = Process(target=producer, args=(que,))
    c = Process(target=consumer, args=(que,))
    p.start()
    c.start()
    p.join()
    c.join()

這個例子比較簡單,queue的具體用法可以參考一下官網。

結果:

put product: Orange to queue
consumer result: suc!
put product: Apple to queue
consumer result: suc!
put product:  to queue
consumer result: suc!
get product:Orange from queue
get product:Apple from queue
get product: from queue
View Code

這裡有幾點需要註意下:

  1. 可以指定隊列的容量,如果超出容量會有異常:raise Full;
  2. 預設put和get均會阻塞當前進程;
  3. 如果put沒有設置成阻塞,那麼可能自己從隊列中取出自己放入的數據;

4. 共用記憶體

共用記憶體是一種常用的,高效的進程之間的通信方式,為了保證共用記憶體的有序訪問,需要對進程採取額外的同步措施。

下麵的這個例子僅僅簡單的演示了Python中如何在不同進程間使用共用記憶體進行通信的。

from multiprocessing import Process
import mmap
import contextlib
import time

def writer():
    with contextlib.closing(mmap.mmap(-1, 1024, tagname='cnblogs', access=mmap.ACCESS_WRITE)) as mem:
        for share_data in ("Hello", "Alpha_Panda"):
            mem.seek(0)
            print('Write data:== %s == to share memory!' % share_data)
            mem.write(str.encode(share_data))
            mem.flush()
            time.sleep(0.5)

def reader():
    while True:
        invalid_byte, empty_byte = str.encode('\x00'), str.encode('')
        with contextlib.closing(mmap.mmap(-1, 1024, tagname='cnblogs', access=mmap.ACCESS_READ)) as mem:
            share_data = mem.read(1024).replace(invalid_byte, empty_byte)
            if not share_data:
                """ 當共用記憶體沒有有效數據時結束reader """
                break
            print("Get data:== %s == from share memory!" % share_data.decode())
        time.sleep(0.5)


if __name__ == '__main__':
    p_reader = Process(target=reader, args=())
    p_writer = Process(target=writer, args=())
    p_writer.start()
    p_reader.start()
    p_writer.join()
    p_reader.join()

執行結果:

Write data:== Hello == to share memory!
Write data:== Alpha_Panda == to share memory!
Get data:== Hello == from share memory!
Get data:== Alpha_Panda == from share memory!

下麵簡單的來說明一下共用記憶體的原理;

進程虛擬地址到物理地址的一個映射關如下:

 上面這個圖已經很明白的展示了共用記憶體的原理。

左邊是正常情況下,不同進程的線性地址空間被映射到不同的物理記憶體頁,這樣不管其他進程怎麼修改物理記憶體,都不會影響到其他進程;

右邊表示的是進程共用記憶體的情況下,不同進程的部分線性地址會被映射到同一物理頁,一個進程對這個物理頁的修改,會對另一個進程立即可見;

當然潛在的問題就是要採取進程同步措施,也就是對共用記憶體的訪問必須是互斥的。這個可以藉助信號量來實現。

5. socket通信

最後再來介紹一種可以跨主機的進程間通信:socket。

懂網路編程的人,對這個應該都比較熟悉。socket不僅可以跨主機進行通信,甚至有時候可以使用socket在同一主機的不同進程間進行通信。

這部分代碼比較簡單常見,這裡僅僅使用流程圖來表示一下socket通信的流程及相關介面。

 上圖表示客戶端上某進程使用socket和伺服器上監聽程式進行socket通信的一個流程。

 小結

到這裡關於常見的進程間通信相關的概念和實例均簡單介紹了一下。希望本文能讓你對進程間通信有一個更深入的理解和認識。

結合之前幾篇介紹線程、進程概念及線程間同步的一些措施的介紹,相信應該對線程和進程相關概念有一個簡單清晰的認識了。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 源碼說明:這套系統比淘小秘、淘小白、E速達、好多了他們還要電腦開機並掛著軟體才能自動發貨,而這套系統完全可以秒殺這一切。安裝之前說明,如果你沒有新浪賬號的話請註冊好新浪賬號,網站名稱LOGO自備。本程式不限制功能變數名稱,單用戶版本,支持sae、bae、虛擬主機等使用之後買家拍下付款到網站提取了貨,淘寶就自 ...
  • Failed to configure a DataSource: 'url' attribute is not specified and no embedded datasource could be configured. ...
  • lambda表達式主要用於短小的回調函數。 形如: arg1[,arg2][,arg3][...]為參數列表。 expression表達式語句中不能出現多條語句。 lambda應用舉例: 計算最大值: 結果: 222 計算輸入的和: 結果: 232 ...
  • 題目傳送門 簡單分析一下題目,可以發現: 第一個數最小為123 第一個數最大為333,再大第三個數就是四位數了 所以可以這樣做: 然後我們可以將每個數的各個數位分離出來,再加到標記數組裡面: code: 另外,為了防止複製,我悄悄地在代碼中留了個小錯誤,能不能直接提交,就看你的啦! PS:如果發現了 ...
  • 一:代碼規範 參考 :https://zh-google-styleguide.readthedocs.io/en/latest/google-cpp-styleguide/headers/# 1 . 頭文件 1.1. Self-contained 頭文件 頭文件應該能夠自給自足(self-cont ...
  • [TOC] Object 類 一、clone 1. 完整形式 2. 此方法用來實現對象的複製,如果要調用這個方法,必須實現 介面和覆蓋 方法,還需要在使用克隆的時候處理 ,因為此異常是非運行時異常。 3. 預設的覆寫,只是 淺拷貝 ,也就是只拷貝 基本數據類型 ,而對於對象的引用數據類型,也只是複製 ...
  • 隨著網路的發展,在Web開發中,系統的國際化需求已經變得非常的普遍。本文主要講解SpringMVC框架對多語言的支持,僅供學習分享使用,如有不足之處,還請指正。 ...
  • 題目背景 給一組 N 枚郵票的面值集合(如,{1 分,3 分})和一個上限 K —— 表示信封上能夠貼 K 張郵票。計算從 1 到 M 的最大連續可貼出的郵資。 給一組 N 枚郵票的面值集合(如,{1 分,3 分})和一個上限 K —— 表示信封上能夠貼 K 張郵票。計算從 1 到 M 的最大連續可 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...