rabbitmq延遲隊列demo

来源:https://www.cnblogs.com/buguge/archive/2018/12/12/10110932.html
-Advertisement-
Play Games

工程結構: 定義jar包依賴的版本,版本很重要,rabbit依賴spring,必須一致,否則報錯: dependencies: spring-applicationContext: mq-applicationContext-producer.xml: mq-applicationContext-c ...


 

工程結構:

 

定義jar包依賴的版本,版本很重要,rabbit依賴spring,必須一致,否則報錯:

<properties>
    <springframework.version>4.2.7.RELEASE</springframework.version>
    <spring-rabbit.version>1.6.1.RELEASE</spring-rabbit.version>
    <junit.version>4.12</junit.version>
</properties>

dependencies:

<dependencies>

    <!-- LOGGING begin -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.5</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-core</artifactId>
        <version>1.0.13</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.0.13</version>
    </dependency>
    <!-- 代碼直接調用common-logging會被橋接到slf4j -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>jcl-over-slf4j</artifactId>
        <version>1.7.5</version>
    </dependency>
    <!-- LOGGING end -->

    <!--springframework-->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>${springframework.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${springframework.version}</version>
    </dependency>

    <!-- rabbitmq spring依賴 -->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>${spring-rabbit.version}</version>
    </dependency>

    <!--common utils-->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.3.2</version>
    </dependency>

    <!--test begin-->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>${junit.version}</version>
        <!--<scope>test</scope>-->
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>${springframework.version}</version>
        <!--<scope>test</scope>-->
    </dependency>
    <!--test end-->
</dependencies>

 

spring-applicationContext:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"

       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd 
         http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="fileEncoding" value="UTF-8"></property>
        <property name="locations">
            <list>
                <value>classpath:applicationContext.properties</value>
            </list>
        </property>
    </bean>

    <context:annotation-config/>

    <bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor"/>
    <!-- 配置掃描路徑 -->
    <context:component-scan base-package="demo"></context:component-scan>

    <!--rabbit server參數 -->
    <rabbit:connection-factory id="connectionFactory"
                               username="${paycenter.mq.user.username}"
                               password="${paycenter.mq.user.password}"
                               addresses="${paycenter.mq.user.host}"></rabbit:connection-factory>

    <import resource="classpath:mq-applicationContext-producer.xml"/>
    <import resource="classpath:mq-applicationContext-consumer.xml"/>
</beans>

 

mq-applicationContext-producer.xml:

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
     http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
     http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd">

    <!--通過指定下麵的admin信息,當前producer中的exchange和queue會在rabbitmq伺服器上自動生成 -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- spring amqp預設的是jackson 的一個插件,目的將生產者生產的數據轉換為json存入消息隊列 -->
    <bean id="mqMessageConverter"
          class="org.springframework.amqp.support.converter.SimpleMessageConverter">
    </bean>

    <!--<bean id="publisherConfirmsReturns" class="com.emaxcard.mq.rabbit.PublisherConfirmsReturns"></bean>-->


    <!--========================延遲隊列配置 begin =========================-->
    <rabbit:queue id="agentpayqueryQueue2" durable="true" auto-delete="true" exclusive="false"
                  name="agentpayqueryQueue2"/>
    <rabbit:direct-exchange id="agentpayqueryExchange2" durable="true" auto-delete="true" name="agentpayqueryExchange2">
        <rabbit:bindings>
            <rabbit:binding queue="agentpayqueryQueue2" key="delay"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>


    <rabbit:queue id="agentpayqueryQueue1" durable="true" auto-delete="true" exclusive="false"
                  name="agentpayqueryQueue1">
        <rabbit:queue-arguments>
            <entry key="x-dead-letter-exchange" value="agentpayqueryExchange2"/>
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Long"/>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:direct-exchange id="agentpayqueryExchange1" durable="true" auto-delete="true" name="agentpayqueryExchange1">
        <rabbit:bindings>
            <rabbit:binding queue="agentpayqueryQueue1" key="delay"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!--定義RabbitTemplate實例-->
    <!--confirm-callback="publisherConfirmsReturns" return-callback="publisherConfirmsReturns"-->
    <rabbit:template id="agentpayQueryMsgTemplate"
                     exchange="agentpayqueryExchange1" routing-key="delay"
                     connection-factory="connectionFactory" message-converter="mqMessageConverter"
                     mandatory="true"
    />
    <!--========================延遲隊列配置 end =========================-->

</beans>

 

mq-applicationContext-consumer.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
     http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
     http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd">


    <bean id="agentpayQueryConsumer" class="demo.TestMQConsumer" />

    <!-- TODO 後續刪除
    receive-timeout:等待接收超時時長 影響連接創建和銷毀

    concurrency:消費者個數
    max-concurrency:最大消費者個數
    min-start-interval:陸續啟動  減少併發環境(或是三方系統突然的網路延遲) 大量連接導致的性能耗損
    min-stop-interval:陸續銷毀   減少突然的安靜 導致大量可用連接被銷毀
    min-consecutive-active: 連續N次沒有接收發生超時  則認定為需要創建 消費者
    min-consecutive-idle: 連續N次發生了接收超時   則認定消費者需要銷毀

    prefetch:每個消費者預讀條數 因為非同步調用三方 性能瓶頸在網路與三方系統所以預讀取條數設置為1(預設為5) 只有一條消息被ACK才會接收下一條消息
    transaction-size:會影響prefetch的數量
    -->
    <!--  監聽器 -->
    <!-- queue litener  觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象-->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto"
                               max-concurrency="20"
                               concurrency="5"
                               prefetch="10">
        <rabbit:listener ref="agentpayQueryConsumer" queue-names="agentpayqueryQueue2" />
    </rabbit:listener-container>
</beans>

 

 

Producer類:
package demo;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext.xml")
public class TestMQProducer {

    private static Logger logger = LoggerFactory.getLogger(TestMQProducer.class.getSimpleName());

    @Autowired
    private RabbitTemplate agentpayQueryMsgTemplate;

    @Test
    public void test() throws Exception {
        for (int i = 0; i <= 100; i++) {
            Object data = String.valueOf(i);
            agentpayQueryMsgTemplate.convertAndSend(data);
            logger.info("入隊:{}", data);
        }
        Thread.sleep(12000);
    }
}

 

 

Consumer類:
package demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class TestMQConsumer implements MessageListener {

    private static Logger logger = LoggerFactory.getLogger(TestMQConsumer.class.getSimpleName());

    public void onMessage(Message message) {
        String data = new String(message.getBody());

        try {
            //模擬處理慢
            Thread.sleep(1);

            logger.info("出隊:{}", data);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

}

 

 

 至此代碼就完畢了。

 

說明:上面定義隊列時我把auto-delete屬性設置為true, 所以,當消費者消費完並關閉連接後,隊列會自動刪除。exchange也如是。(通過mq控制台看,慄子中的agentpayqueryQueue2和agentpayqueryExchange2在執行完就自動消失了,agentpayqueryQueue1和agentpayqueryExchange1還存在。)

spring-rabbit-x.xml里對queue和exchange的auto-delete屬性的解釋:

Flag indicating that an queue will be deleted when it is no longer in use, i.e. the connection that declared it is closed. Default is false.(rabbit:queue)

Flag indicating that an exchange will be deleted when no longer in use, i.e. the connection that declared it is closed. Default is false.(rabbit:exchange)

 

消費端的concurrency說明:

同樣,看spring-rabbit-x.xml的解釋:

The number of concurrent consumers to start for each listener initially.
See also 'max-concurrency'.

 

上面我設置的值是5,從mq控制台里看queue的consumer見下圖:

從出隊日誌,可以看出來,共有5個線程在消費這些消息。

 

 

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 不管是給字元串賦值,還是對字元串格式化,都屬於往字元串填充內容,一旦內容填充完畢,則需開展進一步的處理。譬如一段Word文本,常見的加工操作就有查找、替換、追加、截取等等,按照字元串的處理結果異同,可將這些操作方法歸為三大類,分別說明如下。一、判斷字元串是否具備某種特征該類方法主要用來判斷字元串是否 ...
  • 一、 更新系統 #yum -y install epel-release #yum clean all && yum makecache #yum -y update 二、安裝python3 系統自帶的python版本是2,並且沒有安裝pip 1、python2安裝pip #yum -y insta ...
  • 項目結構搭建 ...
  • with/as 使用open打開過文件的對with/as都已經非常熟悉,其實with/as是對try/finally的一種替代方案。 當某個對象支持一種稱為"環境管理協議"的協議時,就會通過環境管理器來自動執行某些善後清理工作,就像finally一樣:不管中途是否發生異常,最終都會執行某些清理操作。 ...
  • 參考:https://www.iteye.com/topic/1122740 上一節,主要分析了 被標記為事務的方法互相調用,事務失效的原因,思考比較多,這一節主要說說解決方案,思考會少一些。 解決方案的核心: 通過代理對象去調用方法 1.把方法放到不同的類: 我們需要新建一個介面: 再定義一個類去 ...
  • 1、登錄,三次登錄鎖定用戶 用戶信息的文件 黑名單的文件 1、黑名單裡面檢測,不讓登錄 2、用戶名單密碼判定 2、三級菜單 dic = { "省":{ "市":['縣] } } #字典,列表 dic.keys() dic["省"].keys() dic["省"]["市"] dic["省"].keys ...
  • map和其他語言的hashmap是一樣的,是一個kv的數據集合,是按照哈希演算法得到k的一個整數,將v存到一個數組的k位。 ...
  • 內置函數思維導圖:https://www.processon.com/mindmap/5c10c08ae4b07c3e3334f22d 內置函數 作用域相關: locals() 返回當前作用域中的名字 globals() 返回全局作用域中的名字 迭代器相關: range() 生成數據 next()  ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...