RabbitMQ延遲消息學習

来源:https://www.cnblogs.com/xieshuang/archive/2018/12/14/10121891.html
-Advertisement-
Play Games

準備做一個禁言自動解除的功能,立馬想到了訂單的超時自動解除,剛好最近在看RabbitMQ的實現,於是想用它實現,查詢了相關文檔發現確實可以實現,動手編寫了這篇短文。 準備工作 1、Erlang安裝請參考 "windows下安裝Erlang" 2、mq安裝晴參考 "RabbitMQ安裝" 3、延遲消息 ...


準備做一個禁言自動解除的功能,立馬想到了訂單的超時自動解除,剛好最近在看RabbitMQ的實現,於是想用它實現,查詢了相關文檔發現確實可以實現,動手編寫了這篇短文。

準備工作

1、Erlang安裝請參考windows下安裝Erlang
2、mq安裝晴參考RabbitMQ安裝
3、延遲消息插件安裝rabbitmq_delayed_message_exchange

    #插件下載地址(選擇與mq版本匹配的插件版本)
    http://www.rabbitmq.com/community-plugins.html
    #安裝命令如下(在安裝目錄sbin下執行如下命令)
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange

創建項目

我選擇的是在springboot中集成RabbitMQ,配置相對簡單很多。

項目創建好後,在application.properties中加入RabbitMQ參數:

#RabbitMQ config
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#Custom config
rabbitmq.exchange=test_exchange
rabbitmq.queue=test_queue_1

定義ConnectionFactory和RabbitTemplate

    package com.xsh.mq.config;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMqConfig {
    private String host;
    private int port;
    private String userName;
    private String password;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(host,port);
        cachingConnectionFactory.setUsername(userName);
        cachingConnectionFactory.setPassword(password);
        cachingConnectionFactory.setVirtualHost("/");
        cachingConnectionFactory.setPublisherConfirms(true);
        return cachingConnectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        return rabbitTemplate;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}

Exchange和Queue配置

    package com.xsh.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * 配置隊列
 */
@Configuration
public class QueueConfig {

    @Value("${rabbitmq.exchange}")
    private String exchangeName;

    @Value("${rabbitmq.queue}")
    private String queueName;

    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
       //使用的是CustomExchange,不是DirectExchange,另外CustomExchange的類型必須是x-delayed-message
        return new CustomExchange(exchangeName, "x-delayed-message",true, false,args);
    }

    @Bean
    public Queue queue() {
        Queue queue = new Queue(queueName, true);
        return queue;
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(delayExchange()).with(queueName).noargs();
    }
}

消息發送

    package com.xsh.mq.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class MessageServiceImpl {

    /**
     * 日誌
     */
    private static final Logger logger = LoggerFactory.getLogger(MessageServiceImpl.class);
    /**
     * rabbitMQ模板
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Value("${rabbitmq.exchange}")
    private String exchangeName;

    /**
     * 發送消息
     * @param queueName 隊列名稱
     * @param msg 消息內容
     * @param delay 延遲時長 預設3秒
     */
    public void sendMsg(String queueName,String msg,Integer delay) {
        if(null == delay){
            delay = 3000;
        }
        logger.info("》》》》發送消息");
        Integer finalDelay = delay;
        rabbitTemplate.convertAndSend(exchangeName, queueName, msg, message -> {
            //必須添加header x-delay
            message.getMessageProperties().setHeader("x-delay", finalDelay);
            return message;
        });
    }
}

這裡發送消息我定義了一個延遲參數,傳入的延遲是多少,消息就延遲多少,方便消息延遲不一樣

消費消息

    package com.xsh.mq.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MessageReceiver {
    /**
     * 日誌
     */
    private static final Logger logger = LoggerFactory.getLogger(MessageReceiver.class);

    @RabbitListener(queues = "${rabbitmq.queue}")
    public void receive(String msg) {
        logger.info("收到消息:{}", msg);
    }
}

測試發送接收

先運行springboot項目,然後編寫單元測試用例

      package com.xsh.mq;

  import com.xsh.mq.service.MessageServiceImpl;
  import org.junit.Test;
  import org.junit.runner.RunWith;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.beans.factory.annotation.Value;
  import org.springframework.boot.test.context.SpringBootTest;
  import org.springframework.test.context.junit4.SpringRunner;

  @RunWith(SpringRunner.class)
  @SpringBootTest
  public class MqApplicationTests {

      @Test
      public void contextLoads() {
      }

      @Autowired
      private MessageServiceImpl messageService;

      @Value("${rabbitmq.queue}")
      private String queueName;

      @Test
      public void send() {
          messageService.sendMsg(queueName, "delayMsg2", 1000 * 60 * 2);
          messageService.sendMsg(queueName, "delayMsg1", 1000 * 60);
          messageService.sendMsg(queueName, "delayMsg3", 1000 * 60*3);
      }

  }

這裡我發送了三條延遲消息,控制台結果如圖:

消費者接收到的消息為:

從執行結果來看,demo基本實現,RabbitMQ其他細節還有待繼續看。
參考文章:Scheduling Messages with RabbitMQ


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • previewImage.js好用的圖片預覽縮放保存插件 ...
  • ES5
    一、數組API實際開發中forEach和map會多一些=>是es6語法中的arrow function舉例:(x) => x + 6相當於function(x){return x + 6;}undefined轉成數字是NaN判斷:arr.every():判斷arr中每個元素是否都符合要求只有每個元素 ...
  • $img_dir = ROOT_PATH . 'public/upload/card/' . $data['jt_id']; //創建合成圖片存放位置 //自動創建文件夾 if (!file_exists($img_dir)) { mkdir($img_dir, 0755, true); } ... ...
  • 初次翻譯,部分內容並非按字面翻譯,是按本人理解進行了內容重組。如有錯誤望指正。 如下是變數定義和賦值的示例 變數存儲的是一個引用地址。如上的變數name指向了一個值為Bob的String對象。通過var 定義變數是未明確指定類型的,由運行時VM自動推斷,你也可以明確指定類型,如下代碼 如果變數無法確 ...
  • 數據來源: R語言自帶 co2 數據集 分析工具:R 3.5.0 & Rstudio 1.1.453 本篇分析只是一個簡單的教程,不作深究 兩個視圖方便查看 ts:time series,時間序列 其中trend為長期趨勢,seasonal為周期性趨勢,random為隨機變化 從結果上看,是個非平穩 ...
  • 一、定義 CSS:層疊樣式表,用來美化頁面 二、書寫位置(即引入方式) 1,內嵌式,寫在head標簽下的style標簽內部 2,外聯式,同樣寫在head標簽內部,但是用的是link標簽,邏輯是寫在外部的CSS文件里的 3,行內式,寫在元素的style屬性中 三、語法結構 四、選擇器分類 1,標簽選擇 ...
  • 對於那些不熟悉函數式編程的人來說,基本的Java lambda語法起初可能有點令人生畏。但是,一旦將lambda表達式分解為它們的組成部分,語法很快就會變得有意義並變得非常自然。 Java中lambda表達式的目標是實現單個方法。所有Java方法都有一個參數列表和一個正文,因此毫不奇怪這兩個元素是J ...
  • 1. 在競賽中,題目:給定兩個整型數a,b,將其交換後輸出。 最優解法:(直接反序輸出) 2. 帶有與、或等操作的表達式,若判定結果已經確定,則不再進行運算,這種策略成為短路(short-circuit)。或許讀者認為,用短路的方法計算邏輯表達式唯一的優點是速度更快,但其實不是。 3. if 和 e ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...