RabbitMQ-Direct模式

来源:https://www.cnblogs.com/zhengyazhao/archive/2019/05/15/10869982.html
-Advertisement-
Play Games

簡介 RabbitMQ是實現了高級消息隊列協議(AMQP)的開源消息中間件,基於Erlang語言編寫。 P:(producling)生產者,生產只意味著發送消息。 Q: (queue_name)隊列,隊列是位於rabbitmq中的post box的名稱 C: (Consuming)消費者,消費者主要 ...


簡介

 RabbitMQ是實現了高級消息隊列協議(AMQP)的開源消息中間件,基於Erlang語言編寫。

AMQP是什麼

AMQP 0-9-1(高級消息隊列協議)是一種消息傳遞協議,它允許一致的客戶端應用程式與一致的消息傳遞中間件代理進行通信。

消息傳遞代理接收來自發佈者(發佈它們的應用程式,也稱為生產者)的消息,並將它們路由到消費者(處理它們的應用程式)。

 由於它是一個網路協議,發佈者、消費者和代理都可以駐留在不同的機器上。

 AMQP 0-9-1模型簡介

AMQP 0-9-1模型具有以下世界視圖:消息發佈到交換,這通常與郵局或郵箱進行比較。交換然後使用名為綁定的規則將消息副本分發到隊列。然後,代理將消息傳遞給訂閱隊列的消費者,或者消費者根據需要從隊列獲取/拉取消息。

 

發佈消息時,發佈者可以指定各種消息屬性(消息元數據)。有些元數據可以由代理使用,但是,其餘的元數據對代理是完全不透明的,只能由接收消息的應用程式使用。

 網路不可靠,應用程式可能無法處理消息,因此AMQP 0-9-1模型具有消息確認的概念:當消息傳遞給消費者時,消費者會自動或在應用程式開發人員選擇時立即通知代理。當消息確認正在使用時,代理將僅在收到消息(或消息組)通知時從隊列中完全刪除消息。

 例如,在某些情況下,當消息無法路由時,消息可能會返回給發佈者、丟棄,或者,如果代理實現擴展,則將消息放入所謂的“死信隊列”。發佈者通過使用某些參數發佈消息來選擇如何處理這種情況。

 隊列、交換和綁定統稱為AMQP實體。

 

交換和交換類型

交換機是發送消息的實體,交換機接收消息並將消息路由到零個或者多個隊列當中,使用的路由演算法取決於綁定的交換類型和規則,因此AMQP 0-9-1提供了以下四種交換類型:

  • Direct exchange    
  • Fanout exchange  
  • Topic exchange      
  • Headers exchang    

除了交換類型之外,還使用許多屬性聲明交換其中最重要的是:

  • 耐久性(Durability)       :交易所在經紀人重啟後仍能存活
  • 自動刪除(Auto-delete):當最後一個隊列與其解除綁定時,將刪除Exchange
  • 參數(arguments)         :可選,由插件和特定於代理的功能使用

交換可以是持久的,也可以是暫時的。持久性交易所能在經紀重啟後存活下來,而短暫性交易所則不能(當經紀重新上線時,必須重新申報)。並非所有的場景和用例都需要持久的交換。

 

本文主要記錄了Direct模式學習RabbitMQ

 

P:(producling)生產者,生產只意味著發送消息。

 

Q: (queue_name)隊列,隊列是位於rabbitmq中的post box的名稱

 

C: (Consuming)消費者,消費者主要是等待接收消息的程式

 

 

開發準備

  • netCoreTset.core:該工程主要封裝了RabbitMQ的公用方法
  • RabbitMQClient    :該工程為生產者
  • RabbitMQServer  :該工程為消費者

 

1.創建netCoreTset.core類庫項目

 

1.1 安裝項目依賴

 

2.定義介面

using netCoreTest.core.Model;
using System;
using System.Collections.Generic;
using System.Text;

namespace netCoreTest.core.Iserver
{
    public interface IConnectionServer
    {
      
        /// <summary>
        /// 連接服務
        /// </summary>
        void Connection();
        /// <summary>
        /// 創建消息隊列
        /// </summary>
        /// <param name="queName">隊列名稱</param>
        void CreateQueueDir();
        /// <summary>
        /// 關閉連接
        /// </summary>
        void CloseConnection();
        /// <summary>
        /// 關閉通道
        /// </summary>
        void CloseChannel();


    }
}
using System;
using System.Collections.Generic;
using System.Text;

namespace netCoreTest.core.Iserver
{
    public interface IMessageService
    {
        /// <summary>
        /// 發送消息
        /// </summary>
        /// <param name="msg">消息內容</param>
        void SendMsg(string msg);
        /// <summary>
        /// 獲取消息
        /// </summary>
        /// <returns></returns>
        string GetMsg();
    }
}
using System;
using System.Collections.Generic;
using System.Text;

namespace netCoreTest.core.Iserver
{
   public interface IRabbitMqService:IMessageService,IConnectionServer
    {
    }
}

 

 3.編寫RabbitMQ輔助類

using netCoreTest.core.Iserver;
using netCoreTest.core.Model;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;

namespace netCoreTest.core
{
    public class RabbitMQModel : IRabbitMqService
    {

        private readonly ConnectionFactory factory = null;
        private IModel channel;
        private IConnection connetction;
        readonly string exchangeName;//交換機名稱
        readonly string routeKey;//路由名稱
        readonly string queueName;///隊列名稱
        public RabbitMQModel(HostModel model)
        {
            /// <summary>
            /// 創建連接工廠
            /// </summary>
            factory = new ConnectionFactory
            {
                UserName = model.UserName,
                Password = model.PassWord,
                HostName = "localhost",
                Port = model.Port,
            };
            exchangeName = model.ExChangeModel.ExChangeName;
            routeKey = model.ExChangeModel.RouteKey;
            queueName = model.ExChangeModel.QueueName;
        }
        /// <summary>
        /// 創建連接
        /// </summary>
        public void Connection()
        {
            try
            {
                //創建連接
                connetction = factory.CreateConnection();
                //創建通道
                channel = connetction.CreateModel();
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.ToString());
            }
        }

        public void CreateQueueDir()
        {
            //定義一個direct類型的交換機
            channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null);
            //定義一個隊列
            channel.QueueDeclare(queueName, false, false, false, null);
            //將隊列綁定交換機
            channel.QueueBind(queueName, exchangeName, routeKey, null);
        }public void SendMsg(string msg)
        {
            var sendBytes = Encoding.UTF8.GetBytes(msg);
            channel.BasicPublish(exchangeName, routeKey, null, sendBytes);
        }

        public void CloseChannel()
        {
            channel.Close();
        }

        public void CloseConnection()
        {
            connetction.Close();
        }

        public string GetMsg()
        {
            //事件基本消費者
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
            string msg = null;
            //接收到消息事件
            consumer.Received += (ch, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body);
                msg = message;
                Console.WriteLine($"收到消息: {message}");
                //確認該消息已被消費
                channel.BasicAck(ea.DeliveryTag, false);
            };
            //啟動消費者 設置為手動應答消息
            channel.BasicConsume(queueName, false, consumer);
            Console.WriteLine("消費者已啟動");
            Console.ReadKey();
            CloseConnection();
            CloseChannel();
            return msg;
        }


    }
}

4.創建direct模式發送類

using netCoreTest.core.Model;
using System;
using System.Collections.Generic;
using System.Text;

namespace netCoreTest.core.ExchangeTypeModel
{

    /// <summary>
    /// Direct模式發送
    /// </summary>
    public class DirectPost
    {


        RabbitMQModel rabbitMQModel;

        public DirectPost()
        {
            HostModel hostModel = new HostModel();
            hostModel.UserName = "admin";
            hostModel.PassWord = "admin";
            hostModel.Host = "127.0.0.1";
            hostModel.Port = 5672;
            hostModel.ExChangeModel =new ExChangeModel {
                ExChangeName = "ClentName",
                QueueName = "Clent",
                RouteKey = "ClentRoute"
            };
            rabbitMQModel = new RabbitMQModel(hostModel);
            rabbitMQModel.Connection();

        }
        public void CreateQueue()
        {
            rabbitMQModel.CreateQueueDir();
        }
        public void SendMsg(string msg)
        {
            rabbitMQModel.SendMsg(msg);
        }
        public void GetMsg()
        {
            rabbitMQModel.GetMsg();
        }
    }
}

5.創建RabbitMQClient控制台應用程式

 

 

using netCoreTest.core;
using netCoreTest.core.ExchangeTypeModel;
using netCoreTest.core.Model;
using RabbitMQ.Client;
using System;

namespace RabbitMQClient
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("消息生產者開始生產數據!");
            Console.WriteLine("輸入exit退出!");
            DirectPost directPost = new DirectPost();
            directPost.CreateQueue();
            string input;
           
            do
            {
                input = Console.ReadLine();
                directPost.SendMsg(input);

            } while (input.Trim().ToLower() != "exit");


        }
    }
}

6.創建RabbitMQService控制台應用程式

using netCoreTest.core;
using netCoreTest.core.ExchangeTypeModel;
using netCoreTest.core.Model;
using System;
using System.Text;

namespace RabbitMQServer
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Hello World!");

            DirectPost directPost = new DirectPost();
            directPost.GetMsg();
        

        }
    }
}

7.執行RabbitMQclient和RabbitMQserver

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Ocelot(二) 請求聚合與負載均衡 作者:markjiang7m2 原文地址: 源碼地址:https://gitee.com/Sevenm2/OcelotDemo 在上一篇Ocelot的文章中,我已經給大家介紹了何為Ocelot以及如何簡單使用它的路由功能,如果你還沒有不瞭解Ocelot為何物, ...
  • 在C#開發過程中字元串String類處理過程中,有時字元串長度不夠時,需要在左側指定特定的字元來補足字元串長度,此時可以使用String類下的PadLeft方法對字元串的左邊進行按特定的字元和特定的長度進行補足。MSDN上對PadLeft函數的解釋是:返回指定長度的新字元串,其中當前字元串的開頭用空 ...
  • 在C#開發過程中字元串String類處理過程中,有時字元串長度不夠時,需要在右側側指定特定的字元來補足字元串長度,此時可以使用String類下的PadRight方法對字元串結尾按特定的字元補足位數。MSDN上對PadRight函數的解釋是:返回指定長度的新字元串,其中當前字元串的末尾用空格或指定的U ...
  • C# explicit interface implementation 某個類要實現兩個包含相同方法名的介面, 應該如何實現這兩個方法? 以上Demo中共有3個擁有相同方法名的interface,Program類繼承了這三個介面,使用explicit interface implementatio ...
  • 遇到的錯誤:The destination storage credentials must contain the account key credentials,參數名: destinationStorageCredentials 解決方法:AccountName與AccountKey參數值錯誤 ...
  • 需求 某航空公司物流單信息查詢,是一個post請求。通過後臺模擬POST HTTP請求發現無法獲取頁面數據,通過查看航空公司網站後,發現網站使用避免CSRF攻擊機制,直接發揮40X錯誤。 關於CSRF 讀者自行百度 網站HTTP請求分析 Headers Form Data 在head里包含了cook ...
  • 前面提到了微信功能變數名稱的封禁原理和檢測手段以及一些基本防範的方案。詳見《微信功能變數名稱檢測、防封,微信跳轉技術揭秘(一) -- 功能變數名稱檢測原理及防封方案》 要想讓地址在微信里不被封禁,或者說 儘可能的存活時間久一點,那麼需要註意以下幾點: 1、微信里的入口功能變數名稱最好用備案功能變數名稱。並且要加白名單2、不要QQ和微信同時 ...
  • 1、首先新建一個類庫,然後通過NuGet安裝下麵三個包 2、然後在程式包管理器控制臺中運行以下代碼(ps:記得預設項目選擇剛纔新建的項目,同時設置為啟動項) 3、如果你已經有資料庫表的話,接下系統會自動生成對應表的Model層 其中XXXContext中OnConfiguring方法,寫有資料庫連接 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...