[TOC] 1. 概述 消息隊列可認為是一個消息鏈表,隊列中的每個消息具有如下屬性: 消息優先順序,由發送者賦予 消息數據長度,可以為0 消息數據(如果消息數據長度大於0) Posix消息隊列主要用於線程間消息的傳遞: A線程向隊列中放置消息,B線程從隊列中取出消息 A線程向隊列寫入消息之前,不需要B ...
目錄
1. 概述
消息隊列可認為是一個消息鏈表,隊列中的每個消息具有如下屬性:
- 消息優先順序,由發送者賦予
- 消息數據長度,可以為0
- 消息數據(如果消息數據長度大於0)
Posix消息隊列主要用於線程間消息的傳遞:
- A線程向隊列中放置消息,B線程從隊列中取出消息
- A線程向隊列寫入消息之前,不需要B線程在該隊列上等待消息的到達
- A線程向隊列寫入消息之後,B線程可以在之後的某個時刻取出消息
- A線程只關心向隊列放入消息,B線程只關心從隊列取出消息,A、B兩個線程相互獨立、互不影響
2. Posix消息隊列
創建與打開
mq_open
用於創建一個新的消息隊列或打開一個已存在的消息隊列,編譯時需指定鏈接-lrt,下麵其他函數同理。
//成功返回消息隊列描述符,失敗返回-1
mqd_t mq_open(const char *name, int oflag, ... /* mode_t mode, struct mq_attr *attr */);
- 當創建一個新的消息隊列時,attr參數用於給新隊列指定某些屬性,若attr為NULL,則使用預設屬性
- mq_open的返回值稱為消息隊列描述符,它的類型取決於系統實現,可能是整型或指針
- Linux下的Posix消息隊列創建在虛擬文件系統中,正常情況下是不可見的,需要掛載到
/dev/mqueue/
目錄才可以查看
mkdir /dev/mqueue
mount -t mqueue none /dev/mqueue
關閉與刪除
mq_close
用於關閉已打開的消息隊列,mq_unlink
用於從系統中刪除消息隊列。
//兩個函數返回值:成功返回0,失敗返回-1
int mq_close(mqd_t mqdes);
int mq_unlink(const char *name);
關閉與刪除機制已在Posix信號量中講過,這裡不再贅述。
消息隊列屬性
獲取屬性
//成功返回0,失敗返回-1
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
mq_getattr
用於獲取消息隊列的四個屬性,這四個屬性定義在struct mq_attr
結構體中。
struct mq_attr
{
long mq_flags; //非阻塞標誌,可設0或O_NONBLOCK,由且僅由mq_setattr設置
long mq_maxmsg; //隊列中最大消息條數,由mq_open在創建新隊列時設置
long mq_msgsize; //消息最大長度,由mq_open在創建新隊列時設置
long mq_curmsgs; //隊列中當前消息條數,只能獲取不能設置
};
設置屬性
//成功返回0,失敗返回-1
int mq_setattr(mqd_t mqdes, const struct mq_attr *attr, struct mq_attr *oldattr);
在消息隊列的四個屬性中:
- mq_curmsgs只能獲取不能設置
- mq_flags只能通過mq_setattr設置,該函數的唯一作用就是設置或清除非阻塞標誌
mq_maxmsg
和mq_msgsize
只能在創建新隊列時由mq_open的attr參數設置- mq_maxmsg和mq_msgsize必須同時指定,否則mq_open創建新隊列會失敗
#include <mqueue.h>
#include <stdio.h>
int main()
{
struct mq_attr attr;
struct mq_attr attr1;
mqd_t mqdes;
/*
* 在我的系統上,消息隊列預設屬性為:mq_maxmsg = 10, mq_msgsize = 8192.
* 這裡顯式指定attr.mq_maxmsg = 5,mq_msgsize不賦值,會導致mq_open失敗.
*/
attr.mq_maxmsg = 5;
//attr.mq_msgsize = 8192;
if ((mqdes = mq_open("/mqueue1", O_RDWR | O_CREAT, 0666, &attr)) == -1)
{
printf("mq_open create new mqueue failed because attr.mq_msgsize not specified.\n");
}
mq_getattr(mqdes, &attr1);
printf("%ld %ld\n", attr1.mq_maxmsg, attr1.mq_msgsize);
mq_close(mqdes);
mq_unlink("/mqueue1");
return 0;
}
消息發送與接收
mq_send
用於向隊列中放入一個消息,mq_receive
用於從隊列中取走一個消息。
//成功返回0,失敗返回-1
int mq_send(mqd_t mqdes, const char *ptr, size_t len, unsigned int prio);
//成功返回消息數據長度,失敗返回-1
ssize_t mq_receive(mqd_t mqdes, char *ptr, size_t len, unsigned int *prio);
- prio是消息優先順序,範圍為[0, MQ_PRIO_MAX - 1],prio值越大,消息優先順序越高
- 如果不關心消息優先順序,就分別給mq_send和mq_receive的prio參數傳0和傳NULL
- mq_receive總是返回最高優先順序的最早消息
- mq_receive的參數len指的是接收緩衝區大小,它必須大於等於該隊列的mq_msgsize,否則會立即出錯返回
- 可以先調用mq_getattr獲得mq_msgsize的值,然後再動態分配接收緩衝區
3. 消息隊列限制
消息隊列共有4個屬性受到系統限制:
- msg_max
- msgsize_max
- MQ_OPEN_MAX
- MQ_PRIO_MAX
其中,前兩個限制和應用程式的開發密切相關,既要保證隊列不會被填滿,又要保證消息長度不會超過允許的最大值,必要時可以修改Linux內核源碼來改變限制值。
查看限制的方法:
cat /proc/sys/fs/mqueue/msg_max //struct mq_attr.mq_maxmsg <= msg_max
cat /proc/sys/fs/mqueue/msgsize_max //struct mq_attr.mq_msgsize <= msgsize_max
cat /proc/sys/fs/mqueue/queues_max
4. 生產者消費者問題——Posix消息隊列實現
不難看出,Posix消息隊列的基本使用模型就是一個典型的生產者消費者問題:
- 如果使用無優先順序的消息,那麼消息是按照先進先出的順序處理的
- 因此,Posix消息隊列也可以作為生產者消費者問題中的隊列緩衝區
單生產者 + 單消費者
我們把前面寫的生產者消費者代碼拿來稍微改一下,先來看一個單生產者 + 單消費者的例子。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <pthread.h>
#include <mqueue.h>
#define POSIX_QUEUE "/mqueue"
#define MAX_THREADS 1
#define MAX_ITEMS 1000000
struct Shared
{
mqd_t mqdes;
int nput;
int nval;
};
struct Shared shared;
void shared_init()
{
shared.mqdes = mq_open(POSIX_QUEUE, O_RDWR | O_CREAT, 0666, NULL); //在我的系統中,Posix消息隊列最大容量為10
mq_unlink(POSIX_QUEUE);
}
void shared_destroy()
{
mq_close(shared.mqdes);
}
void *produce(void *arg)
{
while (1)
{
if (shared.nput >= MAX_ITEMS)
{
pthread_exit(NULL);
}
mq_send(shared.mqdes, (char *)&shared.nval, sizeof(shared.nval), 0);
shared.nput++;
shared.nval++;
/* 線程tid_produce[i]每執行一次,就累加count[i]的值 */
*((int *)arg) += 1;
}
pthread_exit(NULL);
}
void *consume(void *arg)
{
struct mq_attr attr;
int nval;
int i;
mq_getattr(shared.mqdes, &attr);
printf("system defaut mq_maxmsg = %ld, mq_msgsize = %ld\n", attr.mq_maxmsg, attr.mq_msgsize);
for (i = 0; i < MAX_ITEMS; i++)
{
//消費者線程按順序取出消息,根據mq_getattr返回結果來設置mq_receive的參數len
mq_receive(shared.mqdes, (char *)&nval, attr.mq_msgsize, NULL);
if (nval != i)
{
printf("error: buff[%d] = %d\n", i, nval);
}
}
pthread_exit(NULL);
}
int main()
{
pthread_t tid_produce[MAX_THREADS];
pthread_t tid_consume;
int count[MAX_THREADS];
struct timeval start_time;
struct timeval end_time;
float time_sec;
int i;
shared_init();
gettimeofday(&start_time, NULL);
for (i = 0; i < MAX_THREADS; i++)
{
count[i] = 0;
pthread_create(&tid_produce[i], NULL, produce, &count[i]);
}
pthread_create(&tid_consume, NULL, consume, NULL);
for (i = 0; i < MAX_THREADS; i++)
{
pthread_join(tid_produce[i], NULL);
printf("count[%d] = %d\n", i, count[i]); //輸出每個線程的執行次數
}
pthread_join(tid_consume, NULL);
gettimeofday(&end_time, NULL);
time_sec = (end_time.tv_sec - start_time.tv_sec) + (end_time.tv_usec - start_time.tv_usec) / 1000000.0;
printf("%d produce and %d consume total spend %.2f second\n", MAX_THREADS, 1, time_sec);
shared_destroy();
return 0;
}
註意觀察代碼和運行結果,可以發現生產者和消費者之間並沒有做同步處理,但仍然得到了正確結果,這是因為當沒有設置非阻塞標誌時,Posix消息隊列自帶隱式同步機制:
- 如果消息隊列滿,mq_send會阻塞,直到隊列中有空位置
- 如果消息隊列空,mq_receive會阻塞,直到隊列中有數據
而這正是單生產者 + 單消費者模型唯一需要處理的同步問題,因此不需要應用程式再進行顯式同步。
顯式同步,指的是使用互斥鎖、條件變數、信號量等方式進行的同步;Posix自帶的同步在內核中進行,對於應用程式來說是不可見的,因此稱其為隱式同步。
多生產者 + 單消費者
當有多個生產者時,Posix消息隊列自帶的同步機制就不夠用了,需要顯式處理生產者線程之間的同步問題,我們使用互斥鎖實現這個功能。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <pthread.h>
#include <mqueue.h>
#define POSIX_QUEUE "/mqueue"
#define MAX_THREADS 10
#define MAX_ITEMS 1000000
struct Shared
{
pthread_mutex_t mutex;
mqd_t mqdes;
int nput;
int nval;
};
struct Shared shared;
void shared_init()
{
pthread_mutex_init(&shared.mutex, NULL);
shared.mqdes = mq_open(POSIX_QUEUE, O_RDWR | O_CREAT, 0666, NULL); //在我的系統中,Posix消息隊列最大容量為10
mq_unlink(POSIX_QUEUE);
}
void shared_destroy()
{
pthread_mutex_destroy(&shared.mutex);
mq_close(shared.mqdes);
}
void *produce(void *arg)
{
while (1)
{
pthread_mutex_lock(&shared.mutex);
if (shared.nput >= MAX_ITEMS)
{
pthread_mutex_unlock(&shared.mutex);
pthread_exit(NULL);
}
//生產者線程依次累加nval的值,並以無優先順序消息方式放入消息隊列
mq_send(shared.mqdes, (char *)&shared.nval, sizeof(shared.nval), 0);
shared.nput++;
shared.nval++;
pthread_mutex_unlock(&shared.mutex);
/* 線程tid_produce[i]每執行一次,就累加count[i]的值 */
*((int *)arg) += 1;
}
pthread_exit(NULL);
}
void *consume(void *arg)
{
struct mq_attr attr;
int nval;
int i;
mq_getattr(shared.mqdes, &attr);
printf("system defaut mq_maxmsg = %ld, mq_msgsize = %ld\n", attr.mq_maxmsg, attr.mq_msgsize);
for (i = 0; i < MAX_ITEMS; i++)
{
mq_receive(shared.mqdes, (char *)&nval, attr.mq_msgsize, NULL); //根據mq_getattr返回結果來設置mq_receive的參數len
if (nval != i)
{
printf("error: buff[%d] = %d\n", i, nval);
}
}
pthread_exit(NULL);
}
int main()
{
pthread_t tid_produce[MAX_THREADS];
pthread_t tid_consume;
int count[MAX_THREADS];
struct timeval start_time;
struct timeval end_time;
float time_sec;
int i;
shared_init();
gettimeofday(&start_time, NULL);
for (i = 0; i < MAX_THREADS; i++)
{
count[i] = 0;
pthread_create(&tid_produce[i], NULL, produce, &count[i]);
}
pthread_create(&tid_consume, NULL, consume, NULL);
for (i = 0; i < MAX_THREADS; i++)
{
pthread_join(tid_produce[i], NULL);
printf("count[%d] = %d\n", i, count[i]); //輸出每個線程的執行次數
}
pthread_join(tid_consume, NULL);
gettimeofday(&end_time, NULL);
time_sec = (end_time.tv_sec - start_time.tv_sec) + (end_time.tv_usec - start_time.tv_usec) / 1000000.0;
printf("%d produce and %d consume total spend %.2f second\n", MAX_THREADS, 1, time_sec);
shared_destroy();
return 0;
}
遇到的問題(原因暫時未知):
- 若在consume()的mq_receive()前後上鎖解鎖,程式會卡死
5. 效率對比
和生產者消費者一節中的解決方案相比,Posix消息隊列的效率比信號量差,比條件變數高:
- 10個生產者,互斥鎖 + Posix消息隊列,2.5S完成
- 10個生產者,互斥鎖 + 條件變數 + 隊列緩衝區,3.5S內完成
- 10個生產者,互斥鎖 + Posix有名信號量 + 隊列緩衝區,2S內完成
- 10個生產者,互斥鎖 + Posix無名信號量 + 隊列緩衝區,1.5S內完成
而且,同使用Posix消息隊列,10個生產者 + 互斥鎖需要2.5S,而單生產者無鎖1S以內就可以完成,可見互斥鎖的開銷使得多線程反而降低了效率。