SpringBoot之ActiveMQ實現延遲消息

来源:https://www.cnblogs.com/muyl/archive/2019/11/12/11845882.html
-Advertisement-
Play Games

一、安裝activeMQ ​ 安裝步驟參照網上教程,本文不做介紹 二、修改activeMQ配置文件 ​ broker新增配置信息 schedulerSupport="true" 三、創建SpringBoot工程 ]() 1. 配置ActiveMQ工廠信息,信任包必須配置否則會報錯 2. 消息實體類 ...


一、安裝activeMQ

​ 安裝步驟參照網上教程,本文不做介紹

二、修改activeMQ配置文件

​ broker新增配置信息 schedulerSupport="true"

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true" >

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" >
                    <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information, see:

                         http://activemq.apache.org/slow-consumer-handling.html

                    -->
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>

三、創建SpringBoot工程

file

  1. 配置ActiveMQ工廠信息,信任包必須配置否則會報錯
package com.example.demoactivemq.config;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.ArrayList;
import java.util.List;

/**
 * @author shanks on 2019-11-12
 */
@Configuration
public class ActiveMqConfig {

    @Bean
    public ActiveMQConnectionFactory factory(@Value("${spring.activemq.broker-url}") String url){
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
        // 設置信任序列化包集合
        List<String> models = new ArrayList<>();
        models.add("com.example.demoactivemq.domain");
        factory.setTrustedPackages(models);

        return factory;
    }

}
  1. 消息實體類
package com.example.demoactivemq.domain;

import lombok.Builder;
import lombok.Data;

import java.io.Serializable;

/**
 * @author shanks on 2019-11-12
 */

@Builder
@Data
public class MessageModel implements Serializable {
    private String titile;
    private String message;
}
  1. 生產者
package com.example.demoactivemq.producer;


import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jms.JmsProperties;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

import javax.jms.*;
import java.io.Serializable;


/**
 * 消息生產者
 *
 * @author shanks
 */
@Service
@Slf4j
public class Producer {

    public static final Destination DEFAULT_QUEUE = new ActiveMQQueue("delay.queue");

    @Autowired
    private JmsMessagingTemplate template;

    /**
     * 發送消息
     *
     * @param destination destination是發送到的隊列
     * @param message     message是待發送的消息
     */
    public <T extends Serializable> void send(Destination destination, T message) {
        template.convertAndSend(destination, message);
    }

    /**
     * 延時發送
     *
     * @param destination 發送的隊列
     * @param data        發送的消息
     * @param time        延遲時間
     */
    public <T extends Serializable> void delaySend(Destination destination, T data, Long time) {
        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;
        // 獲取連接工廠
        ConnectionFactory connectionFactory = template.getConnectionFactory();
        try {
            // 獲取連接
            connection = connectionFactory.createConnection();
            connection.start();
            // 獲取session,true開啟事務,false關閉事務
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 創建一個消息隊列
            producer = session.createProducer(destination);
            producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue());
            ObjectMessage message = session.createObjectMessage(data);
            //設置延遲時間
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
            // 發送消息
            producer.send(message);
            log.info("發送消息:{}", data);
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (producer != null) {
                    producer.close();
                }
                if (session != null) {
                    session.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
  1. 消費者
package com.example.demoactivemq.producer;


import com.example.demoactivemq.domain.MessageModel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * 消費者
 */
@Component
@Slf4j
public class Consumer {


    @JmsListener(destination = "delay.queue")
    public void receiveQueue(MessageModel message) {
        log.info("收到消息:{}", message);
    }
}
  1. application.yml
spring:
  activemq:
    broker-url: tcp://localhost:61616
  1. 測試類
package com.example.demoactivemq;

import com.example.demoactivemq.domain.MessageModel;
import com.example.demoactivemq.producer.Producer;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = DemoActivemqApplication.class)
@RunWith(SpringRunner.class)
class DemoActivemqApplicationTests {

    /**
     * 消息生產者
     */
    @Autowired
    private Producer producer;

    /**
     * 及時消息隊列測試
     */
    @Test
    public void test() {
        MessageModel messageModel = MessageModel.builder()
                .message("測試消息")
                .titile("消息000")
                .build();
        // 發送消息
        producer.send(Producer.DEFAULT_QUEUE, messageModel);
    }

    /**
     * 延時消息隊列測試
     */
    @Test
    public void test2() {
        for (int i = 0; i < 5; i++) {
            MessageModel messageModel = MessageModel.builder()
                    .titile("延遲10秒執行")
                    .message("測試消息" + i)
                    .build();
            // 發送延遲消息
            producer.delaySend(Producer.DEFAULT_QUEUE, messageModel, 10000L);
        }
        try {
            // 休眠100秒,等等消息執行
            Thread.currentThread().sleep(100000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

執行結果

2019-11-12 22:18:52.939  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 發送消息:MessageModel(titile=延遲10秒執行, message=測試消息0)
2019-11-12 22:18:52.953  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 發送消息:MessageModel(titile=延遲10秒執行, message=測試消息1)
2019-11-12 22:18:52.958  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 發送消息:MessageModel(titile=延遲10秒執行, message=測試消息2)
2019-11-12 22:18:52.964  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 發送消息:MessageModel(titile=延遲10秒執行, message=測試消息3)
2019-11-12 22:18:52.970  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 發送消息:MessageModel(titile=延遲10秒執行, message=測試消息4)
2019-11-12 22:19:03.012  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延遲10秒執行, message=測試消息0)
2019-11-12 22:19:03.017  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延遲10秒執行, message=測試消息1)
2019-11-12 22:19:03.019  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延遲10秒執行, message=測試消息2)
2019-11-12 22:19:03.020  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延遲10秒執行, message=測試消息3)
2019-11-12 22:19:03.021  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延遲10秒執行, message=測試消息4)

比你優秀的人比你還努力,你有什麼資格不去奮鬥!!!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一、import 1.import語句用來完成導入其他類,同一個包下的類不需要再導入 不在同一個包下需要手動導入。 2.import語法格式 import 類名; import 包名.*; //import語句需要編寫到package語句之下,class語句之上。 3.java.lang.*;不需要 ...
  • 題目鏈接:http://codeforces.com/problemset/problem/939/A A題 A. Love Triangle time limit per test 1 second memory limit per test 256 megabytes input standar ...
  • 下載Microsoft JDBC Driver 4.0 for SQL Server 在這裡下載:http://www.microsoft.com/zh-cn/download/details.aspx?id=11774 1. 在E盤新建一個文件夾,命名為sqljdbc42,將sqljdbc42.j ...
  • 'Specifying a namespace in include() without providing an app_name ’ 從include()函數可以看出來,這個函數有兩個參數,一個arg,一個namespace,我在代碼中也是兩個參數,但是異常中提示了,沒有提供app_name,還 ...
  • Go沒有像Java那樣的異常機制,它不能拋出異常,而是使用了 panic和recover機制。一定要記住,應當把它作為最後的手段來使用,也就是說,我們的代碼中應當沒有,或者很少有panic這樣的東西。 ...
  • 本文收錄在Python從入門到精通系列文章系列 學完前面的幾個章節後,博主覺得有必要在這裡帶大家做一些練習來鞏固之前所學的知識,雖然迄今為止我們學習的內容只是Python的冰山一角,但是這些內容已經足夠我們來構建程式中的邏輯。對於編程語言的初學者來說,在學習了Python的核心語言元素(變數、類型、 ...
  • 下載中間件 簡介 下載器,無法執行js代碼,本身不支持代理 下載中間件用來hooks進Scrapy的request/response處理過程的框架,一個輕量級的底層系統,用來全局修改scrapy的request和response scrapy框架中的下載中間件,是實現了特殊方法的類,scrapy系統 ...
  • 你好,我是彤哥,本篇是netty系列的第一篇。 歡迎來我的公從號 彤哥讀源碼 系統地學習 源碼&架構 的知識。 簡介 本文主要講述netty系列的整體規劃,並調查一下大家喜歡的學習方式。 知識點 netty系列彤哥準備分成三個大的模塊來完成: 入門篇 入門篇主要講述一些必備的基礎知識,例如IO的五種 ...
一周排行
    -Advertisement-
    Play Games
  • 概述:在C#中,++i和i++都是自增運算符,其中++i先增加值再返回,而i++先返回值再增加。應用場景根據需求選擇,首碼適合先增後用,尾碼適合先用後增。詳細示例提供清晰的代碼演示這兩者的操作時機和實際應用。 在C#中,++i 和 i++ 都是自增運算符,但它們在操作上有細微的差異,主要體現在操作的 ...
  • 上次發佈了:Taurus.MVC 性能壓力測試(ap 壓測 和 linux 下wrk 壓測):.NET Core 版本,今天計劃準備壓測一下 .NET 版本,來測試並記錄一下 Taurus.MVC 框架在 .NET 版本的性能,以便後續持續優化改進。 為了方便對比,本文章的電腦環境和測試思路,儘量和... ...
  • .NET WebAPI作為一種構建RESTful服務的強大工具,為開發者提供了便捷的方式來定義、處理HTTP請求並返迴響應。在設計API介面時,正確地接收和解析客戶端發送的數據至關重要。.NET WebAPI提供了一系列特性,如[FromRoute]、[FromQuery]和[FromBody],用 ...
  • 原因:我之所以想做這個項目,是因為在之前查找關於C#/WPF相關資料時,我發現講解圖像濾鏡的資源非常稀缺。此外,我註意到許多現有的開源庫主要基於CPU進行圖像渲染。這種方式在處理大量圖像時,會導致CPU的渲染負擔過重。因此,我將在下文中介紹如何通過GPU渲染來有效實現圖像的各種濾鏡效果。 生成的效果 ...
  • 引言 上一章我們介紹了在xUnit單元測試中用xUnit.DependencyInject來使用依賴註入,上一章我們的Sample.Repository倉儲層有一個批量註入的介面沒有做單元測試,今天用這個示例來演示一下如何用Bogus創建模擬數據 ,和 EFCore 的種子數據生成 Bogus 的優 ...
  • 一、前言 在自己的項目中,涉及到實時心率曲線的繪製,項目上的曲線繪製,一般很難找到能直接用的第三方庫,而且有些還是定製化的功能,所以還是自己繪製比較方便。很多人一聽到自己畫就害怕,感覺很難,今天就分享一個完整的實時心率數據繪製心率曲線圖的例子;之前的博客也分享給DrawingVisual繪製曲線的方 ...
  • 如果你在自定義的 Main 方法中直接使用 App 類並啟動應用程式,但發現 App.xaml 中定義的資源沒有被正確載入,那麼問題可能在於如何正確配置 App.xaml 與你的 App 類的交互。 確保 App.xaml 文件中的 x:Class 屬性正確指向你的 App 類。這樣,當你創建 Ap ...
  • 一:背景 1. 講故事 上個月有個朋友在微信上找到我,說他們的軟體在客戶那邊隔幾天就要崩潰一次,一直都沒有找到原因,讓我幫忙看下怎麼回事,確實工控類的軟體環境複雜難搞,朋友手上有一個崩潰的dump,剛好丟給我來分析一下。 二:WinDbg分析 1. 程式為什麼會崩潰 windbg 有一個厲害之處在於 ...
  • 前言 .NET生態中有許多依賴註入容器。在大多數情況下,微軟提供的內置容器在易用性和性能方面都非常優秀。外加ASP.NET Core預設使用內置容器,使用很方便。 但是筆者在使用中一直有一個頭疼的問題:服務工廠無法提供請求的服務類型相關的信息。這在一般情況下並沒有影響,但是內置容器支持註冊開放泛型服 ...
  • 一、前言 在項目開發過程中,DataGrid是經常使用到的一個數據展示控制項,而通常表格的最後一列是作為操作列存在,比如會有編輯、刪除等功能按鈕。但WPF的原始DataGrid中,預設只支持固定左側列,這跟大家習慣性操作列放最後不符,今天就來介紹一種簡單的方式實現固定右側列。(這裡的實現方式參考的大佬 ...