Linux編程之自定義消息隊列

来源:http://www.cnblogs.com/skyfsm/archive/2017/01/09/6266404.html
-Advertisement-
Play Games

我這裡要講的並不是IPC中的消息隊列,我要講的是在進程內部實現自定義的消息隊列,讓各個線程的消息來推動整個進程的運動。進程間的消息隊列用於進程與進程之間的通信,而我將要實現的進程內的消息隊列是用於有序妥當處理來自於各個線程請求,避免一窩蜂的請求而導致消息的異常丟失。想想socket編程里的liste ...


我這裡要講的並不是IPC中的消息隊列,我要講的是在進程內部實現自定義的消息隊列,讓各個線程的消息來推動整個進程的運動。進程間的消息隊列用於進程與進程之間的通信,而我將要實現的進程內的消息隊列是用於有序妥當處理來自於各個線程請求,避免一窩蜂的請求而導致消息的異常丟失。想想socket編程里的listen函數吧,裡面要設置一個隊列長度的參數,其實來自網路的請求已經排成一個請求隊列了,只是這個隊列是系統幫我們做好了,我們看不到而已。如果系統不幫我們做這個等待隊列的話,那就需要我們程式員在應用層實現了。


進程內的消息隊列實現並不難,總的來說有以下幾點:
  • 自定義消息結構,並構造隊列
  • 一個線程負責依次從消息隊列中取出消息,並處理該消息
  • 多個線程產生事件,並將消息放進消息隊列,等待處理
長話短說,我們開始動手吧!     一、定義消息結構 先貼代碼再解釋:
typedef struct Msg_Hdr_s  
{  
    uint32 msg_type;  
    uint32 msg_len;  
    uint32 msg_src;  
    uint32 msg_dst;      
}Msg_Hdr_t;  
  
typedef struct Msg_s  
{  
    Msg_Hdr_t hdr;  
    uint8 data[100];  
} Msg_t;
下麵是我設計的消息格式內容的解釋:
  • msg_type:標記消息類型,當消息接收者看到該msg_type後就知道他要乾什麼事了
  • msg_len:消息長度,待擴展,暫時沒用到(以後會擴展為變長消息)
  • msg_src:消息的源地址,即消息的發起者
  • msg_dst:消息的目的地,即消息的接受者
  • data[100]:消息除去消息頭外可以攜帶的信息量,定義為100位元組
由該消息數據結構可以知道,這個消息是定長的,當然也可以實現為變長消息,但現在暫不實現,今天先把定長消息實現了,以後再完善變長消息。     二、構造迴圈隊列 隊列可以由鏈表實現,也可以由數組實現,這裡就使用數組實現的迴圈鏈表作為我們消息隊列的隊列模型。
typedef struct Queue_s  
{  
    int head;  
    int rear;  
    sem_t sem;  
    Msg_t data[QUEUE_SIZE];  
}Queue_t;  
  
int MsgQueueInit(Queue_t* Q)  
{  
    if(!Q)  
    {  
        printf("Invalid Queue!\n");  
        return -1;  
    }  
    Q->rear = 0;  
    Q->head = 0;  
    sem_init(&Q->sem, 0, 1);  
    return 0;      
}  
  
int MsgDeQueue(Queue_t* Q, Msg_t* msg)  
{  
    if(!Q)  
    {  
        printf("Invalid Queue!\n");  
        return -1;  
    }  
    if(Q->rear == Q->head) //only one consumer,no need to lock head  
    {  
        printf("Empty Queue!\n");  
        return -1;  
    }  
    memcpy(msg, &(Q->data[Q->head]), sizeof(Msg_t));  
    Q->head = (Q->head+1)%QUEUE_SIZE;  
    return 0;         
  
}  
  
int MsgEnQueue(Queue_t* Q, Msg_t* msg)  
{  
    if(Q->head == (Q->rear+1)%QUEUE_SIZE)  
    {  
        printf("Full Queue!\n");  
        return -1;  
    }  
    sem_wait(&Q->sem);  
    memcpy(&(Q->data[Q->rear]), msg, sizeof(Msg_t));  
    Q->rear = (Q->rear+1)%QUEUE_SIZE;  
    sem_post(&Q->sem);  
    return 0;  
} 
  迴圈隊列的實現想必大家都比較熟悉,但這裡需要提示的幾點是:
  • 隊列中應加入信號量或鎖來保證進隊時的互斥訪問,因為多個消息可能同時進隊,互相覆蓋其隊列節點
  • 這裡的信號量僅用於進隊而沒用於出隊,理由是消息處理者只有一個,不存在互斥的情形

 

三、構造消息處理者

if(pthread_create(&handler_thread_id, NULL, (void*)msg_handler, NULL))  
{  
    printf("create handler thread fail!\n");  
    return -1;          
}  
  
void msg_printer(Msg_t* msg)  
{  
    if(!msg)  
    {  
        return;  
    }  
    printf("%s: I have recieved a message!\n", __FUNCTION__);  
    printf("%s: msgtype:%d   msg_src:%d  dst:%d\n\n",__FUNCTION__,msg->hdr.msg_type,msg->hdr.msg_src,msg->hdr.msg_dst);  
  
}  
  
void msg_handler()  
{  
    sleep(5);  //let's wait 5s when starts  
    while(1)  
    {  
        Msg_t msg;  
        memset(&msg, 0 ,sizeof(Msg_t));  
        int res = MsgDeQueue((Queue_t*)&MsgQueue, &msg);  
        if(res != 0)  
        {  
            sleep(10);  
            continue;  
        }  
        msg_printer(&msg);  
        sleep(1);  
    }  
}
我在進程里create了一個線程作為消息處理者(handler)來處理消息隊列的消息,甘進入該線程時先等個5秒鐘來讓生產者往隊列里丟些消息,然後再開始消息處理。當隊列沒消息可取時,就休息十秒,再去取消息。   這裡的消息處理很簡單,我只是簡單地將受到的消息列印一下,證明受到的消息正是其他線程發給我的。當然,你也可以在這裡擴展功能,根據受到的消息類型進一步決定該做什麼事。比如:
enum MSG_TYPE  
{  
    GO_HOME,  
    GO_TO_BED,  
    GO_TO_LUNCH,  
    GO_TO_CINAMA,  
    GO_TO_SCHOOL,  
    GO_DATEING,  
    GO_TO_WORK,//6  
};  
  
void handler()  
{  
    switch(msgtype)  
    {  
        case GO_HOME: go_home(); break;  
        case GO_TO_BED:  go_to_bed(); break;  
        .......  
    }  
}

這裡的handler就是一個簡單的狀態機了,根據給定的消息類型(事件)去做特定的事,推動狀態機的轉動。

 

四、構造消息生產者

if(pthread_create(&thread1_id, NULL, (void*)msg_sender1, NULL))  
{  
    printf("create thread1 fail!\n");  
    return -1;  
}  
  
if(pthread_create(&thread2_id, NULL, (void*)msg_sender2, NULL))  
{  
    printf("create thread2 fail!\n");  
    return -1;  
}      
  
if(pthread_create(&thread3_id, NULL, (void*)msg_sender3, NULL))  
{  
    printf("create thread3 fail!\n");  
    return -1;  
}      
  
  
void msg_sender1()  
{  
    int i = 0;  
    while(1)  
    {  
        if(i > 10)  
        {  
            i = 0;  
        }  
        Msg_t msg;  
        msg.hdr.msg_type = i++;  
        msg.hdr.msg_src = THREAD1;  
        msg.hdr.msg_dst = HANDLER;  
        MsgEnQueue((Queue_t*)&MsgQueue, &msg);  
        printf("%s: Thread1 send a message!\n",__FUNCTION__);  
        sleep(1);  
    }  
}  
  
void msg_sender2()  
{  
    int i = 0;  
    while(1)  
    {  
        if(i > 10)  
        {  
            i = 0;  
        }  
        Msg_t msg;  
        msg.hdr.msg_type = i++;  
        msg.hdr.msg_src = THREAD2;  
        msg.hdr.msg_dst = HANDLER;  
        MsgEnQueue((Queue_t*)&MsgQueue, &msg);  
        printf("%s: Thread2 send a message!\n",__FUNCTION__);  
        sleep(1);  
    }  
}  
  
void msg_sender3()  
{  
    int i = 0;  
    while(1)  
    {  
        if(i > 10)  
        {  
            i = 0;  
        }  
        Msg_t msg;  
        msg.hdr.msg_type = i++;  
        msg.hdr.msg_src = THREAD3;  
        msg.hdr.msg_dst = HANDLER;  
        MsgEnQueue((Queue_t*)&MsgQueue, &msg);  
        printf("%s: Thread3 send a message!\n",__FUNCTION__);  
        sleep(1);  
    }  
}

這裡我create了三個線程來模擬消息生產者,每個生產者每隔1秒往消息隊列里寫消息。

 

五、跑起來看看

先貼完整的代碼: msg_queue.c:
  1 #include <stdio.h>  
  2 #include <pthread.h>  
  3 #include <semaphore.h>  
  4 #include <unistd.h>  
  5 #include <string.h>  
  6 #include "msg_def.h"  
  7   
  8 Queue_t MsgQueue;  
  9   
 10 int main(int argc, char* argv[])  
 11 {  
 12     int ret;  
 13     pthread_t thread1_id;  
 14     pthread_t thread2_id;  
 15     pthread_t thread3_id;  
 16     pthread_t handler_thread_id;  
 17   
 18     ret = MsgQueueInit((Queue_t*)&MsgQueue);  
 19     if(ret != 0)  
 20     {  
 21         return -1;  
 22     }  
 23   
 24     if(pthread_create(&handler_thread_id, NULL, (void*)msg_handler, NULL))  
 25     {  
 26         printf("create handler thread fail!\n");  
 27         return -1;          
 28     }  
 29   
 30   
 31     if(pthread_create(&thread1_id, NULL, (void*)msg_sender1, NULL))  
 32     {  
 33         printf("create thread1 fail!\n");  
 34         return -1;  
 35     }  
 36   
 37     if(pthread_create(&thread2_id, NULL, (void*)msg_sender2, NULL))  
 38     {  
 39         printf("create thread2 fail!\n");  
 40         return -1;  
 41     }      
 42   
 43     if(pthread_create(&thread3_id, NULL, (void*)msg_sender3, NULL))  
 44     {  
 45         printf("create thread3 fail!\n");  
 46         return -1;  
 47     }      
 48   
 49   
 50     while(1)  
 51     {      
 52         sleep(1);  
 53     }  
 54   
 55     return 0;  
 56 }  
 57   
 58   
 59   
 60   
 61 int MsgQueueInit(Queue_t* Q)  
 62 {  
 63     if(!Q)  
 64     {  
 65         printf("Invalid Queue!\n");  
 66         return -1;  
 67     }  
 68     Q->rear = 0;  
 69     Q->head = 0;  
 70     sem_init(&Q->sem, 0, 1);  
 71     return 0;      
 72 }  
 73   
 74 int MsgDeQueue(Queue_t* Q, Msg_t* msg)  
 75 {  
 76     if(!Q)  
 77     {  
 78         printf("Invalid Queue!\n");  
 79         return -1;  
 80     }  
 81     if(Q->rear == Q->head) //only one cosumer,no need to lock head  
 82     {  
 83         printf("Empty Queue!\n");  
 84         return -1;  
 85     }  
 86     memcpy(msg, &(Q->data[Q->head]), sizeof(Msg_t));  
 87     Q->head = (Q->head+1)%QUEUE_SIZE;  
 88     return 0;         
 89   
 90 }  
 91   
 92 int MsgEnQueue(Queue_t* Q, Msg_t* msg)  
 93 {  
 94     if(Q->head == (Q->rear+1)%QUEUE_SIZE)  
 95     {  
 96         printf("Full Queue!\n");  
 97         return -1;  
 98     }  
 99     sem_wait(&Q->sem);  
100     memcpy(&(Q->data[Q->rear]), msg, sizeof(Msg_t));  
101     Q->rear = (Q->rear+1)%QUEUE_SIZE;  
102     sem_post(&Q->sem);  
103     return 0;  
104 }  
105   
106 void msg_printer(Msg_t* msg)  
107 {  
108     if(!msg)  
109     {  
110         return;  
111     }  
112     printf("%s: I have recieved a message!\n", __FUNCTION__);  
113     printf("%s: msgtype:%d   msg_src:%d  dst:%d\n\n",__FUNCTION__,msg->hdr.msg_type,msg->hdr.msg_src,msg->hdr.msg_dst);  
114   
115 }  
116   
117 int msg_send()  
118 {  
119   
120     Msg_t msg;  
121     msg.hdr.msg_type = GO_HOME;  
122     msg.hdr.msg_src = THREAD1;  
123     msg.hdr.msg_dst = HANDLER;  
124     return MsgEnQueue((Queue_t*)&MsgQueue, &msg);      
125   
126 }  
127   
128 void msg_handler()  
129 {  
130     sleep(5);  //let's wait 5s when starts  
131     while(1)  
132     {  
133         Msg_t msg;  
134         memset(&msg, 0 ,sizeof(Msg_t));  
135         int res = MsgDeQueue((Queue_t*)&MsgQueue, &msg);  
136         if(res != 0)  
137         {  
138             sleep(10);  
139             continue;  
140         }  
141         msg_printer(&msg);  
142         sleep(1);  
143     }  
144 }  
145   
146   
147 void msg_sender1()  
148 {  
149     int i = 0;  
150     while(1)  
151     {  
152         if(i > 10)  
153         {  
154             i = 0;  
155         }  
156         Msg_t msg;  
157         msg.hdr.msg_type = i++;  
158         msg.hdr.msg_src = THREAD1;  
159         msg.hdr.msg_dst = HANDLER;  
160         MsgEnQueue((Queue_t*)&MsgQueue, &msg);  
161         printf("%s: Thread1 send a message!\n",__FUNCTION__);  
162         sleep(1);  
163     }  
164 }  
165   
166 void msg_sender2()  
167 {  
168     int i = 0;  
169     while(1)  
170     {  
171         if(i > 10)  
172         {  
173             i = 0;  
174         }  
175         Msg_t msg;  
176         msg.hdr.msg_type = i++;  
177         msg.hdr.msg_src = THREAD2;  
178         msg.hdr.msg_dst = HANDLER;  
179         MsgEnQueue((Queue_t*)&MsgQueue, &msg);  
180         printf("%s: Thread2 send a message!\n",__FUNCTION__);  
181         sleep(1);  
182     }  
183 }  
184   
185 void msg_sender3()  
186 {  
187     int i = 0;  
188     while(1)  
189     {  
190         if(i > 10)  
191         {  
192             i = 0;  
193         }  
194         Msg_t msg;  
195         msg.hdr.msg_type = i++;  
196         msg.hdr.msg_src = THREAD3;  
197         msg.hdr.msg_dst = HANDLER;  
198         MsgEnQueue((Queue_t*)&MsgQueue, &msg);  
199         printf("%s: Thread3 send a message!\n",__FUNCTION__);  
200         sleep(1);  
201     }  
202 }

 

msg_def.h:

 1 #include <stdio.h>  
 2 #include <pthread.h>  
 3 #include <semaphore.h>  
 4   
 5 typedef unsigned char uint8;  
 6 typedef unsigned short unit16;  
 7 typedef unsigned int uint32;  
 8   
 9 #define QUEUE_SIZE 1000  
10   
11 typedef struct Msg_Hdr_s  
12 {  
13     uint32 msg_type;  
14     uint32 msg_len;  
15     uint32 msg_src;  
16     uint32 msg_dst;      
17 }Msg_Hdr_t;  
18   
19 typedef struct Msg_s  
20 {  
21     Msg_Hdr_t hdr;  
22     uint8 data[100];  
23 } Msg_t;  
24   
25 typedef struct Queue_s  
26 {  
27     int head;  
28     int rear;  
29     sem_t sem;  
30     Msg_t data[QUEUE_SIZE];  
31 }Queue_t;  
32   
33 typedef struct Queue_s QueueNode;  
34   
35 enum MSG_TYPE  
36 {  
37     GO_HOME,  
38     GO_TO_BED,  
39     GO_TO_LUNCH,  
40     GO_TO_CINAMA,  
41     GO_TO_SCHOOL,  
42     GO_DATEING,  
43     GO_TO_WORK,//6  
44 };  
45   
46 enum SRC_ADDR  
47 {  
48     THREAD1,  
49     THREAD2,  
50     THREAD3,  
51     HANDLER,  
52 };  
53   
54   
55 int MsgQueueInit(Queue_t* Q);  
56 int MsgDeQueue(Queue_t* Q, Msg_t* msg);  
57 int MsgEnQueue(Queue_t* Q, Msg_t* msg);  
58 void msg_handler();  
59 void msg_sender1();  
60 void msg_sender2();  
61 void msg_sender3();  
62 void msg_printer(Msg_t* msg);  
63 int msg_send();

 

看看跑起來的現象:     Finish! 現在這套進程內的消息隊列的架構在實際工程中非常實用(當然實際工程的框架會複雜健壯得多),很多工程都需要這種基於事件推動的思想來保證每條請求都可以有條不絮地執行,所以這個框架也是有用武之地的,尤其配合狀態機非常適合!
您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • SQL Server 2016支持哈希查找,用戶可以在記憶體優化表(Memory-Optimized Table)上創建Hash Index,使用Hash 查找演算法,實現數據的極速查找。在使用上,Hash Index 和B-Tree索引的區別是:Hash Index 是無序查找,Index Key必須 ...
  • 賬號是一種用來記錄單個用戶或者多個用戶的數據。RHEL中每一個合法的用戶都必須擁有賬號,才能使用RHEL。 在RHEL上的賬號可以分為兩類: 用戶賬號:用來存儲單一用戶的數據,你也可以使用一個用戶賬號來存儲某一個用戶的數據。 組賬號:用來存儲多個用戶的信息,每一個組賬號都可以記錄一組用戶的數據。 在 ...
  • 首先要安裝VirtualBox的增強版功能(VBoxGuestAdditions) 在 設備 >安裝增強版功能 >運行,重啟電腦。 1、Linux本地的共用文件夾建立 mkdir /mnt/localShare 2、/etc/下的fstab 修改添加 win_share /mnt/localShar ...
  • 參考s3c2410fb.c總結出框架 1.代碼分析 1.1 入口函數 註冊一個platform_driver結構體,如果存在同名的設備dev時,將調用probe函數。 搜索s3c2410-lcd可得下麵的s3c_device_lcd結構體 1.2 probe函數(只列出關鍵性代碼) 由此可知,其主要 ...
  • 一、 下載mysql5.7 http://mirrors.sohu.com/mysql/MySQL-5.7/ Linux下載: 輸入命令:wget http://mirrors.sohu.com/mysql/MySQL-5.7/mysql-5.7.17-linux-glibc2.5-x86_64.t ...
  • 我們平時在做web開發運行web伺服器或運行某個應用時會報錯,提示該應用的埠號已被占用,我們可以用以下的方法解決。 解決方法一:重新為應用配置埠。 解決方法二:找到占用埠的應用並關閉該應用釋放占用的埠: 1、win+r運行cmd或在開菜單的運行中運行 2、運行命令 netstat -aon| ...
  • 系統window8.1 1、安裝IIS組件:點開始菜單—選擇控制面板——程式——打開或關閉WINDOWS功能——展開Internet信息服務,勾選FTP伺服器(包括FTP服務和FTP擴展性),點確定。 由於我的電腦已將安裝了IIS服務所以找不到了,不知道你電腦有沒有安裝可以在 控制面板\所有控制面板 ...
  • 運行級別 說明 0 系統關機狀態 1 單用戶工作狀態,用於root對系統進行維護,此時不予許其他用戶使用主機。(類似於windows 的安全模式) 2 多用戶狀態(沒有NFS) 3 多用戶狀態(有NFS),主機做為伺服器常在該模式下工作 4 系統未定義 5 多用戶狀態,並且在系統啟動後運行xwind ...
一周排行
    -Advertisement-
    Play Games
  • 前言 本文介紹一款使用 C# 與 WPF 開發的音頻播放器,其界面簡潔大方,操作體驗流暢。該播放器支持多種音頻格式(如 MP4、WMA、OGG、FLAC 等),並具備標記、實時歌詞顯示等功能。 另外,還支持換膚及多語言(中英文)切換。核心音頻處理採用 FFmpeg 組件,獲得了廣泛認可,目前 Git ...
  • OAuth2.0授權驗證-gitee授權碼模式 本文主要介紹如何筆者自己是如何使用gitee提供的OAuth2.0協議完成授權驗證並登錄到自己的系統,完整模式如圖 1、創建應用 打開gitee個人中心->第三方應用->創建應用 創建應用後在我的應用界面,查看已創建應用的Client ID和Clien ...
  • 解決了這個問題:《winForm下,fastReport.net 從.net framework 升級到.net5遇到的錯誤“Operation is not supported on this platform.”》 本文內容轉載自:https://www.fcnsoft.com/Home/Sho ...
  • 國內文章 WPF 從裸 Win 32 的 WM_Pointer 消息獲取觸摸點繪製筆跡 https://www.cnblogs.com/lindexi/p/18390983 本文將告訴大家如何在 WPF 裡面,接收裸 Win 32 的 WM_Pointer 消息,從消息裡面獲取觸摸點信息,使用觸摸點 ...
  • 前言 給大家推薦一個專為新零售快消行業打造了一套高效的進銷存管理系統。 系統不僅具備強大的庫存管理功能,還集成了高性能的輕量級 POS 解決方案,確保頁面載入速度極快,提供良好的用戶體驗。 項目介紹 Dorisoy.POS 是一款基於 .NET 7 和 Angular 4 開發的新零售快消進銷存管理 ...
  • ABP CLI常用的代碼分享 一、確保環境配置正確 安裝.NET CLI: ABP CLI是基於.NET Core或.NET 5/6/7等更高版本構建的,因此首先需要在你的開發環境中安裝.NET CLI。這可以通過訪問Microsoft官網下載並安裝相應版本的.NET SDK來實現。 安裝ABP ...
  • 問題 問題是這樣的:第三方的webapi,需要先調用登陸介面獲取Cookie,訪問其它介面時攜帶Cookie信息。 但使用HttpClient類調用登陸介面,返回的Headers中沒有找到Cookie信息。 分析 首先,使用Postman測試該登陸介面,正常返回Cookie信息,說明是HttpCli ...
  • 國內文章 關於.NET在中國為什麼工資低的分析 https://www.cnblogs.com/thinkingmore/p/18406244 .NET在中國開發者的薪資偏低,主要因市場需求、技術棧選擇和企業文化等因素所致。歷史上,.NET曾因微軟的閉源策略發展受限,儘管後來推出了跨平臺的.NET ...
  • 在WPF開發應用中,動畫不僅可以引起用戶的註意與興趣,而且還使軟體更加便於使用。前面幾篇文章講解了畫筆(Brush),形狀(Shape),幾何圖形(Geometry),變換(Transform)等相關內容,今天繼續講解動畫相關內容和知識點,僅供學習分享使用,如有不足之處,還請指正。 ...
  • 什麼是委托? 委托可以說是把一個方法代入另一個方法執行,相當於指向函數的指針;事件就相當於保存委托的數組; 1.實例化委托的方式: 方式1:通過new創建實例: public delegate void ShowDelegate(); 或者 public delegate string ShowDe ...