聊聊RabbitMQ那一些事兒之一基礎應用

来源:https://www.cnblogs.com/xiaoXuZhi/archive/2020/03/02/RabbitMQ_01.html
-Advertisement-
Play Games

RabbitMQ的簡介,關鍵詞,以及是如果運行的,其運作模式有哪一些,在使用的時候需要註意一些什麼。帶著這一些問題,一起進入到文章中去。 ...


聊聊RabbitMQ那一些事兒之一基礎應用

  Hi,各位熱愛技術的小伙伴您們好,今年的疫情害人啊,真心祝願您和您的家人大家都平平安安,健健康康。年前到現在一直沒有總結點東西,寫點東西,不然久了自己感覺自己都要被廢啦。這個周末花了一些時間來梳理了一下RabbitMQ的相關知識點。先來一個基礎篇,先用起來。我也是一個邊學習邊梳理的過程,如果有什麼梳理的不妥之處,多多指點,相互學習,謝謝!

  在使用前,我們首先第一件事情就是環境搭建。至於RabbitMQ的環境搭建,我就不在此啰嗦了,網上一搜一大堆,還沒有搭建環境的小伙伴,可以網上找度娘哈,嘿嘿。

一、什麼是MQ

  MQ簡單的說就是隊列,隊列的特性就是先進先出。我們其實可以把隊列理解為一個消息管道,通過消息管道實現消息傳遞。最終達到不同的進程間、不同服務間的通訊需要。

  在一個程式中,我們 可以通過MQ實現不同進程間的通訊。在不同程式/服務間,我們同樣可以通過MQ來實現相互通訊,這也是本文的重點,這個時候就該今天的主角登場了。

二、RabbitMQ介紹

  RabbitMQ是一個開源的,在AMQP基礎完整的,可復用的企業消息系統。我個人的簡單的理解就是,實現消息的接收、存儲、管理、分發。在操作系統支持上,支持主流的操作系統(Linux、Windows);在開發語言介面支持上,支持所有的主流開發語言;在性能上,支持消息持久化、集群化、高併發等等。

三、RabbitMQ關鍵詞介紹

  Broker(Server):接受客戶端連接,實現AMQP消息隊列和路由功能的進程,我們可以把Broker叫做RabbitMQ伺服器。

  Virtual Host:一個虛擬概念,其實簡單的理解你可以認為是在邏輯上對MQ進行分區隔離,這樣避免不同業務的MQ直接交叉感染。一個Virtual Host裡面可以有若幹個Exchange和Queue,主要用於許可權控制,隔離應用。如應用程式A使用VhostA,應用程式B使用VhostB,那麼我們在VhostA中只存放應用程式A的exchange,queue和消息,應用程式A的用戶只能訪問VhostA,不能訪問VhostB中的數據。

  Exchange:接受生產者發送的消息,並根據Binding規則將消息路由給伺服器中的隊列。ExchangeType決定了Exchange路由消息的行為,例如,在RabbitMQ中,ExchangeType有Direct、Fanout、Topic和Header四種,不同類型的Exchange路由規則是不一樣的(這些以後會詳細介紹)。

  Queue:消息隊列,用於存儲還未被消費者消費的消息,隊列是先進先出的,預設情況下先存儲的消息先被處理。

  Message:就是消息,由Header和Body組成,Header是由生產者添加的各種屬性的集合,包括Message是否被持久化、由哪個Message Queue接受、優先順序是多少等,Body是真正傳輸的數據,內容格式為byte[]。

  Connection:連接,對於RabbitMQ而言,其實就是一個位於客戶端和Broker之間的TCP連接。

  Channel:道,僅僅創建了客戶端到Broker之間的連接Connection後,客戶端還是不能發送消息的。需要在Connection的基礎上創建Channel,AMQP協議規定只有通過Channel才能執行AMQP的命令,一個Connection可以包含多個Channel。之所以需要Channel,是因為TCP連接的建立和釋放都是十分昂貴的。

四、RabbitMQ三大角色介紹

  通過上面的一些簡單介紹,我相信你對MQ有了一個初步的印象。也許你會雲里霧裡的,到底是怎麼運行起來的啊,來一個實際點的。哈哈,不急,下麵馬上進入RabbitMQ跑起來階段。其實要跑起來,我們還要簡單介紹一下RabbitMQ重要的三個角色:生產者、伺服器、消費者。

  生產者:也就是消息生產方,通過RabbitMQ提高的API,將消息推送到RabbitMQ伺服器。

  伺服器:RabbitMQ的服務中心,接收生產者生產的消息,並根據分發規則,將消息推送到對應的消費者。

  消費者:顧名思義,就是消息的最終接收處理者。

  這樣一來,我相信大家腦海裡面已經有一個畫面了,生產者--生成消息-->伺服器--轉發-->消費者(最終處理消息)。這就是一個消息的整體流程和生命周期。

 

五、RabbitMQ跑起來

  通過上面的介紹,我們應該知道MQ的簡單的消息交互的流程。有了這個基礎,下麵我們就分類來介紹一下三大角色的數據交付方式。整體上來說,數據交互方式上有以下5種方式(5種工作模式),在網上找了一張圖,很方便的供大家參考。

  其實通過上面的圖,我們會發現,前兩種情況,消費者和生成者之間都是直接通過連接,後面三種情況,消費者和生產者直接有一層交換機(Exchange)。這樣一來,我們可以從整體上分為兩個大類:其一、消息直推隊列;其二、消息推送給交換機,交換機根據路由規則轉發至隊列。

  其實在實際的工作中,第一大類,我們是不會使用到的,都是採用的第二大類來實現實際的項目開發需求。但是第一大類,能夠很好的將我們先領我們入門,先簡單的把程式跑起來。由於時間原因,今天我們也就先實現第一大類的兩種情況,第二大類的,明後天在專門的文章來詳細介紹。

簡單模式:

簡單模式就是只有一個生產者,一個消費者。這個很簡單,下麵用一個實際例子來說明。直接貼代碼:

生產者代碼: 

/// <summary>
 /// 消息生成者
 /// </summary>
public class Program
{
    static void Main(string[] args)
    {
        // rabbitMQ鏈接對象
        var factory = new ConnectionFactory();
        // RabbitMQ服務在本地運行
        factory.HostName = "192.168.1.1";
        // RabbitMQ服務埠
        factory.Port = 5672;
        // 用戶名
        factory.UserName = "guest";
        // 密碼
        factory.Password = "guest";
        // 虛擬主機名稱
        factory.VirtualHost = "/";

        // 隊列名稱
        string queueName = "hello";

        // 創建鏈接
        using (var connection = factory.CreateConnection())
        {
            // 創建通道
            using (var channel = connection.CreateModel())
            {
                // 創建一個名稱為hello的消息隊列--當然一步也可以通過RabbitMQ管理後臺添加
                // 當已經存在該隊列時,不會重覆添加,但是如果已存在的隊列和新建的隊列存在屬性差異時,會創建失敗,會拋異常,所以在實際使用時,如果要通過程式創建隊列,最好要捕捉異常,避免因為這樣的問題而導致程式崩潰。
                channel.QueueDeclare(queueName, false, false, false, null);
                Console.WriteLine("我是生成者");

                while (true)
                {
                    Console.WriteLine("請輸入你要發送的消息,並按Enter鍵結束");

                    // 接收用戶輸入的消息
                    string message = Console.ReadLine();
                    // 消息編碼
                    var body = Encoding.UTF8.GetBytes(message);
                    // 向消息伺服器推送消息
                    channel.BasicPublish("", queueName, null, body);

                    Console.WriteLine($"已發送 {System.DateTime.Now.ToString("HH:mm:ss")}: {message}");
                }
            }
        }
    }
}

  消費者代碼:

 /// <summary>
 /// 消息消費者
 /// </summary>
 public class Program
 {
     static void Main(string[] args)
     {
         // rabbitMQ鏈接對象
         var factory = new ConnectionFactory();
         // RabbitMQ服務在本地運行
         factory.HostName = "192.168.1.1";
         // RabbitMQ服務埠
         factory.Port = 5672;
         // 用戶名
         factory.UserName = "guest";
         // 密碼
         factory.Password = "guest";
         // 虛擬主機名稱
         factory.VirtualHost = "/";

         // 隊列名稱
         string queueName = "hello";

         // 創建鏈接
         using (var connection = factory.CreateConnection())
         {
             // 創建通道
             using (var channel = connection.CreateModel())
             {

                 // 創建一個名稱為hello的消息隊列--當然一步也可以通過RabbitMQ管理後臺添加
                 // 當已經存在該隊列時,不會重覆添加,但是如果已存在的隊列和新建的隊列存在屬性差異時,會創建失敗,會拋異常,所以在實際使用時,如果要通過程式創建隊列,最好要捕捉異常,避免因為這樣的問題而導致程式崩潰。
                 channel.QueueDeclare(queueName, false, false, false, null);
                 Console.WriteLine("我是消費者");

                 // 創建一個消費者
                 var consumer = new EventingBasicConsumer(channel);
                 // 訂閱對應的消息 autoAck:是否自動確認
                 channel.BasicConsume(queueName, autoAck:false, consumer);

                 consumer.Received += (model, ea) =>
                 {
                     var body = ea.Body;
                     var message = Encoding.UTF8.GetString(body);
                     Console.WriteLine($"已接收 {System.DateTime.Now.ToString("HH:mm:ss")}: {message}");

                     // 為了模擬推送過程,在此程式休息1分鐘
                     Thread.Sleep(6000);
                     // 確認消費
                     channel.BasicAck(ea.DeliveryTag, false);
                 };
                 Console.ReadLine();
             }
         }
     }
 }

  

運行結果:

 

  通過實際的運行結果圖,我們很清楚的知道,生產者的消息發生順序,和消費者消費的順序是一直的,這也就MQ的基本原理所在。

上面介紹了簡單模式,下麵我在來介紹一下比簡單模式複雜一點的工作模式。

工作模式:

  我理解的簡單模式,只是帶我們入門,讓我們明白MQ的運行效果是咋樣的。但是在實際工作中,不可能只會有一個消費者,在實際的生產環境中生產者、消費者都可能會有多個存在,這也就是我們說的工作模式。那麼,有多個生成的者的時候,不同的生產者之間又是怎麼來消費消息的呢?下麵我們先通過實踐的例子來說明:

  具體的代碼和上面的代碼是一樣的,我們可以直接開兩個消費者就可以實現數據模擬,直接看運行結果:

  同上面的實際運行結果我們可以簡單的得出以下結論:

  當一個隊列有多個消費者時,在生成的實時消息時,消息隊列伺服器會輪詢的均勻的分發給每一個消費者。

  哈哈哈,註意了,上面的結論我說的是實時消息哦,這裡面就包含了一個坑,在實際的使用過程中要特別註意。那就是歷史消息處理上,在實際項目使用過程中,我們經常會遇到,當消費者打開時,隊列中已經有很多消息待消費,這個時候又該如何保證多個消費均勻分配消息呢?避免忙綠的消費者累死現象。其實很簡單,只需在消費端加上如下一個配置即可:

 
 // 通過Qos設置每次接收消息的條數
 // 三個參數說明
 // prefetchSize:為預取的長度,一般設置為0即可,表示長度不限
 // prefetchCount:表示預取的條數,即發送的最大消息條數
 // global表示是否在Connection中全局設置,true表示Connetion下的所有channel都設置為這個配置。
 channel.BasicQos(prefetchSize: 0,
                  prefetchCount: 1,
                  global: false);

  

  上面的配置中,最關鍵的一個參數就是prefetchCount,當我們設置為1時,就是能夠實現均勻的分發。下麵分別對prefetchCount設置不同的值,來看看不同的效果:
  實例一:將prefetchCount設置為10,並生成3條歷史消息,然後同時打開兩個消費者,看看3條消息的分發消費情況:

  通過圖,我們得出,3條歷史消息全部推送給了一個消費者,這樣就導致了一個消費者累死,一個消費者閑的慌。
  實例二:將prefetchCount設置為1,並生成4條歷史消息,然後同時打開兩個消費者,看看3條消息的分發消費情況:


  通過圖,我們得出,4條歷史消息平均的分發給了兩個消費者,這也是我們想要的效果。
  所以在實際工作中,一定要註意這一個細節,不然有可能導致在伺服器重啟時,有的伺服器直接卡死現象。
  好了,時間不早了,今天就先寫到這,明天我們繼續分享後面的幾種模式。在分析完每一種模式後,我還好結合實際,封裝一個dll出來,供大家參考,到時候也會直接把源碼提出來。歡迎大家關註,持續交流。疫情無情,我們學習不能停。加油吧,每一個小伙伴!​

 

 

END
為了更高的交流,歡迎大家關註我的公眾號,掃描下麵二維碼即可關註,謝謝:

 

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 閱讀目錄 手工操作 —— 穿孔卡片 批處理 —— 磁帶存儲和批處理系統 多道程式系統 分時系統 實時系統 通用操作系統 操作系統的進一步發展 操作系統的作用 手工操作 —— 穿孔卡片 1946年第一臺電腦誕生 20世紀50年代中期,電腦工作還在採用手工操作方式。此時還沒有操作系統的概念。 程式員 ...
  • 講有監督學習的線性回歸。 線性回歸是利用數理統計中的回歸分析,來確定兩種或兩種以上變數間相互依賴的定量關係的一種統計分析方法。 只有一個自變數的回歸稱簡單回歸,大於一個變數的情況稱多元回歸。 用途:預測、分析變數與因變數關係的強度。 實例:對房屋尺寸與房價進行線性回歸,預測房價。 分析:數據可視化, ...
  • 1. 什麼事面向對象?主要特征是什麼? 面向對象是程式的一種設計方式,它利於提高程式的重用性,使程式結構更加清晰。主要特征:封裝、繼承、多態。 更多學習內容請訪問: 怎麼從一名碼農成為架構師的必看知識點:目錄大全(不定期更新) 2. SESSION 與 COOKIE的區別是什麼,請從協議,產生的原因 ...
  • 1.代碼 2.定義類 3.註釋 4.定義變數 5.聲明方法 6.常用數據類型 7.運算符 1. 算數運算符 | 操作符 | 名稱 | 描述 | | | | | | + | 加法 | 相加運算符兩側的值 | | – | 減法 | 左操作數減去右操作數 | | | 乘法 | 相乘操作符兩側的值 | | ...
  • 7 Python是如何進行記憶體管理的? http://developer.51cto.com/art/201007/213585.htm Python引用了一個記憶體池(memory pool)機制,即Pymalloc機制(malloc:n.分配記憶體),用於管理對小塊記憶體的申請和釋放 記憶體池(memo ...
  • 通過本文你將學習如何使用Spring Boot和WebSocket API開發一個簡單的群聊天應用。 WebSocket是HTML5開始提供的一種在單個TCP連接上進行全雙工通訊的協議。WebSocket使得客戶端和伺服器之間的數據交換變得更加簡單,允許伺服器主動向客戶端推送數據。在WebSocke ...
  • 項目背景 2019年新型冠狀病毒感染的肺炎疫情發生以來,牽動人心,舉國哀痛,口罩、酒精、消毒液奇貨可居。 搶不到口罩,怎麼辦?作為技術人今天分享如何使用Python實現自動戴口罩系統,來安慰自己,系統效果如下所示: 本系統的實現原理是藉助 Dlib模塊的Landmark人臉68個關鍵點檢測庫輕鬆識別 ...
  • 場景 業務場景:下單時扣減庫存,由於比較簡單,商品和庫存都放到了一個上下文中 學習多個上下文之間的交互,協作,訂單上下文生成訂單時,扣減商品上下文中的庫存 第一個方法 第一個方法的技術選型,Redis,WebApiClient,EF CORE, Polly, ExceptionLess,雪花演算法 把 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...