SpringBoot之ActiveMQ實現延遲消息

来源:https://www.cnblogs.com/muyl/archive/2019/11/12/11845882.html
-Advertisement-
Play Games

一、安裝activeMQ ​ 安裝步驟參照網上教程,本文不做介紹 二、修改activeMQ配置文件 ​ broker新增配置信息 schedulerSupport="true" 三、創建SpringBoot工程 ]() 1. 配置ActiveMQ工廠信息,信任包必須配置否則會報錯 2. 消息實體類 ...


一、安裝activeMQ

​ 安裝步驟參照網上教程,本文不做介紹

二、修改activeMQ配置文件

​ broker新增配置信息 schedulerSupport="true"

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true" >

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" >
                    <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information, see:

                         http://activemq.apache.org/slow-consumer-handling.html

                    -->
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>

三、創建SpringBoot工程

file

  1. 配置ActiveMQ工廠信息,信任包必須配置否則會報錯
package com.example.demoactivemq.config;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.ArrayList;
import java.util.List;

/**
 * @author shanks on 2019-11-12
 */
@Configuration
public class ActiveMqConfig {

    @Bean
    public ActiveMQConnectionFactory factory(@Value("${spring.activemq.broker-url}") String url){
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
        // 設置信任序列化包集合
        List<String> models = new ArrayList<>();
        models.add("com.example.demoactivemq.domain");
        factory.setTrustedPackages(models);

        return factory;
    }

}
  1. 消息實體類
package com.example.demoactivemq.domain;

import lombok.Builder;
import lombok.Data;

import java.io.Serializable;

/**
 * @author shanks on 2019-11-12
 */

@Builder
@Data
public class MessageModel implements Serializable {
    private String titile;
    private String message;
}
  1. 生產者
package com.example.demoactivemq.producer;


import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jms.JmsProperties;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

import javax.jms.*;
import java.io.Serializable;


/**
 * 消息生產者
 *
 * @author shanks
 */
@Service
@Slf4j
public class Producer {

    public static final Destination DEFAULT_QUEUE = new ActiveMQQueue("delay.queue");

    @Autowired
    private JmsMessagingTemplate template;

    /**
     * 發送消息
     *
     * @param destination destination是發送到的隊列
     * @param message     message是待發送的消息
     */
    public <T extends Serializable> void send(Destination destination, T message) {
        template.convertAndSend(destination, message);
    }

    /**
     * 延時發送
     *
     * @param destination 發送的隊列
     * @param data        發送的消息
     * @param time        延遲時間
     */
    public <T extends Serializable> void delaySend(Destination destination, T data, Long time) {
        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;
        // 獲取連接工廠
        ConnectionFactory connectionFactory = template.getConnectionFactory();
        try {
            // 獲取連接
            connection = connectionFactory.createConnection();
            connection.start();
            // 獲取session,true開啟事務,false關閉事務
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 創建一個消息隊列
            producer = session.createProducer(destination);
            producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue());
            ObjectMessage message = session.createObjectMessage(data);
            //設置延遲時間
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
            // 發送消息
            producer.send(message);
            log.info("發送消息:{}", data);
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (producer != null) {
                    producer.close();
                }
                if (session != null) {
                    session.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
  1. 消費者
package com.example.demoactivemq.producer;


import com.example.demoactivemq.domain.MessageModel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * 消費者
 */
@Component
@Slf4j
public class Consumer {


    @JmsListener(destination = "delay.queue")
    public void receiveQueue(MessageModel message) {
        log.info("收到消息:{}", message);
    }
}
  1. application.yml
spring:
  activemq:
    broker-url: tcp://localhost:61616
  1. 測試類
package com.example.demoactivemq;

import com.example.demoactivemq.domain.MessageModel;
import com.example.demoactivemq.producer.Producer;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = DemoActivemqApplication.class)
@RunWith(SpringRunner.class)
class DemoActivemqApplicationTests {

    /**
     * 消息生產者
     */
    @Autowired
    private Producer producer;

    /**
     * 及時消息隊列測試
     */
    @Test
    public void test() {
        MessageModel messageModel = MessageModel.builder()
                .message("測試消息")
                .titile("消息000")
                .build();
        // 發送消息
        producer.send(Producer.DEFAULT_QUEUE, messageModel);
    }

    /**
     * 延時消息隊列測試
     */
    @Test
    public void test2() {
        for (int i = 0; i < 5; i++) {
            MessageModel messageModel = MessageModel.builder()
                    .titile("延遲10秒執行")
                    .message("測試消息" + i)
                    .build();
            // 發送延遲消息
            producer.delaySend(Producer.DEFAULT_QUEUE, messageModel, 10000L);
        }
        try {
            // 休眠100秒,等等消息執行
            Thread.currentThread().sleep(100000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

執行結果

2019-11-12 22:18:52.939  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 發送消息:MessageModel(titile=延遲10秒執行, message=測試消息0)
2019-11-12 22:18:52.953  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 發送消息:MessageModel(titile=延遲10秒執行, message=測試消息1)
2019-11-12 22:18:52.958  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 發送消息:MessageModel(titile=延遲10秒執行, message=測試消息2)
2019-11-12 22:18:52.964  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 發送消息:MessageModel(titile=延遲10秒執行, message=測試消息3)
2019-11-12 22:18:52.970  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 發送消息:MessageModel(titile=延遲10秒執行, message=測試消息4)
2019-11-12 22:19:03.012  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延遲10秒執行, message=測試消息0)
2019-11-12 22:19:03.017  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延遲10秒執行, message=測試消息1)
2019-11-12 22:19:03.019  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延遲10秒執行, message=測試消息2)
2019-11-12 22:19:03.020  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延遲10秒執行, message=測試消息3)
2019-11-12 22:19:03.021  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延遲10秒執行, message=測試消息4)

比你優秀的人比你還努力,你有什麼資格不去奮鬥!!!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一、import 1.import語句用來完成導入其他類,同一個包下的類不需要再導入 不在同一個包下需要手動導入。 2.import語法格式 import 類名; import 包名.*; //import語句需要編寫到package語句之下,class語句之上。 3.java.lang.*;不需要 ...
  • 題目鏈接:http://codeforces.com/problemset/problem/939/A A題 A. Love Triangle time limit per test 1 second memory limit per test 256 megabytes input standar ...
  • 下載Microsoft JDBC Driver 4.0 for SQL Server 在這裡下載:http://www.microsoft.com/zh-cn/download/details.aspx?id=11774 1. 在E盤新建一個文件夾,命名為sqljdbc42,將sqljdbc42.j ...
  • 'Specifying a namespace in include() without providing an app_name ’ 從include()函數可以看出來,這個函數有兩個參數,一個arg,一個namespace,我在代碼中也是兩個參數,但是異常中提示了,沒有提供app_name,還 ...
  • Go沒有像Java那樣的異常機制,它不能拋出異常,而是使用了 panic和recover機制。一定要記住,應當把它作為最後的手段來使用,也就是說,我們的代碼中應當沒有,或者很少有panic這樣的東西。 ...
  • 本文收錄在Python從入門到精通系列文章系列 學完前面的幾個章節後,博主覺得有必要在這裡帶大家做一些練習來鞏固之前所學的知識,雖然迄今為止我們學習的內容只是Python的冰山一角,但是這些內容已經足夠我們來構建程式中的邏輯。對於編程語言的初學者來說,在學習了Python的核心語言元素(變數、類型、 ...
  • 下載中間件 簡介 下載器,無法執行js代碼,本身不支持代理 下載中間件用來hooks進Scrapy的request/response處理過程的框架,一個輕量級的底層系統,用來全局修改scrapy的request和response scrapy框架中的下載中間件,是實現了特殊方法的類,scrapy系統 ...
  • 你好,我是彤哥,本篇是netty系列的第一篇。 歡迎來我的公從號 彤哥讀源碼 系統地學習 源碼&架構 的知識。 簡介 本文主要講述netty系列的整體規劃,並調查一下大家喜歡的學習方式。 知識點 netty系列彤哥準備分成三個大的模塊來完成: 入門篇 入門篇主要講述一些必備的基礎知識,例如IO的五種 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...