RabbitMQ延遲消息學習

来源:https://www.cnblogs.com/xieshuang/archive/2018/12/14/10121891.html
-Advertisement-
Play Games

準備做一個禁言自動解除的功能,立馬想到了訂單的超時自動解除,剛好最近在看RabbitMQ的實現,於是想用它實現,查詢了相關文檔發現確實可以實現,動手編寫了這篇短文。 準備工作 1、Erlang安裝請參考 "windows下安裝Erlang" 2、mq安裝晴參考 "RabbitMQ安裝" 3、延遲消息 ...


準備做一個禁言自動解除的功能,立馬想到了訂單的超時自動解除,剛好最近在看RabbitMQ的實現,於是想用它實現,查詢了相關文檔發現確實可以實現,動手編寫了這篇短文。

準備工作

1、Erlang安裝請參考windows下安裝Erlang
2、mq安裝晴參考RabbitMQ安裝
3、延遲消息插件安裝rabbitmq_delayed_message_exchange

    #插件下載地址(選擇與mq版本匹配的插件版本)
    http://www.rabbitmq.com/community-plugins.html
    #安裝命令如下(在安裝目錄sbin下執行如下命令)
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange

創建項目

我選擇的是在springboot中集成RabbitMQ,配置相對簡單很多。

項目創建好後,在application.properties中加入RabbitMQ參數:

#RabbitMQ config
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#Custom config
rabbitmq.exchange=test_exchange
rabbitmq.queue=test_queue_1

定義ConnectionFactory和RabbitTemplate

    package com.xsh.mq.config;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMqConfig {
    private String host;
    private int port;
    private String userName;
    private String password;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(host,port);
        cachingConnectionFactory.setUsername(userName);
        cachingConnectionFactory.setPassword(password);
        cachingConnectionFactory.setVirtualHost("/");
        cachingConnectionFactory.setPublisherConfirms(true);
        return cachingConnectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        return rabbitTemplate;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}

Exchange和Queue配置

    package com.xsh.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * 配置隊列
 */
@Configuration
public class QueueConfig {

    @Value("${rabbitmq.exchange}")
    private String exchangeName;

    @Value("${rabbitmq.queue}")
    private String queueName;

    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
       //使用的是CustomExchange,不是DirectExchange,另外CustomExchange的類型必須是x-delayed-message
        return new CustomExchange(exchangeName, "x-delayed-message",true, false,args);
    }

    @Bean
    public Queue queue() {
        Queue queue = new Queue(queueName, true);
        return queue;
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(delayExchange()).with(queueName).noargs();
    }
}

消息發送

    package com.xsh.mq.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class MessageServiceImpl {

    /**
     * 日誌
     */
    private static final Logger logger = LoggerFactory.getLogger(MessageServiceImpl.class);
    /**
     * rabbitMQ模板
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Value("${rabbitmq.exchange}")
    private String exchangeName;

    /**
     * 發送消息
     * @param queueName 隊列名稱
     * @param msg 消息內容
     * @param delay 延遲時長 預設3秒
     */
    public void sendMsg(String queueName,String msg,Integer delay) {
        if(null == delay){
            delay = 3000;
        }
        logger.info("》》》》發送消息");
        Integer finalDelay = delay;
        rabbitTemplate.convertAndSend(exchangeName, queueName, msg, message -> {
            //必須添加header x-delay
            message.getMessageProperties().setHeader("x-delay", finalDelay);
            return message;
        });
    }
}

這裡發送消息我定義了一個延遲參數,傳入的延遲是多少,消息就延遲多少,方便消息延遲不一樣

消費消息

    package com.xsh.mq.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MessageReceiver {
    /**
     * 日誌
     */
    private static final Logger logger = LoggerFactory.getLogger(MessageReceiver.class);

    @RabbitListener(queues = "${rabbitmq.queue}")
    public void receive(String msg) {
        logger.info("收到消息:{}", msg);
    }
}

測試發送接收

先運行springboot項目,然後編寫單元測試用例

      package com.xsh.mq;

  import com.xsh.mq.service.MessageServiceImpl;
  import org.junit.Test;
  import org.junit.runner.RunWith;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.beans.factory.annotation.Value;
  import org.springframework.boot.test.context.SpringBootTest;
  import org.springframework.test.context.junit4.SpringRunner;

  @RunWith(SpringRunner.class)
  @SpringBootTest
  public class MqApplicationTests {

      @Test
      public void contextLoads() {
      }

      @Autowired
      private MessageServiceImpl messageService;

      @Value("${rabbitmq.queue}")
      private String queueName;

      @Test
      public void send() {
          messageService.sendMsg(queueName, "delayMsg2", 1000 * 60 * 2);
          messageService.sendMsg(queueName, "delayMsg1", 1000 * 60);
          messageService.sendMsg(queueName, "delayMsg3", 1000 * 60*3);
      }

  }

這裡我發送了三條延遲消息,控制台結果如圖:

消費者接收到的消息為:

從執行結果來看,demo基本實現,RabbitMQ其他細節還有待繼續看。
參考文章:Scheduling Messages with RabbitMQ


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • previewImage.js好用的圖片預覽縮放保存插件 ...
  • ES5
    一、數組API實際開發中forEach和map會多一些=>是es6語法中的arrow function舉例:(x) => x + 6相當於function(x){return x + 6;}undefined轉成數字是NaN判斷:arr.every():判斷arr中每個元素是否都符合要求只有每個元素 ...
  • $img_dir = ROOT_PATH . 'public/upload/card/' . $data['jt_id']; //創建合成圖片存放位置 //自動創建文件夾 if (!file_exists($img_dir)) { mkdir($img_dir, 0755, true); } ... ...
  • 初次翻譯,部分內容並非按字面翻譯,是按本人理解進行了內容重組。如有錯誤望指正。 如下是變數定義和賦值的示例 變數存儲的是一個引用地址。如上的變數name指向了一個值為Bob的String對象。通過var 定義變數是未明確指定類型的,由運行時VM自動推斷,你也可以明確指定類型,如下代碼 如果變數無法確 ...
  • 數據來源: R語言自帶 co2 數據集 分析工具:R 3.5.0 & Rstudio 1.1.453 本篇分析只是一個簡單的教程,不作深究 兩個視圖方便查看 ts:time series,時間序列 其中trend為長期趨勢,seasonal為周期性趨勢,random為隨機變化 從結果上看,是個非平穩 ...
  • 一、定義 CSS:層疊樣式表,用來美化頁面 二、書寫位置(即引入方式) 1,內嵌式,寫在head標簽下的style標簽內部 2,外聯式,同樣寫在head標簽內部,但是用的是link標簽,邏輯是寫在外部的CSS文件里的 3,行內式,寫在元素的style屬性中 三、語法結構 四、選擇器分類 1,標簽選擇 ...
  • 對於那些不熟悉函數式編程的人來說,基本的Java lambda語法起初可能有點令人生畏。但是,一旦將lambda表達式分解為它們的組成部分,語法很快就會變得有意義並變得非常自然。 Java中lambda表達式的目標是實現單個方法。所有Java方法都有一個參數列表和一個正文,因此毫不奇怪這兩個元素是J ...
  • 1. 在競賽中,題目:給定兩個整型數a,b,將其交換後輸出。 最優解法:(直接反序輸出) 2. 帶有與、或等操作的表達式,若判定結果已經確定,則不再進行運算,這種策略成為短路(short-circuit)。或許讀者認為,用短路的方法計算邏輯表達式唯一的優點是速度更快,但其實不是。 3. if 和 e ...
一周排行
    -Advertisement-
    Play Games
  • 示例項目結構 在 Visual Studio 中創建一個 WinForms 應用程式後,項目結構如下所示: MyWinFormsApp/ │ ├───Properties/ │ └───Settings.settings │ ├───bin/ │ ├───Debug/ │ └───Release/ ...
  • [STAThread] 特性用於需要與 COM 組件交互的應用程式,尤其是依賴單線程模型(如 Windows Forms 應用程式)的組件。在 STA 模式下,線程擁有自己的消息迴圈,這對於處理用戶界面和某些 COM 組件是必要的。 [STAThread] static void Main(stri ...
  • 在WinForm中使用全局異常捕獲處理 在WinForm應用程式中,全局異常捕獲是確保程式穩定性的關鍵。通過在Program類的Main方法中設置全局異常處理,可以有效地捕獲並處理未預見的異常,從而避免程式崩潰。 註冊全局異常事件 [STAThread] static void Main() { / ...
  • 前言 給大家推薦一款開源的 Winform 控制項庫,可以幫助我們開發更加美觀、漂亮的 WinForm 界面。 項目介紹 SunnyUI.NET 是一個基於 .NET Framework 4.0+、.NET 6、.NET 7 和 .NET 8 的 WinForm 開源控制項庫,同時也提供了工具類庫、擴展 ...
  • 說明 該文章是屬於OverallAuth2.0系列文章,每周更新一篇該系列文章(從0到1完成系統開發)。 該系統文章,我會儘量說的非常詳細,做到不管新手、老手都能看懂。 說明:OverallAuth2.0 是一個簡單、易懂、功能強大的許可權+可視化流程管理系統。 有興趣的朋友,請關註我吧(*^▽^*) ...
  • 一、下載安裝 1.下載git 必須先下載並安裝git,再TortoiseGit下載安裝 git安裝參考教程:https://blog.csdn.net/mukes/article/details/115693833 2.TortoiseGit下載與安裝 TortoiseGit,Git客戶端,32/6 ...
  • 前言 在項目開發過程中,理解數據結構和演算法如同掌握蓋房子的秘訣。演算法不僅能幫助我們編寫高效、優質的代碼,還能解決項目中遇到的各種難題。 給大家推薦一個支持C#的開源免費、新手友好的數據結構與演算法入門教程:Hello演算法。 項目介紹 《Hello Algo》是一本開源免費、新手友好的數據結構與演算法入門 ...
  • 1.生成單個Proto.bat內容 @rem Copyright 2016, Google Inc. @rem All rights reserved. @rem @rem Redistribution and use in source and binary forms, with or with ...
  • 一:背景 1. 講故事 前段時間有位朋友找到我,說他的窗體程式在客戶這邊出現了卡死,讓我幫忙看下怎麼回事?dump也生成了,既然有dump了那就上 windbg 分析吧。 二:WinDbg 分析 1. 為什麼會卡死 窗體程式的卡死,入口門檻很低,後續往下分析就不一定了,不管怎麼說先用 !clrsta ...
  • 前言 人工智慧時代,人臉識別技術已成為安全驗證、身份識別和用戶交互的關鍵工具。 給大家推薦一款.NET 開源提供了強大的人臉識別 API,工具不僅易於集成,還具備高效處理能力。 本文將介紹一款如何利用這些API,為我們的項目添加智能識別的亮點。 項目介紹 GitHub 上擁有 1.2k 星標的 C# ...