RabbitMQ學習筆記

来源:https://www.cnblogs.com/loongnuts/archive/2022/11/10/16878640.html
-Advertisement-
Play Games

RabbitMQ 整合RabbitMQ /** * 使用RabbitMQ * 1、引入ampq場景,RabbitAutoConfiguration 就會自動生效 * 2、給容器中自動配置了 * RabbitTemplate、AmqpAdmin、CachingConnectionFactory、Rab ...


RabbitMQ

整合RabbitMQ

/**
 * 使用RabbitMQ
 *  1、引入ampq場景,RabbitAutoConfiguration 就會自動生效
 *  2、給容器中自動配置了
 *      RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate
 *      所有的屬性都是在
 *          @EnableConfigurationProperties(RabbitProperties.class)
 *          @ConfigurationProperties(prefix = "spring.rabbitmq")
 *          public class RabbitProperties
 *  3、給配置文件中配置 spring.rabbitmq 信息
 *  4、@EnableRabbit 開啟功能
 *  5、監聽消息:使用 @RabbitListener,必須有 @EnableRabbit
 *      @RabbitListener:類 + 方法上
 *      @RabbitHandler: 只能標在方法上
 */
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
# rabbit 配置文件
spring.rabbitmq.host=192.168.106.101
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/

測試

package com.atguigu.gulimall.order;

import com.atguigu.gulimall.order.entity.OrderReturnApplyEntity;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.Date;


@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class GulimallOrderApplicationTests {

    @Autowired
    AmqpAdmin amqpAdmin;

    @Autowired
    RabbitTemplate rabbitTemplate;
    /**
     * 1、創建Exchange[hello.java.exchange]、Queue、Binding
     *      - 使用 AmqpAdmin 進行創建
     *
     * 2、如何收發消息 -> RabbitTemplate
     *      如果發送的消息是個對象,使用序列化機制,將對象寫出去,對象實現 Serializable 介面
     *      自定義序列化添加配置
     *      @Configuration
     *      public class MyRabbitConfig {
     *          @Bean
     *          public MessageConverter messageConverter() {
     *              return new Jackson2JsonMessageConverter();
     *           }
     *      }
     */

    @Test
    public void sendMessageTest() {
        String msg = "Hello World";
        OrderReturnApplyEntity orderReturnApplyEntity = new OrderReturnApplyEntity();
        orderReturnApplyEntity.setId(1L);
        orderReturnApplyEntity.setSkuName("華為");
        orderReturnApplyEntity.setCreateTime(new Date());
        rabbitTemplate.convertAndSend("hello.java.exchange", "hello.java", orderReturnApplyEntity);
        log.info("消息發送完成:{}", orderReturnApplyEntity);
    }

    @Test
    public void createExchange() {
        //amqpAdmin
        /**
         * DirectExchange
         * public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
         */
        DirectExchange exchange = new DirectExchange("hello.java.exchange", true,false);
        amqpAdmin.declareExchange(exchange);
        log.info("Exchange[{}]創建成功", "hello.java.exchange");
    }

    @Test
    public void createQueue() {
        /**
         * public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete)
         */
        Queue queue = new Queue("hello-java-queue", true, false,true);
        amqpAdmin.declareQueue(queue);
        log.info("Queue[{}]創建成功", "hello-java-queue");
    }

    @Test
    public void createBinding() {
        /**
         * public Binding(String destination【目的地】,
         * DestinationType destinationType【目的地類型】,
         * String exchange【交換機】,
         * String routingKey【路由鍵】,
         * Map<String, Object> arguments)【參數】
         * 將 exchange 指定交換機和 destination目的地進行綁定,使用routingKey作為指定路由鍵
         */
        Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE,"hello.java.exchange","hello.java",null);
        amqpAdmin.declareBinding(binding);
        log.info("Binding == 創建成功");
    }
}

測試監聽消息

/**
 * queues:聲明需要監聽的所欲隊列
 *
 * org.springframework.amqp.core.Message;
 *
 * 參數可以寫以下類型
 *  1、Message message;原生消息詳細信息,頭 + 體
 *  2、T<發送的消息的類型> OrderReturnApplyEntity content
 *  3、Channel channel:當前傳輸數據的通道
 *
 *  Queue:可以很多人都來監聽,只要收到消息,隊列刪除消息,而且只有一個人收到此消息
 *      1、訂單服務啟動多個:同一個消息,只能有一個客戶端收到
 *      2、只有一個消息完全處理完,方法運行結束,我們就可以接受到下一個消息
 */
//@RabbitListener(queues = {"hello-java-queue"})
@RabbitHander
public void receiveMessage(Message message, OrderReturnReasonEntity content) {
    System.out.println("接收到消息....:"+ message + "===>內容;" + content + "類型是:" + message.getClass());
    byte[] body = message.getBody();
    //消息頭屬性信息
    MessageProperties properties = message.getMessageProperties();
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    System.out.println("消息處理完成=》" + content.getName());
}

@RabbitListener

簡介:

1.用於標註在監聽類或監聽方法上,接收消息,需要指定監聽的隊列(數組)
2.使用該註解之前,需要在啟動類加上該註解:@EnableRabbit
3.@RabbitListener即可以標註在方法上又可以標註在類上
	標註在類上:表示該類是監聽類,使得@RabbitHandler註解生效
	標註在方法上:表示該方法時監聽方法,會監聽指定隊列獲得消息
4.一般只標註在方法上,並配合@RabbitHandler使用,重載的方式接收不同消息對象

@RabbitHandler

作用:

配合@RabbitListener,使用方法重載的方法接收不同的消息類型

簡介:

1.用於標註在監聽方法上,接收消息,不需要指定監聽的隊列
2.使用該註解之前,需要在啟動類加上該註解:@EnableRabbit
3.@RabbitListener只可以標註在方法,重載的方式接收不同消息對象

發送端消息確認配置

1、配置

2、定製 RabbitTemplate,設置確認回調

# rabbit 配置文件
spring.rabbitmq.host=192.168.106.101
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/

# 開啟發送端確認
spring.rabbitmq.publisher-confirms=true
#開啟發送端消息抵達確認
spring.rabbitmq.publisher-returns=true
#只要抵達隊列。以非同步發送優先回調returnconfirm
spring.rabbitmq.template.mandatory=true

# 手動ack消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual

package com.atguigu.gulimall.order.config;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;

@Configuration
public class MyRabbitConfig {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 定製 rabbitTemplate
     * 1、服務收到消息就回調
     *      1、spring.rabbitmq.publisher-confirms=true
     *      2、設置確認回調ConfirmCallback
     * 2、消息正確地打隊列進行回調
     *      1、spring.rabbitmq.publisher-returns=true
     *         spring.rabbitmq.template.mandatory=true
     *      2、設置消息抵達隊列的回調
     * 3、消費端確認【保證每一個消息被正確消費,此時才可以讓broker刪除】
     *      1、預設是自動確認,只要消息接受到,自動確認,服務端就會移除這個消息
     *      2、手動確認預設,只要沒有明確告訴MQ,貨物被簽收,沒有ACK,消息一直是unacked狀態。
     *          即使Cosumer宕機,消息也不會丟失,會重新變成Ready,等待下一次新的consumer鏈接發給他
     *      3、如果手動確認:Channel channel -> long deliveryTag = properties.getDeliveryTag(); -> channel.basicAck(deliveryTag, false);
     *          channel.basicAck(deliveryTag, false);           簽收
     *          channel.basicNack(deliveryTag, false, true);    拒簽
     */
    @PostConstruct // MyRabbitConfig 對象創建完成以後執行這個方法
    public void initRabbitTemplate(){
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 只要抵達伺服器,ack就確認為true
             * @param correlationData 當前消息的唯一關聯數據(消息的唯一id)
             * @param ack 是否成功或者失敗
             * @param cause 失敗的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("confirm..." + correlationData + "==> ack:" + ack + "==> cause:" + cause);
            }
        });

        //設置消息抵達隊列的回調
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * 只要消息沒有投遞給指定的隊列,就觸發失敗回調
             * @param message   投遞失敗的消息詳細信息
             * @param replyCode 回覆的狀態碼
             * @param replyText 回覆的文本內容
             * @param exchange  消息發給那個交換機
             * @param routingKey 當時這個消息使用哪個路由鍵
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("Fail Message:" + message + "==> replyTest:" + replyText + "==>exchange" + exchange + "==>routingKey:" + routingKey);
            }
        });
    }

}

/**
 * queues:聲明需要監聽的所欲隊列
 * <p>
 * org.springframework.amqp.core.Message;
 * <p>
 * 參數可以寫以下類型
 * 1、Message message;原生消息詳細信息,頭 + 體
 * 2、T<發送的消息的類型> OrderReturnApplyEntity content
 * 3、Channel channel:當前傳輸數據的通道
 * <p>
 * Queue:可以很多人都來監聽,只要收到消息,隊列刪除消息,而且只有一個人收到此消息
 * 1、訂單服務啟動多個:同一個消息,只能有一個客戶端收到
 * 2、只有一個消息完全處理完,方法運行結束,我們就可以接受到下一個消息
 */
@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage(Message message, OrderReturnReasonEntity content, Channel channel) throws IOException {
    //System.out.println("接收到消息....:"+ message + "===>內容;" + content + "類型是:" + message.getClass());
    System.out.println("接收到消息....:" + content);
    byte[] body = message.getBody();
    //消息頭屬性信息
    MessageProperties properties = message.getMessageProperties();
    /*try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }*/
    System.out.println("消息處理完成=》" + content.getName());
    long deliveryTag = properties.getDeliveryTag();
    System.out.println("deliverTag: " + deliveryTag);
    if (deliveryTag % 2 == 0) {
        //收貨
        // 簽收穫取,非批量模式
        channel.basicAck(deliveryTag, false);
    } else {
        //requeue 重新入隊
        //basicNack(long deliveryTag, boolean multiple, boolean requeue)
        channel.basicNack(deliveryTag, false, true);
        System.out.println("沒有簽收的貨物....." + deliveryTag);
    }
}

最終整合

1.導入mq依賴
<!--amqp高級消息隊列協議,rabbitmq實現-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.ware模塊導入配置
spring:
  rabbitmq:
    host: 192.168.56.10
    port: 5672
    # 虛擬主機
    virtual-host: /
    # 開啟發送端發送確認,無論是否到達broker都會觸發回調【發送端確認機制+本地事務表】
    publisher-confirm-type: correlated
    # 開啟發送端抵達隊列確認,消息未被隊列接收時觸發回調【發送端確認機制+本地事務表】
    publisher-returns: true
    # 消息在沒有被隊列接收時是否強行退回
    template:
      mandatory: true
    # 消費者手動確認模式,關閉自動確認,否則會消息丟失
    listener:
      simple:
        acknowledge-mode: manual

3.添加註解
// 開啟rabbit
@EnableRabbit

4.創建配置類
/**
 * @Author: wanzenghui
 * @Date: 2021/12/15 0:04
 */
@Configuration
public class MyRabbitConfig {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Bean
    public MessageConverter messageConverter() {
        // 使用json序列化器來序列化消息,發送消息時,消息對象會被序列化成json格式
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 定製RabbitTemplate
     * 1、服務收到消息就會回調
     * 1、spring.rabbitmq.publisher-confirms: true
     * 2、設置確認回調
     * 2、消息正確抵達隊列就會進行回調
     * 1、spring.rabbitmq.publisher-returns: true
     * spring.rabbitmq.template.mandatory: true
     * 2、設置確認回調ReturnCallback
     * <p>
     * 3、消費端確認(保證每個消息都被正確消費,此時才可以broker刪除這個消息)
     */
    @PostConstruct   // (MyRabbitConfig對象創建完成以後,執行這個方法)
    public void initRabbitTemplate() {
        /**
         * 發送消息觸發confirmCallback回調
         * @param correlationData:當前消息的唯一關聯數據(如果發送消息時未指定此值,則回調時返回null)
         * @param ack:消息是否成功收到(ack=true,消息抵達Broker)
         * @param cause:失敗的原因
         */
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            System.out.println("發送消息觸發confirmCallback回調" +
                    "\ncorrelationData ===> " + correlationData +
                    "\nack ===> " + ack + "" +
                    "\ncause ===> " + cause);
            System.out.println("=================================================");
        });

        /**
         * 消息未到達隊列觸發returnCallback回調
         * 只要消息沒有投遞給指定的隊列,就觸發這個失敗回調
         * @param message:投遞失敗的消息詳細信息
         * @param replyCode:回覆的狀態碼
         * @param replyText:回覆的文本內容
         * @param exchange:接收消息的交換機
         * @param routingKey:接收消息的路由鍵
         */
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 需要修改資料庫 消息的狀態【後期定期重發消息】
            System.out.println("消息未到達隊列觸發returnCallback回調" +
                    "\nmessage ===> " + message +
                    "\nreplyCode ===> " + replyCode +
                    "\nreplyText ===> " + replyText +
                    "\nexchange ===> " + exchange +
                    "\nroutingKey ===> " + routingKey);
            System.out.println("==================================================");
        });
    }
}

5.創建ware解鎖庫存的延時隊列、死信隊列、交換機、綁定關係
/**
 * 創建隊列,交換機,延時隊列,綁定關係 的configuration
 * 1.Broker中的Queue、Exchange、Binding不存在的情況下,會自動創建(在RabbitMQ),不會重覆創建覆蓋
 * 2.懶載入,只有第一次使用的時候才會創建(例如監聽隊列)
 */
@Configuration
public class MyRabbitMQConfig {
    /**
     * 用於首次創建隊列、交換機、綁定關係的監聽
     * @param message
     */
    @RabbitListener(queues = "stock.release.stock.queue")
    public void handle(Message message) {
    }
    
    /**
     * 交換機
     * Topic,可以綁定多個隊列
     */
    @Bean
    public Exchange stockEventExchange() {
        //String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
        return new TopicExchange("stock-event-exchange", true, false);
    }

    /**
     * 死信隊列
     */
    @Bean
    public Queue stockReleaseStockQueue() {
        //String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        return new Queue("stock.release.stock.queue", true, false, false);
    }

    /**
     * 延時隊列
     */
    @Bean
    public Queue stockDelay() {
        HashMap<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "stock-event-exchange");
        arguments.put("x-dead-letter-routing-key", "stock.release");
        // 消息過期時間 2分鐘
        arguments.put("x-message-ttl", 120000);
        return new Queue("stock.delay.queue", true, false, false,arguments);
    }

    /**
     * 綁定:交換機與死信隊列
     */
    @Bean
    public Binding stockLocked() {
        //String destination, DestinationType destinationType, String exchange, String routingKey,
        // 			Map<String, Object> arguments
        return new Binding("stock.release.stock.queue",
                Binding.DestinationType.QUEUE,
                "stock-event-exchange",
                "stock.release.#",
                null);
    }

    /**
     * 綁定:交換機與延時隊列
     */
    @Bean
    public Binding stockLockedBinding() {
        return new Binding("stock.delay.queue",
                Binding.DestinationType.QUEUE,
                "stock-event-exchange",
                "stock.locked",
                null);
    }
}

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一、節點概述 網頁中的所有內容都是節點(標簽、屬性、文本、註釋等),使用node表示。HTML、DOM樹中的所有節點均可通過JavaScript進行訪問,所有HTML元素(節點)均可被修改,也可以創建或刪除。 利用DOM樹可以把節點劃分為不同的層級關係,常見的是父子兄層級關係。 一般地,節點至少擁有 ...
  • 訪問者模式被認為是最複雜的設計模式,並且使用頻率不高。大多情況下,你不需要使用訪問者模式,但是一旦需要使用它時,那就真的需要使用了。 ...
  • 第一章 LinkedList源碼分析 目標: 理解LinkedList的底層數據結構 深入源碼掌握LinkedList查詢慢,新增快的原因 一、LinkedList的簡介 List介面的鏈接列表實現。實現所有可選的列表操作,並且允許所有元素(包括null)。除了實現List介面外,LinkedLis ...
  • jdk線程池ThreadPoolExecutor工作原理解析(自己動手實現線程池)(一) 線程池介紹 在日常開發中經常會遇到需要使用其它線程將大量任務非同步處理的場景(非同步化以及提升系統的吞吐量),而在使用線程的過程中卻存在著兩個痛點。 在java等很多主流語言中每個邏輯上的線程底層都對應著一個系統線 ...
  • sizeof和strlen在C語言中常用來計算字元大小和長度,在應用中卻有著本質的區別。 sizeof是C語言中的關鍵字,其作用是返回一個對象或類型所占的記憶體位元組數。使用方式為: int i; sizeof(int); *//值為4*` sizeof(i); *//值為4,等價於sizeof(int ...
  • 本章我們正式開始學習Python的入門課程。 在學習Python之前,你要做的第一件事,就是安裝Python環境。 由於Python官方已不再維護Python2.x,所以本系列課程將使用最新的Python3.x版本作為Python環境,不再考慮Python2.x版本的相容性。讓我們面向未來,擁抱Py ...
  • 只有繼承於object的新式類才能有__new__方法,__new__方法在創建類實例對象時由Python解釋器自動調用,一般不用自己定義,Python預設調用該類的直接父類的__new__方法來構造該類的實例,如果該類的父類也沒有重寫__new__,那麼將一直按此規矩追溯至object的__new ...
  • 簡介: 模板方法模式,是行為型的設計模式。 定義一個操作中的演算法的骨架,而將一些步驟延遲到子類當中,使得子類可以不改變一個演算法的結構即可重新定義該演算法的某些特定步驟。 通俗講,模板方法模式是偏向繼承的設計模式,當子類有重覆的動作時候,可將他們提取出來,放在父類進行處理。 適用場景: 演算法結構相同但是 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...