【c#】隊列(Queue)和MSMQ(消息隊列)的基礎使用

来源:https://www.cnblogs.com/yanbigfeg/archive/2018/09/19/9674238.html
-Advertisement-
Play Games

首先我們知道隊列是先進先出的機制,所以在處理併發是個不錯的選擇。然後就寫兩個隊列的簡單應用。 Queue 命名空間 命名空間:System.Collections,不在這裡做過多的理論解釋,這個東西非常的好理解。 可以看下官方文檔:https://docs.microsoft.com/zh-cn/d ...


 

    首先我們知道隊列是先進先出的機制,所以在處理併發是個不錯的選擇。然後就寫兩個隊列的簡單應用。

Queue

命名空間

    命名空間:System.Collections,不在這裡做過多的理論解釋,這個東西非常的好理解。

    可以看下官方文檔:https://docs.microsoft.com/zh-cn/dotnet/api/system.collections.queue?view=netframework-4.7.2

示例代碼

我這裡就是為了方便記憶做了一個基本的例子,首先創建了QueueTest類:

包含了獲取隊列的數量,入隊和出隊的實現

 1  public class QueueTest
 2     {
 3         public static Queue<string> q = new Queue<string>();
 4 
 5         #region 獲取隊列數量
 6         public int GetCount()
 7         {
 8 
 9             return q.Count;
10         }
11         #endregion
12 
13         #region 隊列添加數據
14         public void IntoData(string qStr)
15         {
16             string threadId = System.Threading.Thread.CurrentThread.ManagedThreadId.ToString();
17             q.Enqueue(qStr);
18             Console.WriteLine($"隊列添加數據: {qStr};當前線程id:{threadId}");
19         }
20         #endregion
21 
22         #region 隊列輸出數據
23 
24         public string OutData()
25         {
26             string threadId = System.Threading.Thread.CurrentThread.ManagedThreadId.ToString();
27             string str = q.Dequeue();
28             Console.WriteLine($"隊列輸出數據: {str};當前線程id:{threadId}");
29             return str;
30         }
31         #endregion
32 
33     }

為了模擬併發情況下也不會出現重覆讀取和插入混亂的問題所以寫了TaskTest類裡面開闢了兩個非同步線程進行插入和讀取:

這裡只是證明瞭多線程插入不會造成丟失。無憂證明併發的先進先出

 1     class TaskTest
 2     {
 3 
 4         #region 隊列的操作模擬
 5         public static void QueueMian()
 6         {
 7             QueueA();
 8             QueueB();
 9         }
10         private static async void QueueA()
11         {
12             QueueTest queue = new QueueTest();
13             var task = Task.Run(() =>
14             {
15                 for (int i = 0; i < 20; i++)
16                 {
17                     queue.IntoData("QueueA" + i);
18                 }
19             });
20             await task;
21             Console.WriteLine("QueueAA插入完成,進行輸出:");
22 
23             while (queue.GetCount() > 0)
24             {
25                 queue.OutData();
26             }
27         }
28 
29         private static async void QueueB()
30         {
31             QueueTest queue = new QueueTest();
32             var task = Task.Run(() =>
33             {
34                 for (int i = 0; i < 20; i++)
35                 {
36                     queue.IntoData("QueueB" + i);
37                 }
38             });
39             await task;
40             Console.WriteLine("QueueB插入完成,進行輸出:");
41 
42             while (queue.GetCount() > 0)
43             {
44                 queue.OutData();
45             }
46         }
47         #endregion
48 
49     }

效果展示

然後在main函數直接調用即可:

通過上面的截圖可以看出插入線程是無先後的。

這張圖也是線程無先後。

補充:通過園友的提問,我發現我一開始測試的不太仔細,只註意多線程下的插入,沒有註意到輸出其實不是跟插入的順序一致,對不起,這說明queue不是線程安全的,所以這個就當是入隊,出隊的基礎例子並不能說明併發。後面有一個補充的ConcurrentQueue隊列是說明瞭併發線程的先進先出。

MSMQ

msmq是微軟提供的消息隊列,本來在windows系統中就存在,但是預設沒有開啟。需要開啟。

開啟安裝

打開控制面板=>程式和功能=> 啟動或關閉windows功能 => Microsoft Message Queue(MSMQ)伺服器=>Microsoft Message Queue(MSMQ)伺服器核心

一般選擇:MSMQ Active Directory域服務繼承和MSMQ HTTP支持即可。

點擊確定等待安裝成功。

命名空間

需要引用System.Messaging.DLL

命名空間:System.Messaging

官方資料文檔:https://docs.microsoft.com/zh-cn/dotnet/api/system.messaging.messagequeue?view=netframework-4.7.2

示例代碼

與上面queue同樣的示例方式,創建一個MSMQ類,實現創建消息隊列,查詢數據,入列,出列功能:

  1  /// <summary>
  2     /// MSMQ消息隊列
  3     /// </summary>
  4     class MSMQ
  5     {
  6         static string path = ".\\Private$\\myQueue";
  7         static MessageQueue queue;
  8         public static void Createqueue(string queuePath)
  9         {
 10             try
 11             {
 12                 if (MessageQueue.Exists(queuePath))
 13                 {
 14                     Console.WriteLine("消息隊列已經存在");
 15                     //獲取這個消息隊列
 16                     queue = new MessageQueue(queuePath);
 17                 }
 18                 else
 19                 {
 20                     //不存在,就創建一個新的,並獲取這個消息隊列對象
 21                     queue = MessageQueue.Create(queuePath);
 22                     path = queuePath;
 23                 }
 24             }
 25             catch (Exception e)
 26             {
 27                 Console.WriteLine(e.Message);
 28             }
 29 
 30         }
 31 
 32 
 33         #region 獲取消息隊列的數量
 34         public static int GetMessageCount()
 35         {
 36             try
 37             {
 38                 if (queue != null)
 39                 {
 40                     int count = queue.GetAllMessages().Length;
 41                     Console.WriteLine($"消息隊列數量:{count}");
 42                     return count;
 43                 }
 44                 else
 45                 {
 46                     return 0;
 47                 }
 48             }
 49             catch (MessageQueueException e)
 50             {
 51 
 52                 Console.WriteLine(e.Message);
 53                 return 0;
 54             }
 55 
 56 
 57         }
 58         #endregion
 59 
 60         #region 發送消息到隊列
 61         public static void SendMessage(string qStr)
 62         {
 63             try
 64             {
 65                 //連接到本地隊列
 66 
 67                 MessageQueue myQueue = new MessageQueue(path);
 68 
 69                 //MessageQueue myQueue = new MessageQueue("FormatName:Direct=TCP:192.168.12.79//Private$//myQueue1");
 70 
 71                 //MessageQueue rmQ = new MessageQueue("FormatName:Direct=TCP:121.0.0.1//private$//queue");--遠程格式
 72 
 73                 Message myMessage = new Message();
 74 
 75                 myMessage.Body = qStr;
 76 
 77                 myMessage.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
 78 
 79                 //發生消息到隊列中
 80 
 81                 myQueue.Send(myMessage);
 82 
 83                 string threadId = System.Threading.Thread.CurrentThread.ManagedThreadId.ToString();
 84                 Console.WriteLine($"消息發送成功: {qStr};當前線程id:{threadId}");
 85             }
 86             catch (MessageQueueException e)
 87             {
 88                 Console.WriteLine(e.Message);
 89             }
 90         }
 91         #endregion
 92 
 93         #region 連接消息隊列讀取消息
 94         public static void ReceiveMessage()
 95         {
 96             MessageQueue myQueue = new MessageQueue(path);
 97 
 98 
 99             myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
100 
101             try
102 
103             {
104 
105                 //從隊列中接收消息
106 
107                 Message myMessage = myQueue.Receive(new TimeSpan(10));// myQueue.Peek();--接收後不消息從隊列中移除
108                 myQueue.Close();
109 
110                 string context = myMessage.Body.ToString();
111                 string threadId = System.Threading.Thread.CurrentThread.ManagedThreadId.ToString();
112                 Console.WriteLine($"--------------------------消息內容: {context};當前線程id:{threadId}");
113 
114             }
115 
116             catch (System.Messaging.MessageQueueException e)
117 
118             {
119 
120                 Console.WriteLine(e.Message);
121 
122             }
123 
124             catch (InvalidCastException e)
125 
126             {
127 
128                 Console.WriteLine(e.Message);
129 
130             }
131 
132         }
133         #endregion
134     }

這裡說明一下path這個欄位,這是消息隊列的文件位置和隊列名稱,我這裡寫的“.”(點)就是代表的位置MachineName欄位,,代表本機的意思

然後TaskTest類修改成這個樣子:

 1 class TaskTest
 2     {
 3 
 4         #region 消息隊列的操作模擬
 5         public static void MSMQMian()
 6         {
 7             MSMQ.Createqueue(".\\Private$\\myQueue");
 8             MSMQA();
 9             MSMQB();
10             Console.WriteLine("MSMQ結束");
11         }
12         private static async void MSMQA()
13         {
14             var task = Task.Run(() =>
15             {
16                 for (int i = 0; i < 20; i++)
17                 {
18                     MSMQ.SendMessage("MSMQA" + i);
19                 }
20             });
21             await task;
22             Console.WriteLine("MSMQA發送完成,進行讀取:");
23 
24             while (MSMQ.GetMessageCount() > 0)
25             {
26                 MSMQ.ReceiveMessage();
27             }
28         }
29 
30         private static async void MSMQB()
31         {
32             var task = Task.Run(() =>
33             {
34                 for (int i = 0; i < 20; i++)
35                 {
36                     MSMQ.SendMessage("MSMQB" + i);
37                 }
38             });
39             await task;
40             Console.WriteLine("MSMQB發送完成,進行讀取:");
41 
42             while (MSMQ.GetMessageCount() > 0)
43             {
44                 MSMQ.ReceiveMessage();
45             }
46         }
47         #endregion

 效果展示

本機查看消息隊列

創建成功的消息隊列我們可以在電腦上查看:我的電腦=>管理 =>電腦管理 =>服務與應用程式 =>消息隊列 =>專用隊列就看到我剛纔創建的消息隊列

 補充感謝

感謝 virtual1988 提出的queue不是線程安全這個問題,是我沒搞清楚。線程安全要使用ConcurrentQueue隊列。

謝謝提出的寶貴意見。

ConcurrentQueue

所以我有修改了一下寫了個ConcurrentQueue隊列的:

修改代碼如下:

 //public static Queue<string> q = new Queue<string>();
        public static ConcurrentQueue<string> q = new ConcurrentQueue<string>();
        //public static Queue q =Queue.Synchronized(new Queue());

        #region 獲取隊列數量
        public static int GetCount()
        {

            return q.Count;
        }
        #endregion

        #region 隊列添加數據
        public static void IntoData(string qStr)
        {
            string threadId = System.Threading.Thread.CurrentThread.ManagedThreadId.ToString();
            q.Enqueue(qStr);
            System.Threading.Thread.Sleep(10);
            Console.WriteLine($"隊列添加數據: {qStr};當前線程id:{threadId}");
        }
        #endregion

        #region 隊列輸出數據
        public static string OutData2()
        {
            string threadId = System.Threading.Thread.CurrentThread.ManagedThreadId.ToString();
            foreach (var item in q)
            {

                Console.WriteLine($"------隊列輸出數據: {item};當前線程id:{threadId}");
                string d="";
                q.TryDequeue( out d);
            }

            return "211";
        }
        #endregion
View Code

task類:

 #region 隊列的操作模擬
        public static async void QueueMian()
        {
            QueueA();
            QueueB();
        }
        private static async void QueueA()
        {
            var task = Task.Run(() =>
            {
                for (int i = 0; i < 20; i++)
                {
                    QueueTest.IntoData("QueueA" + i);
                }
            });
            await task;
            Console.WriteLine("QueueA插入完成,進行輸出:");
        }

        private static async void QueueB()
        {
            var task = Task.Run(() =>
            {
                for (int i = 0; i < 20; i++)
                {
                    QueueTest.IntoData("QueueB" + i);
                }
            });
            await task;
            Console.WriteLine("QueueB插入完成,進行輸出:");

        }

        public static void QueueC()
        {
            Console.WriteLine("Queue插入完成,進行輸出:");
            while (QueueTest.GetCount() > 0)
            {
                QueueTest.OutData2();
            }
        }
        #endregion
View Code

Main函數調用:

 static void Main(string[] args)
        {


            try
            {
                Stopwatch stopWatch = new Stopwatch();
                TaskTest.QueueMian();
                Console.ReadLine();
                TaskTest.QueueC();
                Console.ReadLine();
            }
            catch (Exception e)
            {

                throw;
            }
        }

插入效果:

輸出效果:

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • ModelValidator主要是應用在ModelMetadata元數據的類型上或類型屬性上。它是驗證的基礎類型,所有的ModelValidatorProviders、DataAnnotationValidator、DataAnnotationValidatorProvider都是主要通過GetVa ...
  • winform 程式調用Windows.Devices.Bluetoot API 實現windows下BLE藍牙設備自動連接,收發數據功能。不需要使用win10的UWP開發。 ...
  • 文檔繼續完善整理中。。。。。。 c.DocumentFilter<SwaggerDocTag>(); /// <summary> /// Swagger註釋幫助類 /// </summary> public class SwaggerDocTag : IDocumentFilter { /// <s ...
  • 添加引用. 綁定命令. ...
  • //定義原子變數 int mituxInt = -1; //原子級別+1值,如果>=0,說明當前鎖為空,可以執行,避免重覆執行 if (Interlocked.Increment(ref mituxInt) <= 0) { if (_serverThread == null || (_serverT... ...
  • 終本案件:http://zxgk.court.gov.cn/zhongben/new_index.html 綜合執行人:http://zxgk.court.gov.cn/zhixing/new_index.html 裁判文書:http://wenshu.court.gov.cn 終本案件和執行人爬取 ...
  • 先看看為什麼要用鎖 需求:多線程處理值的加減 static int NoLockData = 0; public static void NoLockNormalTest(int threadIndex) { while (true)//這是腦殘設計,while(true) { //lock (lo ...
  • 圖表能夠很直觀的表現數據在某個時間段的變化趨勢,或者呈現數據的整體和局部之間的相互關係,相較於大篇幅的文本數據,圖表更增加了我們分析數據時選擇的多樣性,是我們挖掘數據背後潛在價值的一種更為有效地方式。在做數據彙報時,常用到PPT幻燈片來輔助工作,下麵的示例中將演示如何通過C#編程在PPT幻燈片中創建 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...