Posix消息隊列

来源:https://www.cnblogs.com/songhe364826110/archive/2019/09/16/11529908.html
-Advertisement-
Play Games

[TOC] 1. 概述 消息隊列可認為是一個消息鏈表,隊列中的每個消息具有如下屬性: 消息優先順序,由發送者賦予 消息數據長度,可以為0 消息數據(如果消息數據長度大於0) Posix消息隊列主要用於線程間消息的傳遞: A線程向隊列中放置消息,B線程從隊列中取出消息 A線程向隊列寫入消息之前,不需要B ...


目錄

1. 概述

消息隊列可認為是一個消息鏈表,隊列中的每個消息具有如下屬性:

  • 消息優先順序,由發送者賦予
  • 消息數據長度,可以為0
  • 消息數據(如果消息數據長度大於0)

Posix消息隊列主要用於線程間消息的傳遞:

  • A線程向隊列中放置消息,B線程從隊列中取出消息
  • A線程向隊列寫入消息之前,不需要B線程在該隊列上等待消息的到達
  • A線程向隊列寫入消息之後,B線程可以在之後的某個時刻取出消息
  • A線程只關心向隊列放入消息,B線程只關心從隊列取出消息,A、B兩個線程相互獨立、互不影響

2. Posix消息隊列

創建與打開

mq_open用於創建一個新的消息隊列或打開一個已存在的消息隊列,編譯時需指定鏈接-lrt,下麵其他函數同理。

//成功返回消息隊列描述符,失敗返回-1
mqd_t mq_open(const char *name, int oflag, ... /* mode_t mode, struct mq_attr *attr */);
  • 當創建一個新的消息隊列時,attr參數用於給新隊列指定某些屬性,若attr為NULL,則使用預設屬性
  • mq_open的返回值稱為消息隊列描述符,它的類型取決於系統實現,可能是整型或指針
  • Linux下的Posix消息隊列創建在虛擬文件系統中,正常情況下是不可見的,需要掛載到/dev/mqueue/目錄才可以查看
mkdir /dev/mqueue 
mount -t mqueue none /dev/mqueue

關閉與刪除

mq_close用於關閉已打開的消息隊列,mq_unlink用於從系統中刪除消息隊列。

//兩個函數返回值:成功返回0,失敗返回-1
int mq_close(mqd_t mqdes);
int mq_unlink(const char *name);

關閉與刪除機制已在Posix信號量中講過,這裡不再贅述。

消息隊列屬性

獲取屬性

//成功返回0,失敗返回-1
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);

mq_getattr用於獲取消息隊列的四個屬性,這四個屬性定義在struct mq_attr結構體中。

struct mq_attr
{
    long mq_flags;    //非阻塞標誌,可設0或O_NONBLOCK,由且僅由mq_setattr設置
    long mq_maxmsg;   //隊列中最大消息條數,由mq_open在創建新隊列時設置
    long mq_msgsize;  //消息最大長度,由mq_open在創建新隊列時設置
    long mq_curmsgs;  //隊列中當前消息條數,只能獲取不能設置
};

設置屬性

//成功返回0,失敗返回-1
int mq_setattr(mqd_t mqdes, const struct mq_attr *attr, struct mq_attr *oldattr);

在消息隊列的四個屬性中:

  • mq_curmsgs只能獲取不能設置
  • mq_flags只能通過mq_setattr設置,該函數的唯一作用就是設置或清除非阻塞標誌
  • mq_maxmsgmq_msgsize只能在創建新隊列時由mq_open的attr參數設置
  • mq_maxmsg和mq_msgsize必須同時指定,否則mq_open創建新隊列會失敗
#include <mqueue.h>
#include <stdio.h>

int main()
{
    struct mq_attr attr;
    struct mq_attr attr1;
    mqd_t mqdes;
    
    /*
     * 在我的系統上,消息隊列預設屬性為:mq_maxmsg = 10, mq_msgsize = 8192.
     * 這裡顯式指定attr.mq_maxmsg = 5,mq_msgsize不賦值,會導致mq_open失敗.
    */
    attr.mq_maxmsg = 5;
    //attr.mq_msgsize = 8192;
     
    if ((mqdes = mq_open("/mqueue1", O_RDWR | O_CREAT, 0666, &attr)) == -1)
    {
        printf("mq_open create new mqueue failed because attr.mq_msgsize not specified.\n");
    }
    
    mq_getattr(mqdes, &attr1);   
    printf("%ld %ld\n", attr1.mq_maxmsg, attr1.mq_msgsize);
    
    mq_close(mqdes);
    mq_unlink("/mqueue1"); 
    
    return 0;
}

消息發送與接收

mq_send用於向隊列中放入一個消息,mq_receive用於從隊列中取走一個消息。

//成功返回0,失敗返回-1
int mq_send(mqd_t mqdes, const char *ptr, size_t len, unsigned int prio);

//成功返回消息數據長度,失敗返回-1
ssize_t mq_receive(mqd_t mqdes, char *ptr, size_t len, unsigned int *prio);
  • prio是消息優先順序,範圍為[0, MQ_PRIO_MAX - 1],prio值越大,消息優先順序越高
  • 如果不關心消息優先順序,就分別給mq_send和mq_receive的prio參數傳0和傳NULL
  • mq_receive總是返回最高優先順序的最早消息
  • mq_receive的參數len指的是接收緩衝區大小,它必須大於等於該隊列的mq_msgsize,否則會立即出錯返回
  • 可以先調用mq_getattr獲得mq_msgsize的值,然後再動態分配接收緩衝區

3. 消息隊列限制

消息隊列共有4個屬性受到系統限制:

  • msg_max
  • msgsize_max
  • MQ_OPEN_MAX
  • MQ_PRIO_MAX

其中,前兩個限制和應用程式的開發密切相關,既要保證隊列不會被填滿,又要保證消息長度不會超過允許的最大值,必要時可以修改Linux內核源碼來改變限制值。

查看限制的方法:

cat /proc/sys/fs/mqueue/msg_max        //struct mq_attr.mq_maxmsg <= msg_max 
cat /proc/sys/fs/mqueue/msgsize_max    //struct mq_attr.mq_msgsize <= msgsize_max
cat /proc/sys/fs/mqueue/queues_max

4. 生產者消費者問題——Posix消息隊列實現

不難看出,Posix消息隊列的基本使用模型就是一個典型的生產者消費者問題:

  • 如果使用無優先順序的消息,那麼消息是按照先進先出的順序處理的
  • 因此,Posix消息隊列也可以作為生產者消費者問題中的隊列緩衝區

單生產者 + 單消費者

我們把前面寫的生產者消費者代碼拿來稍微改一下,先來看一個單生產者 + 單消費者的例子。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <pthread.h>
#include <mqueue.h>

#define POSIX_QUEUE   "/mqueue"
#define MAX_THREADS   1
#define MAX_ITEMS     1000000

struct Shared
{
    mqd_t mqdes;
    int nput;
    int nval;
};

struct Shared shared;

void shared_init()
{
    shared.mqdes = mq_open(POSIX_QUEUE, O_RDWR | O_CREAT, 0666, NULL); //在我的系統中,Posix消息隊列最大容量為10
    mq_unlink(POSIX_QUEUE);
}

void shared_destroy()
{
    mq_close(shared.mqdes);
}

void *produce(void *arg)
{
    while (1)
    {
        if (shared.nput >= MAX_ITEMS)
        {
            pthread_exit(NULL);
        }

        mq_send(shared.mqdes, (char *)&shared.nval, sizeof(shared.nval), 0);

        shared.nput++;
        shared.nval++;

        /* 線程tid_produce[i]每執行一次,就累加count[i]的值 */
        *((int *)arg) += 1;
    }

    pthread_exit(NULL);
}

void *consume(void *arg)
{
    struct mq_attr attr;
    int nval;
    int i;

    mq_getattr(shared.mqdes, &attr);
    printf("system defaut mq_maxmsg = %ld, mq_msgsize = %ld\n", attr.mq_maxmsg, attr.mq_msgsize);

    for (i = 0; i < MAX_ITEMS; i++)
    {
        //消費者線程按順序取出消息,根據mq_getattr返回結果來設置mq_receive的參數len
        mq_receive(shared.mqdes, (char *)&nval, attr.mq_msgsize, NULL);

        if (nval != i)
        {
            printf("error: buff[%d] = %d\n", i, nval);
        }
    }

    pthread_exit(NULL);
}

int main()
{
    pthread_t tid_produce[MAX_THREADS];
    pthread_t tid_consume;
    int count[MAX_THREADS];
    struct timeval start_time;
    struct timeval end_time;
    float time_sec;
    int i;

    shared_init();
    gettimeofday(&start_time, NULL);

    for (i = 0; i < MAX_THREADS; i++)
    {
        count[i] = 0;
        pthread_create(&tid_produce[i], NULL, produce, &count[i]);
    }

    pthread_create(&tid_consume, NULL, consume, NULL);

    for (i = 0; i < MAX_THREADS; i++)
    {
        pthread_join(tid_produce[i], NULL);
        printf("count[%d] = %d\n", i, count[i]); //輸出每個線程的執行次數
    }

    pthread_join(tid_consume, NULL);

    gettimeofday(&end_time, NULL);
    time_sec = (end_time.tv_sec - start_time.tv_sec) + (end_time.tv_usec - start_time.tv_usec) / 1000000.0;
    printf("%d produce and %d consume total spend %.2f second\n", MAX_THREADS, 1, time_sec);

    shared_destroy();

    return 0;
}

註意觀察代碼和運行結果,可以發現生產者和消費者之間並沒有做同步處理,但仍然得到了正確結果,這是因為當沒有設置非阻塞標誌時,Posix消息隊列自帶隱式同步機制:

  • 如果消息隊列滿,mq_send會阻塞,直到隊列中有空位置
  • 如果消息隊列空,mq_receive會阻塞,直到隊列中有數據

這正是單生產者 + 單消費者模型唯一需要處理的同步問題,因此不需要應用程式再進行顯式同步。
顯式同步,指的是使用互斥鎖、條件變數、信號量等方式進行的同步;Posix自帶的同步在內核中進行,對於應用程式來說是不可見的,因此稱其為隱式同步。

多生產者 + 單消費者

當有多個生產者時,Posix消息隊列自帶的同步機制就不夠用了,需要顯式處理生產者線程之間的同步問題,我們使用互斥鎖實現這個功能。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <pthread.h>
#include <mqueue.h>

#define POSIX_QUEUE   "/mqueue"
#define MAX_THREADS   10
#define MAX_ITEMS     1000000

struct Shared
{
    pthread_mutex_t mutex;
    mqd_t mqdes;
    int nput;
    int nval;
};

struct Shared shared;

void shared_init()
{
    pthread_mutex_init(&shared.mutex, NULL);
    shared.mqdes = mq_open(POSIX_QUEUE, O_RDWR | O_CREAT, 0666, NULL); //在我的系統中,Posix消息隊列最大容量為10
    mq_unlink(POSIX_QUEUE);
}

void shared_destroy()
{
    pthread_mutex_destroy(&shared.mutex);
    mq_close(shared.mqdes);
}

void *produce(void *arg)
{
    while (1)
    {
        pthread_mutex_lock(&shared.mutex);

        if (shared.nput >= MAX_ITEMS)
        {
            pthread_mutex_unlock(&shared.mutex);
            pthread_exit(NULL);
        }

        //生產者線程依次累加nval的值,並以無優先順序消息方式放入消息隊列
        mq_send(shared.mqdes, (char *)&shared.nval, sizeof(shared.nval), 0);

        shared.nput++;
        shared.nval++;

        pthread_mutex_unlock(&shared.mutex);

        /* 線程tid_produce[i]每執行一次,就累加count[i]的值 */
        *((int *)arg) += 1;
    }

    pthread_exit(NULL);
}

void *consume(void *arg)
{
    struct mq_attr attr;
    int nval;
    int i;

    mq_getattr(shared.mqdes, &attr);
    printf("system defaut mq_maxmsg = %ld, mq_msgsize = %ld\n", attr.mq_maxmsg, attr.mq_msgsize);

    for (i = 0; i < MAX_ITEMS; i++)
    {
        mq_receive(shared.mqdes, (char *)&nval, attr.mq_msgsize, NULL); //根據mq_getattr返回結果來設置mq_receive的參數len

        if (nval != i)
        {
            printf("error: buff[%d] = %d\n", i, nval);
        }
    }

    pthread_exit(NULL);
}

int main()
{
    pthread_t tid_produce[MAX_THREADS];
    pthread_t tid_consume;
    int count[MAX_THREADS];
    struct timeval start_time;
    struct timeval end_time;
    float time_sec;
    int i;

    shared_init();
    gettimeofday(&start_time, NULL);

    for (i = 0; i < MAX_THREADS; i++)
    {
        count[i] = 0;
        pthread_create(&tid_produce[i], NULL, produce, &count[i]);
    }

    pthread_create(&tid_consume, NULL, consume, NULL);

    for (i = 0; i < MAX_THREADS; i++)
    {
        pthread_join(tid_produce[i], NULL);
        printf("count[%d] = %d\n", i, count[i]); //輸出每個線程的執行次數
    }

    pthread_join(tid_consume, NULL);

    gettimeofday(&end_time, NULL);
    time_sec = (end_time.tv_sec - start_time.tv_sec) + (end_time.tv_usec - start_time.tv_usec) / 1000000.0;
    printf("%d produce and %d consume total spend %.2f second\n", MAX_THREADS, 1, time_sec);

    shared_destroy();

    return 0;
}



遇到的問題(原因暫時未知):

  • 若在consume()的mq_receive()前後上鎖解鎖,程式會卡死

5. 效率對比

和生產者消費者一節中的解決方案相比,Posix消息隊列的效率比信號量差,比條件變數高:

  • 10個生產者,互斥鎖 + Posix消息隊列,2.5S完成
  • 10個生產者,互斥鎖 + 條件變數 + 隊列緩衝區,3.5S內完成
  • 10個生產者,互斥鎖 + Posix有名信號量 + 隊列緩衝區,2S內完成
  • 10個生產者,互斥鎖 + Posix無名信號量 + 隊列緩衝區,1.5S內完成

而且,同使用Posix消息隊列,10個生產者 + 互斥鎖需要2.5S,而單生產者無鎖1S以內就可以完成,可見互斥鎖的開銷使得多線程反而降低了效率。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一、常量、只讀欄位、靜態欄位和靜態只讀欄位對比 常量、只讀欄位、靜態欄位和靜態只讀欄位對比表: 常量、只讀欄位、靜態欄位和靜態只讀欄位適用數據: 1、常量適用於定義時就已知且不能改變的數據。 2、只讀欄位適用於通過第三方在運行時賦值且不能改變的數據(對象獨享)。 3、靜態只讀欄位適用於通過第三方在運 ...
  • 一、前言 上一篇我們對錶達式樹有了初步的認識,這裡我們將對錶達式樹進行遍歷,只有弄清楚了他的運行原理,我們才可以對他進行定製化修改。 表達式系列目錄 C# 表達式樹講解(一) C# 表達式樹遍歷(二) C# 表達式樹分頁擴展(三) C# 表達式樹Lambda擴展(四) 二、表達式樹的遍歷 要查看表達 ...
  • <Window x:Class="WpfApp53.MainWindow" xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation" xmlns:x="http://schemas.microsoft.com/winfx/20... ...
  • 此處項目路徑是:C:\GetPathInfo\ ...
  • asp.net core 使用 signalR(一) Intro SignalR 是什麼? ASP.NET Core SignalR 是一個開源代碼庫,它簡化了嚮應用添加實時 Web 功能的過程。 實時 Web 功能使伺服器端代碼能夠即時將內容推送到客戶端。 SignalR 的適用對象: 需要來自服 ...
  • 如果已經看過本章節:目錄傳送門:這是目錄鴨~ 1.場景搭建: 首先我們去AssetStore逛淘寶~~~ 我淘到的是這個資源,其他好看的場景(消耗不高的都行)。 然後我們導入了這個資源後,把資源根文件夾的名字改為Select,把Demo場景文件的名字改為Selection,我這樣修改的emmm... ...
  • static void Main(string[] args) { int i = 0; Parallel.For(0, 100, (x) => { Console.WriteLine(i); i++; }); Console.WriteLine($"i is {i}"); Console.Read ...
  • 本教程僅用作個人學習,請勿用於商業獲利,造成後果自負!!! Pycharm安裝 在這插一個小話題哈,Pycharm只是一個編譯器,並不能代替Python,如果要使用Python,還是需要安裝Python的哈 1、Pycharm下載安裝 Pycharm下載 Pycharm官網:http://www.j ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...