.NET6使用RabbitMQ學習

来源:https://www.cnblogs.com/fantasy-ke/archive/2023/07/14/17555153.html
-Advertisement-
Play Games

# .NET6使用RabbitMQ學習 [TOC] ## 前提 前段時間上班無事,上網衝浪看到了消息隊列RabbitMQ,就想著學習一下,網上看了點資料在嗶哩嗶哩上看的到codeman講的一個rabbitmq的視頻,就跟著仔細學習一下,敲一下代碼。視頻地址: [rabbitmq視頻](【【2021最 ...


.NET6使用RabbitMQ學習

目錄

前提

前段時間上班無事,上網衝浪看到了消息隊列RabbitMQ,就想著學習一下,網上看了點資料在嗶哩嗶哩上看的到codeman講的一個rabbitmq的視頻,就跟著仔細學習一下,敲一下代碼。視頻地址: rabbitmq視頻

RabbitMq介紹

什麼是消息隊列

MQ全稱為Message Queue,即消息隊列。“消息隊列”是在消息的傳輸過程中保存消息的容器。它是典型的:生產者、消費者模型。生產者不斷向消息隊列中生產消息,消費者不斷的從隊列中獲取消息。因為消息的生產和消費都是非同步的,而且只關心消息的發送和接收,沒有業務邏輯的侵入,這樣就實現了生產者和消費者的解耦。

image-20230713224815414

應用場景

削峰填谷

在一個時間段很多用戶同時進行請求我們的A系統,我的MQ容器就可以用來存儲請求按照每秒多少的請求進行發送,減輕伺服器的壓力。

image-20230713224326738

  • 使用了MQ之後,限制消息消費的速度為3000,這樣一來,高峰就被“削”掉了,但是因為消息積壓,在高峰期過後一段時間內,消費消息的速度還是會維持在3000,直到消費完擠壓的消息,這就叫做“填谷”。

  • 使用MQ後,可以提供系統穩定性。

image-20230713224647673

非同步提速

  • 在不使用MQ的情況下我們正常用戶通過訂單系統進行下單,我們需要900多ms,這就會出現用戶的體驗不好。

    image-20230713225203237

  • 在使用MQ的情況出現了總耗時只要25ms就給到了用戶回應

    這樣提升了用戶體驗感

    image-20230713225418594

所有的問題當你解決一個問題就會出現另外的問題,外部依賴多系統的穩定性就越差,MQ但凡掛了,系統就會出問題,後面就會使用mq集群來解決這一問題。

消息模型

點對點模式

image-20230713230146477

image-20230714205804589

在上圖的模型中,有以下概念:

  • Producer:生產者,也就是要發送消息的程式
  • Consumer:消費者:消息的接受者,會一直等待消息到來。
  • Queue:消息隊列。可以緩存消息;生產者向其中投遞消息,消費者從其中取出消息。
  • 點對點模式只會有一個消費者進行消費
代碼附上

image-20230713231805945

新增兩個項目一個生產者 Z.RabbitMq.Producer,一個消費者Z.RabbitMQ.Consumer01

  1. 項目 Z.RabbitMq.Producer新增HelloProducer
  • public class HelloProducer
        {
            public static void HelloWorldShow()
            {
                var factory = new ConnectionFactory();
                factory.HostName = "127.0.0.1";
                factory.Port = 5672;
                factory.UserName = "admin";
                factory.Password = "admin";
                factory.VirtualHost = "my_vhost";
    
                // 獲取TCP 長連接
                using (var connection = factory.CreateConnection())
                {
                    // 創建通信“通道”,相當於TCP中的虛擬連接
                    using (var channel = connection.CreateModel())
                    {
                        /*
                         * 創建隊列,聲明並創建一個隊列,如果隊列已存在,則使用這個隊列
                         * 第一個參數:隊列名稱ID
                         * 第二個參數:是否持久化,false對應不持久化數據,MQ停掉數據就會丟失
                         * 第三個參數:是否隊列私有化,false則代表所有的消費者都可以訪問,true代表只有第一次擁有它的消費者才能一直使用
                         * 第四個:是否自動刪除,false代表連接停掉後不自動刪除這個隊列
                         * 其他額外參數為null
                         */
                        channel.QueueDeclare(RabbitConstant.QUEUE_HELLO_WORLD, true, false, false, null);
                        Console.ForegroundColor = ConsoleColor.Red;
                        string message = "hello CodeMan 666";
                        var body = Encoding.UTF8.GetBytes(message);
    
                        /*
                         * exchange:交換機,暫時用不到,在進行發佈訂閱時才會用到
                         * 路由key
                         * 額外的設置屬性
                         * 最後一個參數是要傳遞的消息位元組數組
                         */
                        channel.BasicPublish("", RabbitConstant.QUEUE_HELLO_WORLD, null, body);
                        Console.WriteLine($"producer消息:{message}已發送");
                    }
                }
            }
        }
    
  1. 項目 Z.RabbitMQ.Consumer01新增HelloConsumer
  • public class HelloConsumer
        {
            public static void HelloWorldShow()
            {
                var factory = new ConnectionFactory();
                factory.HostName = "127.0.0.1";
                factory.Port = 5672;//5672是RabbitMQ預設的埠號
                factory.UserName = "admin";
                factory.Password = "admin";
                factory.VirtualHost = "my_vhost";
    
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        /*
                         * 創建隊列,聲明並創建一個隊列,如果隊列已存在,則使用這個隊列
                         * 第一個參數:隊列名稱ID
                         * 第二個參數:是否持久化,false對應不持久化數據,MQ停掉數據就會丟失
                         * 第三個參數:是否隊列私有化,false則代表所有的消費者都可以訪問,true代表只有第一次擁有它的消費者才能一直使用
                         * 第四個:是否自動刪除,false代表連接停掉後不自動刪除這個隊列
                         * 其他額外參數為null
                         */
                        //RabbitConstant.QUEUE_HELLO_WORLD 對應的生產者一樣名稱 "helloworld.queue"
                        channel.QueueDeclare(RabbitConstant.QUEUE_HELLO_WORLD, true, false, false, null);
                        Console.ForegroundColor = ConsoleColor.Cyan;
    
                        EventingBasicConsumer consumers = new EventingBasicConsumer(channel);
                        // 觸發事件
                        consumers.Received += (model, ea) =>
                        {
                            var body = ea.Body.ToArray();
                            var message = Encoding.UTF8.GetString(body);
    
                            // false只是確認簽收當前的消息,設置為true的時候則代表簽收該消費者所有未簽收的消息
                            channel.BasicAck(ea.DeliveryTag, false);
                            Console.WriteLine($"Consumer01接收消息:{message}");
                        };
                        /*
                         * 從MQ伺服器中獲取數據
                         * 創建一個消息消費者
                         * 第一個參數:隊列名
                         * 第二個參數:是否自動確認收到消息,false代表手動確認消息,這是MQ推薦的做法
                         * 第三個參數:要傳入的IBasicConsumer介面
                         *
                         */
                        
                        //RabbitConstant.QUEUE_HELLO_WORLD ==  helloworld.queue
                        channel.BasicConsume(RabbitConstant.QUEUE_HELLO_WORLD, false, consumers);
                        Console.WriteLine("Press [Enter] to exit");
                        Console.Read();
                    }
                }
            }
        }
    

work消息模型

工作隊列或者競爭消費者模式

image-20230714205741999

work queues與入門程式相比,多了一個消費端,兩個消費端共同消費同一個隊列中的消息,但是一個消息只能被一個消費者獲取。

接下來我們來模擬這個流程:

P:生產者:任務的發佈者

C1:消費者1:領取任務並且完成任務,假設完成速度較慢(模擬耗時)

C2:消費者2:領取任務並且完成任務,假設完成速度較快

代碼附上

新增一個工具類用來獲取rabbitmq的連接信息

public class RabbitUtils
{
    public static ConnectionFactory GetConnection()
    {
        var factory = new ConnectionFactory();
        factory.HostName = "127.0.0.1";
        factory.Port = 5672;//5672是RabbitMQ預設的埠號
        factory.UserName = "admin";
        factory.Password = "admin";
        factory.VirtualHost = "my_vhost";
        return factory;
    }
}
  • 消費者1(C1)在剛剛的 Z.RabbitMQ.Consumer01新增SmsReceive

    Program.cs中的main函數中進行調用 SmsReceive.Sender();

    消費者1 延遲30ms接受到信息

    public class SmsReceive
    {
      public static void Sender()
      {
          //使用工具類創建連接
          var connection = RabbitUtils.GetConnection().CreateConnection();
    
          var channel = connection.CreateModel();
    
          channel.QueueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null);
          // 如果不寫basicQos(1),則自動MQ會將所有請求平均發送給所有消費者
          // basicQos,MQ不再對消費者一次發送多個請求,而是消費者處理完一個消息後(確認後),在從隊列中獲取一個新的
          channel.BasicQos(0, 1, false);
    
          var consumer = new EventingBasicConsumer(channel);
    
          consumer.Received += (model, ea) =>
          {
              var body = ea.Body.ToArray();
              var message = Encoding.UTF8.GetString(body);
              Thread.Sleep(30);
              Console.WriteLine($"SmsSender-發送簡訊成功:{message}");
              channel.BasicAck(ea.DeliveryTag, false);
          };
    
          channel.BasicConsume(RabbitConstant.QUEUE_SMS, false, consumer);
          Console.WriteLine("Press [Enter] to exit");
          Console.Read();
      }
    }
    
  • 消費者2(C2)在剛剛的 Z.RabbitMQ.Consumer02新增SmsReceive

    image-20230714210550189

    消費者1 延遲60ms接受到信息

    public class SmsReceive
    {
        public static void Sender()
        {
            var connection = RabbitUtils.GetConnection().CreateConnection();
            var channel = connection.CreateModel();
    
            channel.QueueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null);
            // 如果不寫basicQos(1),則自動MQ會將所有請求平均發送給所有消費者
            // basicQos,MQ不再對消費者一次發送多個請求,而是消費者處理完一個消息後(確認後),在從隊列中獲取一個新的
            channel.BasicQos(0, 1, false);//處理完一個取一個
    
            var consumer = new EventingBasicConsumer(channel);
    
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Thread.Sleep(60);
                Console.WriteLine($"SmsSender-發送簡訊成功:{message}");
                channel.BasicAck(ea.DeliveryTag, false);
            };
    
            channel.BasicConsume(RabbitConstant.QUEUE_SMS, false, consumer);
            Console.WriteLine("Press [Enter] to exit");
            Console.Read();
        }
    }
    
  • 生產者Z.RabbitMq.Producer中創建SmsSender類在main函數進行調用

    • 發送100條車票訂閱的消息
    public class SmsSender
    {
        public static void Sender()
        {
            using (var connection = RabbitUtils.GetConnection().CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null);
                    for (int i = 0; i < 100; i++)
                    {
                        Sms sms = new Sms("乘客" + i, "139000000" + i, "您的車票已預定成功");
                        string jsonSms = JsonConvert.SerializeObject(sms);
                        var body = Encoding.UTF8.GetBytes(jsonSms);
                        channel.BasicPublish("", RabbitConstant.QUEUE_SMS, null, body);
                        Console.WriteLine($"正在發送內容:{jsonSms}");
                    }
                    Console.WriteLine("發送數據成功");
                }
            }
        }
    }
    

運行結構如下

image-20230714211516566

能者多勞
  • 消費者1比消費者2的效率要快,一次任務的耗時較短
  • 消費者2大量時間處於空閑狀態,消費者1一直忙碌

通過channel.BasicAck(ea.DeliveryTag, false);來完成能者多勞的效果,在完成上一次請求之後再去取下一條消息,這就會出現伺服器快的消費的更多,慢的消費的更少。

發佈訂閱模式

Publish/subscribe(交換機類型:Fanout,也稱為廣播 )

image-20230714204841106

image-20230714212152060

和前面兩種模式不同:

  • 聲明Exchange,不再聲明Queue
  • 發送消息到Exchange,不再發送到Queue,通過exchange發送到queue上
消費者1收到的天氣

項目.RabbitMq.Consumer01 創建WeatherFanout使用exchange(交換機)

public class WeatherFanout
{
    public static void Weather()
    {
        using (var connection = RabbitUtils.GetConnection().CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER, ExchangeType.Fanout);
                // 聲明隊列信息
                channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null);
                /*
                         * queueBind 用於將隊列與交換機綁定
                         * 參數1:隊列名
                         * 參數2:交換機名
                         * 參數3:路由Key(暫時用不到)
                         */
                channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, "");

                channel.BasicQos(0, 1, false);

                var consumer = new EventingBasicConsumer(channel);

                consumer.Received += ((model, ea) =>
                                      {
                                          var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                                          Console.WriteLine($"百度收到的氣象信息:{message}");
                                          channel.BasicAck(ea.DeliveryTag, false);
                                      });

                channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false, consumer);
                Console.WriteLine("Press [Enter] to exit");
                Console.Read();
            }
        }
    }
}
消費者2收到的天氣

項目.RabbitMq.Consumer02 創建WeatherFanout使用exchange(交換機)

代碼與消費者01一樣

生產者發送天氣

生產者把消息推送到交換機上

public class WeatherFanout
{
    public static void Weather()
    {
        using (var connection = RabbitUtils.GetConnection().CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                string message = "20度";
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER, "", null, body);
                Console.WriteLine("天氣信息發送成功!");
            }
        }
    }
}

最後得到效果

image-20230714213059332

Routing 路由模型

image-20230714213633197

P:生產者,向Exchange發送消息,發送消息時,會指定一個routing key。

X:Exchange(交換機),接收生產者的消息,然後把消息遞交給 與routing key完全匹配的隊列

C1:消費者,其所在隊列指定了需要routing key 為 error 的消息

C2:消費者,其所在隊列指定了需要routing key 為 info、error、warning 的消息

  • 隊列與交換機的綁定,不能是任意綁定,而是要指定一個RoutingKey

  • 消息的發送方在向Exchange發送消息時,也必須指定消息的RoutingKey

  • Exchange不再把消息交給每一個綁定的隊列,而是根據消息的RoutingKey進行判斷,只有隊列的RoutingKey與消息的RoutingKey完全一致,才會接收消息

生產者
 public class WeatherDirect
 {
     public static void Weather()
     {
         Dictionary<string, string> area = new Dictionary<string, string>();
         area.Add("china.hunan.changsha.20210525", "中國湖南長沙20210525天氣數據");
         area.Add("china.hubei.wuhan.20210525", "中國湖北武漢20210525天氣數據");
         area.Add("china.hubei.xiangyang.20210525", "中國湖北襄陽20210525天氣數據");
         area.Add("us.cal.lsj.20210525", "美國加州洛杉磯20210525天氣數據");

         using (var connection = RabbitUtils.GetConnection().CreateConnection())
         {
             using (var channel = connection.CreateModel())
             {
                 foreach (var item in area)
                 {
                     channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING, item.Key,
                                          null, Encoding.UTF8.GetBytes(item.Value));
                 }

                 Console.WriteLine("氣象信息發送成功!");
             }
         }
     }
 }
消費者1

接受百度路由的路由消息

public class WeatherDirect
{
    public static void Weather()
    {
        using (var connection = RabbitUtils.GetConnection().CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_ROUTING, ExchangeType.Direct);
                channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null);
                /*
                    * queueBind 用於將隊列與交換機綁定
                    * 參數1:隊列名
                    * 參數2:交換機名
                    * 參數3:路由Key(暫時用不到)
                    */
                channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hunan.changsha.20210525");
                channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20210525");

                channel.BasicQos(0, 1, false);

                var consumer = new EventingBasicConsumer(channel);

                consumer.Received += ((model, ea) =>
                                      {
                                          var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                                          Console.WriteLine($"百度收到的氣象信息:{message}");
                                          channel.BasicAck(ea.DeliveryTag, false);
                                      });

                channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false, consumer);
                Console.WriteLine("Press [Enter] to exit");
                Console.Read();
            }
        }
    }
}
消費者2

接受新浪的路由信息

public class WeatherDirect
{
    public static void Weather()
    {
        using (var connection = RabbitUtils.GetConnection().CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_ROUTING, ExchangeType.Direct);
                // 聲明隊列信息
                channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null);
                /*
                     * queueBind 用於將隊列與交換機綁定
                     * 參數1:隊列名
                     * 參數2:交換機名
                     * 參數3:路由Key
                     */
                channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.xiangyang.20210525");
                channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20210525");
                channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20210525");

                channel.BasicQos(0, 1, false);

                var consumer = new EventingBasicConsumer(channel);

                consumer.Received += ((model, ea) =>
                                      {
                                          var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                                          Console.WriteLine($"新浪收到的氣象信息:{message}");
                                          channel.BasicAck(ea.DeliveryTag, false);
                                      });

                channel.BasicConsume(RabbitConstant.QUEUE_SINA, false, consumer);
                Console.WriteLine("Press [Enter] to exit");
                Console.Read();
            }
        }
    }
}
最後得到的效果
  • 新浪接收對應新浪的routingkey的信息
  • 百度接收對應百度的routingkey的信息

image-20230714214750549

Topics 通配符模式

image-20230714215026047

routingkey支持通配符匹配格式
  • 通配符格式
    • Topic類型與Direct相比,都是可以根據RoutingKey把消息路由到不同的隊列。只不過
    • Topic類型Exchange可以讓隊列在綁定RoutingKey的時候使用通配符
    • RoutingKey一般都是由一個或多個單片語成,多個單詞之間以“.”分隔,例如:item.insert
    • 通配符規則:#匹配一個或多個詞,*恰好匹配一個詞,例如item.#能夠匹配item.insert.user或者item.insert,item.只能匹配item.insert或者item.user
生產者
public class WeatherTopic
{
    public static void Weather()
    {
        Dictionary<string, string> area = new Dictionary<string, string>();
        area.Add("china.hunan.changsha.20210525", "中國湖南長沙20210525天氣數據");
        area.Add("china.hubei.wuhan.20210525", "中國湖北武漢20210525天氣數據");
        area.Add("china.hubei.xiangyang.20210525", "中國湖北襄陽20210525天氣數據");
        area.Add("us.cal.lsj.20210525", "美國加州洛杉磯20210525天氣數據");

        using (var connection = RabbitUtils.GetConnection().CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                foreach (var item in area)
                {
                    channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, item.Key,
                                         null, Encoding.UTF8.GetBytes(item.Value));
                }

                Console.WriteLine("氣象信息發送成功!");
            }
        }
    }
}
消費者1

獲取交換機中通配符為china.#的信息

  • ("china.hunan.changsha.20210525", "中國湖南長沙20210525天氣數據");
  • ("china.hubei.wuhan.20210525", "中國湖北武漢20210525天氣數據");
  • ("china.hubei.xiangyang.20210525", "中國湖北襄陽20210525天氣數據");
public class WeatherTopic
{
    public static void Weather()
    {
        using (var connection = RabbitUtils.GetConnection().CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_TOPIC, ExchangeType.Topic);
                // 聲明隊列信息
                channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null);
                /*
                     * queueBind 用於將隊列與交換機綁定
                     * 參數1:隊列名
                     * 參數2:交換機名
                     * 參數3:路由Key(暫時用不到)
                     */
                channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "china.#");

                channel.BasicQos(0, 1, false);

                var consumer = new EventingBasicConsumer(channel);

                consumer.Received += ((model, ea) =>
                                      {
                                          var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                                          Console.WriteLine($"百度收到的氣象信息:{message}");
                                          channel.BasicAck(ea.DeliveryTag, false);
                                      });

                channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false, consumer);
                Console.WriteLine("Press [Enter] to exit");
                Console.Read();
            }
        }
    }
}
消費者2

獲取交換機中通配符為china.hubei.*.20210525的信息

  • ("china.hubei.wuhan.20210525", "中國湖北武漢20210525天氣數據")
  • ("china.hubei.xiangyang.20210525", "中國湖北襄陽20210525天氣數據")
public class WeatherTopic
{
    public static void Weather()
    {
        using (var connection = RabbitUtils.GetConnection().CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                /*
                     * 生產者發送消息
                     * 隊列名稱
                     * 交換機名稱
                     * 路由key
                     *
                     */
                channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_TOPIC, ExchangeType.Topic);
                // 聲明隊列信息
                channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null);
                /*
                     * queueBind 用於將隊列與交換機綁定
                     * 參數1:隊列名
                     * 參數2:交換機名
                     * 參數3:路由Key(暫時用不到)
                     */
                channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "china.hubei.*.20210525");

                channel.BasicQos(0, 1, false);

                var consumer = new EventingBasicConsumer(channel);

                consumer.Received += ((model, ea) =>
                                      {
                                          var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                                          Console.WriteLine($"新浪收到的氣象信息:{message}");
                                          channel.BasicAck(ea.DeliveryTag, false);
                                      });

                channel.BasicConsume(RabbitConstant.QUEUE_SINA, false, consumer);
                Console.WriteLine("Press [Enter] to exit");
                Console.Read();
            }
        }
    }
}
最後得到的效果
  • 百度獲取china.#的信息
  • 新浪獲取china.hubei.*.20210525的信息

image-20230714220155754

RPC

image-20230714220336915

基本概念:
  • Callback queue 回調隊列,客戶端向伺服器發送請求,伺服器端處理請求後,將其處理結果保存在一個存儲體中。而客戶端為了獲得處理結果,那麼客戶在向伺服器發送請求時,同時發送一個回調隊列地址reply_to。

  • Correlation id 關聯標識,客戶端可能會發送多個請求給伺服器,當伺服器處理完後,客戶端無法辨別在回調隊列中的響應具體和那個請求時對應的。為了處理這種情況,客戶端在發送每個請求時,同時會附帶一個獨有correlation_id屬性,這樣客戶端在回調隊列中根據correlation_id欄位的值就可以分辨此響應屬於哪個請求。

流程說明:
  • 當客戶端啟動的時候,它創建一個匿名獨享的回調隊列。
  • 在 RPC 請求中,客戶端發送帶有兩個屬性的消息:一個是設置回調隊列的 reply_to 屬性,另一個是設置唯一值的 correlation_id 屬性。
  • 將請求發送到一個 rpc_queue 隊列中。
  • 伺服器等待請求發送到這個隊列中來。當請求出現的時候,它執行他的工作並且將帶有執行結果的消息發送給 reply_to 欄位指定的隊列。
  • 客戶端等待回調隊列里的數據。當有消息出現的時候,它會檢查 correlation_id 屬性。如果此屬性的值與請求匹配,將它返回給應用

分享幾題面試題

RabbitMQ中消息可能有的幾種狀態?

  1. alpha: 消息內容(包括消息體、屬性和 headers) 和消息索引都存儲在記憶體中 。

    1. beta: 消息內容保存在磁碟中,消息索引保存在記憶體中。
    2. gamma: 消息內容保存在磁碟中,消息索引在磁碟和記憶體中都有 。
    3. delta: 消息內容和索引都在磁碟中 。
  2. 死信隊列?

    DLX,全稱為 Dead-Letter-Exchange,死信交換器,死信郵箱。當消息在一個隊列中變成死信 (dead message) 之後,它能被重新被髮送到另一個交換器中,這個交換器就是 DLX,綁定 DLX 的隊列就稱之 為死信隊列。

  3. 導致的死信的幾種原因?

    1. 消息被拒(Basic.Reject /Basic.Nack) 且 requeue = false。
    2. 消息TTL過期。
    3. 隊列滿了

到這裡就結束,大家如果需要看視頻學習就是點最上面的鏈接就行了

想要源碼的可以加QQ群831181779 @做夢達人


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • python的開發工具有很多款,很多都是非常好用的,其中vscode作為其中一款Python的開發工具,是非常輕量級的,今天我們來介紹一下vs code的下載與安裝。 # vscode的下載與安裝 首先需要到vscode的官網,這個谷歌或者百度一下就可以搜到,然後根據你的系統下載你對應的版本,我這裡 ...
  • 第三方鏡像是在Docker Hub或其他容器註冊表上提供的預構建Docker容器鏡像。這些鏡像由個人或組織創建和維護,可以作為您容器化應用程式的起點。 ### 查找第三方鏡像 [**Docker Hub**](https://hub.docker.com/) 是最大和最受歡迎的容器鏡像註冊表,包含官 ...
  • ### 歡迎訪問我的GitHub > 這裡分類和彙總了欣宸的全部原創(含配套源碼):[https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos) ### 本篇概覽 - 本文是《Java擴展Nginx》系列的第 ...
  • # Scala基礎篇 ## 數據類型 下表中列出的數據類型都是對象,可以直接對它們調用方法。 | 數據類型 | 描述 | | | | | Byte | 8位有符號補碼整數。數值區間為 -128 到 127 | | Short | 16位有符號補碼整數。數值區間為 -32768 到 32767 | | ...
  • 在Revit自帶的導出功能中,我們可以知道,Revit可以導出如下格式文件: 他們分別對應的API在Document類下麵,主要包含以下方法 1 Export(String, String, MassGBXMLExportOptions) 從體量模型文檔中導出gbXML文件。 2 Export(St ...
  • 上次老周扯了有關主、從實體的話題,本篇咱們再挖一下,主、從實體之間建立的關係,跟咱們常用的一對一、一對多這些關係之間有什麼不同。 先看看咱們從學習資料庫開始就特熟悉的常用關係——多對多、一對一、一對多說起。數據實體之間會建立什麼樣的關係,並不是規則性的,而是要看數據的功能。比如你家養的狗狗和水果(你 ...
  • Redis是一個開源的、高性能的、基於記憶體的鍵值資料庫,它支持多種數據結構,如字元串、列表、集合、散列、有序集合等。其中,Redis的散列(Hash)結構是一個常用的結構,今天跟大家分享一個我的日常操作,如何使用Redis的散列(Hash)結構來緩存和查詢對象的屬性值,以及如何用Lambda表達式樹 ...
  • # 使用Back推送消息到你的iPhone # 前言 我的好友看了我的博客,給我提了個需求,讓我搞個網站通知,我開始以為就是評論回覆然後發送郵件通知。不過他告訴我網站通知是,當有人評論或者留言後,會通知到我這邊來,消息是實時通知的,他說用的是Back,不需要發郵件,然後發了個GitHub鏈接給我,我 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...