用延遲隊列取代定時任務

来源:https://www.cnblogs.com/buguge/archive/2018/12/07/10085302.html
-Advertisement-
Play Games

§1 RabbitMQ延遲隊列 RabbitMQ延遲隊列,主要是藉助消息的TTL(Time to Live)和死信exchange(Dead Letter Exchanges)來實現。 涉及到2個隊列,一個用於發送消息,一個用於消息過期後的轉發目標隊列。 本例中, 定義2組exchange和queu ...


§1 RabbitMQ延遲隊列

RabbitMQ延遲隊列,主要是藉助消息的TTL(Time to Live)和死信exchange(Dead Letter Exchanges)來實現。

涉及到2個隊列,一個用於發送消息,一個用於消息過期後的轉發目標隊列。

 

本例中, 定義2組exchange和queue。

agentpayquery1exchange		agentpayquery1queue(routingkey為delay)
agentpayquery2exchange		agentpayquery2queue(routingkey為delay)
agentpayquery1queue是緩衝隊列,消息過期路由到agentpayquery2queue

 

 

§2 生產者

生產者配置:

<!-- 連接服務配置 -->
<rabbit:connection-factory
        id="connectionFactoryProducer"
        addresses="${mq.ip}"    //192.168.40.40:5672
        username="${username}"
        password="${password}"
        channel-cache-size="${cache.size}"
        publisher-confirms="${publisher.confirms}"
        publisher-returns="${publisher.returns}"
        virtual-host="/"
/>

<!--========================出款查詢 延遲隊列配置 begin =========================-->
<rabbit:queue id="agentpayquery2queue" durable="true" auto-delete="false" exclusive="false" name="agentpayquery2queue"/>
<rabbit:direct-exchange name="agentpayquery2exchange" durable="true" auto-delete="false" id="agentpayquery2exchange">
    <rabbit:bindings>
        <rabbit:binding queue="agentpayquery2queue" key="delay" />
    </rabbit:bindings>
</rabbit:direct-exchange>


<rabbit:queue id="agentpayquery1queue" durable="true" auto-delete="false" exclusive="false" name="agentpayquery1queue" >
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="agentpayquery2exchange"/>
    </rabbit:queue-arguments>
</rabbit:queue>
<rabbit:direct-exchange name="agentpayquery1exchange" durable="true" auto-delete="false" id="agentpayquery1exchange">
    <rabbit:bindings>
        <rabbit:binding queue="agentpayquery1queue" key="delay" />
    </rabbit:bindings>
</rabbit:direct-exchange>

<!--定義RabbitTemplate實例-->
<rabbit:template id="agentpayQueryMsgTemplate"
                 exchange="agentpayquery1exchange"  routing-key="delay"
                 queue="agentpayquery1queue"
                 connection-factory="connectionFactoryProducer" message-converter="mqMessageConverter"
                 mandatory="true"
                 confirm-callback="publisherConfirmsReturns" return-callback="publisherConfirmsReturns"/>
<!--========================出款查詢 延遲隊列配置 end =========================-->

 

 

生產者消息入隊:

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class AgentpayQueryProducer {

    private static final Logger log = LogManager.getLogger(AgentpayQueryProducer.class.getSimpleName());

    @Autowired
    private RabbitTemplate agentpayQueryMsgTemplate;

    public void sendDelay(String message, int delaySeconds) {
        String expiration = String.valueOf(delaySeconds * 1000);
        agentpayQueryMsgTemplate.convertAndSend((Object) message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message)
                    throws AmqpException {
                message.getMessageProperties().setExpiration(expiration);
                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                log.info("出款查詢數據入隊:{}", new String(message.getBody()));
                return message;
            }
        });
    }
}

 

 

§3消費者

消費端的配置無他:

<!-- 連接服務配置  channel-cache-size="25" -->
<rabbit:connection-factory id="connectionFactory"
                           addresses="${mq.ip}"
                           username="${username}"
                           password="${password}" />

<bean id="agentpayQueryConsumer" class="com.emaxcard.rpc.payment.service.impl.batchpay.AgentpayQueryConsumer" />

<!-- queue litener  觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象-->
<rabbit:queue id="agentpayquery2queue" durable="true" auto-delete="false" exclusive="false" name="agentpayquery2queue" />

<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto"

                           max-concurrency="20"
                           concurrency="10"
                           prefetch="10">
    <rabbit:listener ref="agentpayQueryConsumer" queues="agentpayquery2queue" />
</rabbit:listener-container>

 

消息消費:

import com.alibaba.fastjson.JSON;
import com.emaxcard.enums.BatchPayStatus;
import com.emaxcard.exceptions.ResponseException;
import com.emaxcard.payment.vo.PaymentRecord;
import com.emaxcard.rpc.payment.model.PaymentRecordModel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.beans.factory.annotation.Autowired;

public class AgentpayQueryConsumer implements MessageListener {

    private static final Logger log = LogManager.getLogger();

    @Autowired
    QueryGatewayService queryGatewayService;
    @Autowired
    AgentpayQueryProducer agentpayQueryProducer;

    @Override
    public void onMessage(Message message) {
        String mqMsg = new String(message.getBody());
        log.info("出款查詢數據出隊:{}", mqMsg);
        PaymentRecord paymentRecordModel;
        try {
            paymentRecordModel = JSON.parseObject(mqMsg, PaymentRecord.class);
        } catch (Exception ex) {
            log.info("消息格式不是PaymentRecordModel,結束。");
            return;
        }

        try {
            BatchPayStatus payStatus = queryGatewayService.queryGateway(paymentRecordModel);

            // 非終態,繼續放入延遲隊列
            if (BatchPayStatus.SUCCESS != payStatus && BatchPayStatus.FAILED != payStatus) {
                if (BatchPayStatus.NOTEXIST == payStatus) {
                    log.info("查詢結果是{},不再處理", payStatus);
                } else {
                    agentpayQueryProducer.sendDelay(mqMsg, 10);
                }
            }
        } catch (Exception ex) {
            if (ex instanceof ResponseException) {
                log.info("轉賬查詢{},paymentId{},處理錯誤:{}",
                        paymentRecordModel.getTransNo(), paymentRecordModel.getPaymentId(), ex.getMessage());
            } else {
                log.error("處理消息異常:", ex);
            }
        }

    }
}

 

 

§4 使用延遲隊列要註意

1. 因為是隊列,所以即使一個消息比在同一隊列中的其他消息提前過期,提前過期的也不會優先進入死信隊列,它們還是按照入庫的順序讓消費者消費。如果第一進去的消息過期時間是1小時,那麼死信隊列的消費者也許等1小時才能收到第一個消息。

2. 當緩衝隊列里一旦出現未設置過期時間的消息,那麼就會造成整個隊列堵塞。消費端也無法消費到消息。通過日誌可以看到,列印出來的都是 BlockingQueueConsumer。

 

 

 

Get messages Ack Mode選擇“Ack message requeue false”,可以將消息消費掉

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 適配器模式(Adapter Pattern)又叫做變壓器模式,變壓器把一種電壓變換為另一種電壓。 定義: 將一個類的介面變換成客戶端所期待的另一種介面,從而使原本因介面不匹配而無法一起工作的兩個類能夠在一起工作。 適配器模式就是將一個介面或類轉換成其它的介面或類,適配器相當於一個包裝器,類圖如下所示 ...
  • 用CEF4Delphi取網頁元素時碰到ElementInnerText里含有"&nbsp;" 比如網頁源碼里是"內容&nbsp;"取出來顯示就變成"內容?" 搜索大部分是說把"&nbsp;"替換成其它字元即可 但實際操作怎麼也替換不了,就算變數為AnsiString也不行 最後用了以下方法解決 參考 ...
  • GitHub 上有一個名為《What the f*ck Python!》的項目,這個有趣的項目意在收集 Python 中那些難以理解和反人類直覺的例子以及鮮為人知的功能特性, 並嘗試討論這些現象背後真正的原理! ...
  • 1.單引號和轉義引導 2.拼接字元串 3.格式化字元串 4.常用方法 #去掉空格和特殊符號 #字元串的搜索和替換 #字元串的測試和替換函數 #字元串的分割 #string模塊 ...
  • 對象是否存活 Java的GC基於可達性分析演算法(Python用引用計數法),通過可達性分析來判定對象是否存活。這個演算法的基本思想是通過一系列"GC Roots"的對象作為起始點,從這些節點開始向下搜索,搜索所走過的路徑稱為引用鏈,當一個對象到GC Roots沒有任何引用鏈相連時(圖論稱之為不可達), ...
  • 1. 動態傳參 *, ** : 形參: 聚合 位置參數* -> 元組 關鍵字** -> 字典 實參: 打散 列表, 字元串, 元素 -> * 字典 -> ** 形參順序: 位置, *args, 預設值, **kwargs "無敵傳參" def func(*args, **kwargs): argum ...
  • 1. 動態傳參 *, ** : 形參: 聚合 位置參數* -> 元組 關鍵字** -> 字典 實參: 打散 列表, 字元串, 元素 -> * 字典 -> ** 形參順序: 位置, *args, 預設值, **kwargs "無敵傳參" def func(*args, **kwargs): argum ...
  • 1.打開文件: f=open(r'E:\PythonProjects\test7\a.txt',mode='rt',encoding='utf-8') 以上三個單引號內分別表示:要打開的文件的路徑,mode為文件打開方式具體介紹在下文,encoding為文件的字元編碼,一般預設為utf-8 2.讀寫 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...