SpringCloud(六) - RabbitMQ安裝,三種消息發送模式,消息發送確認,消息消費確認(自動,手動)

来源:https://www.cnblogs.com/xiaoqigui/archive/2022/11/04/16856769.html
-Advertisement-
Play Games

1、安裝erlang語言環境 1.1 創建 erlang安裝目錄 mkdir erlang 1.2 上傳解壓壓縮包 上傳到: /root/ 解壓縮# tar -zxvf otp_src_22.0.tar.gz 1.3 進入解壓縮目錄,指定目錄並安裝 進入解壓目錄,指定安裝目錄# ./configur ...


1、安裝erlang語言環境

1.1 創建 erlang安裝目錄

mkdir erlang

1.2 上傳解壓壓縮包

上傳到: /root/

解壓縮# tar -zxvf otp_src_22.0.tar.gz

1.3 進入解壓縮目錄,指定目錄並安裝

進入解壓目錄,指定安裝目錄# ./configure --prefix=/usr/local/kh96/erlang

安裝# make install

添加環境變數# echo 'export PATH=$PATH:/usr/local/kh96/erlang/bin' >> /etc/profile

刷新環境變數# source /etc/profile

1.4 測試環境

進入erlang環境#erl

退出# halt().

2、安裝RabbitMQ

2.1上傳解壓壓縮包

第一步xx.tar.xz->xx.tar # /bin/xz -d rabbitmq-server-generic-unix-3.7.15.tar.xz

第二步#tar -xvf rabbitmq-server-generic-unix-3.7.15.tar

2.2 添加環境變數

添加環境變數# echo 'export PATH=$PATH:/usr/local/kh96/rabbitmq/rabbitmq_server-3.7.15/sbin' >> /etc/profile

刷新環境變數# source /etc/profile

2.3 啟動

啟動#  rabbitmq-server -detached

查看狀態# rabbitmqctl status

查看防火牆狀態# firewall-cmd --state (建議不開)

2.4 開啟雲服務埠

RabbitMQ 服務埠: 5672

RabbitMQ 監控平臺埠: 15672

開啟web插件允許監控平臺訪問 # rabbitmq-plugins enable rabbitmq_management

2.5 遠程 訪問 15672

公網ip:15672

Username: guest

Password: guest

提示這個這個賬號只允許本地訪問,所以需要添加用戶

2.6 添加用戶

顯示所有用戶# rabbitmqctl list_users

查看guest用戶許可權# rabbitmqctl list_user_permissions guest

添加admin用戶及密碼# rabbitmqctl add_user admin admin

設置限權# rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

授予admin用戶administrator角色# rabbitmqctl set_user_tags admin administrator

查看admin用戶許可權# rabbitmqctl list_user_permissions admin

刪除用戶guest# rabbitmqctl delete_user guest

停止RabbitMQ# rabbitmqctl stop

2.7 登錄成功

Username: admin

Password: admin

3、SpringBoot整合

3.0 項目準備

3.0.1 jar包

<!--rabbitmq依賴-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.0.2 配置信息

# 埠
server:
  port: 8104

# RabbitMQ配置
spring:
  rabbitmq:
    host: x.xxx.xx.xx #伺服器公網ip
    port: 5672
    username: admin
    password: admin

3.0.3 常量類

/**
 * Created On : 1/11/2022.
 * <p>
 * Author : huayu
 * <p>
 * Description: RabbitMQ 常量類,系統的所有隊列名,交換機名,路由鍵名等,統一進行配置管理
 */
public class RabbitMQConstant {

    //========================== 直連模式
    /**
     * Direct直連模式 隊列名
     */
    public static final String RABBITMQ_DIRECT_QUEUE_NAME_KH96 ="rabbitmq_direct_queue_name_kh96";

    /**
     * Direct直連模式 交換機名
     */
    public static final String RABBITMQ_DIRECT_EXCHANGE_KH96 ="rabbitmq_direct_exchange_kh96";

    /**
     * Direct直連模式 路由鍵
     */
    public static final String RABBITMQ_DIRECT_ROUTING_KEY_KH96 ="rabbitmq_direct_routing_key_kh96";

    //========================== 扇形模式

    /**
     * Fanout 扇形模式 隊列名one
     */
    public static final String RABBITMQ_FANOUT_QUEUE_NAME_KH96_ONE ="rabbitmq_fanout_queue_name_kh96_one";

    /**
     * Fanout 扇形模式 隊列名two
     */
    public static final String RABBITMQ_FANOUT_QUEUE_NAME_KH96_TWO ="rabbitmq_fanout_queue_name_kh96_two";

    /**
     * Fanout 扇形模式 交換機名
     */
    public static final String RABBITMQ_FANOUT_EXCHANGE_KH96 ="rabbitmq_fanout_exchange_kh96";


    //========================== 主題模式
    // -- 隊列
    /**
     * Topic 主題模式 隊列名one
     */
    public static final String RABBITMQ_TOPIC_QUEUE_NAME_KH96_ONE ="rabbitmq_topic_queue_name_kh96_one";
    /**
     * Topic 主題模式 隊列名two
     */
    public static final String RABBITMQ_TOPIC_QUEUE_NAME_KH96_TWO ="rabbitmq_topic_queue_name_kh96_two";
    /**
     * Topic 主題模式 隊列名Three
     */
    public static final String RABBITMQ_TOPIC_QUEUE_NAME_KH96_THREE ="rabbitmq_topic_queue_name_kh96_three";

    //-- 交換機
    /**
     * Topic 主題模式 交換機名
     */
    public static final String RABBITMQ_TOPIC_EXCHANGE_KH96 ="rabbitmq_topic_exchange_kh96";


    //-- 路由鍵
    /**
     * Topic 主題模式 -路由鍵-唯一匹配規則
     */
    public static final String RABBITMQ_TOPIC_ROUTING_KEY_KH96_ONLY="rabbitmq_topic_routing_key_kh96.only";

    /**
     * Topic 主題模式 -路由鍵-單詞匹配規則  * 單個詞
     */
    public static final String RABBITMQ_TOPIC_ROUTING_KEY_KH96_WORLD="rabbitmq_topic_routing_key_kh96.*";

    /**
     * Topic 主題模式 -路由鍵-模糊匹配規則 # 0 或 多個詞
     */
    public static final String RABBITMQ_TOPIC_ROUTING_KEY_KH96_LIKE="rabbitmq_topic_routing_key_kh96.#";

}

3.0.4 手動操作隊列關係

在測試的時候,一定要註意交換機和隊列的綁定關係,只要綁定過的關係就會一直存在需要手動刪除;如果測試結果不正常的時候,看一些交換機和隊列與鍵值的綁定關係;

選擇隊列:

刪除隊列:

3.1 Direct 直連模式

3.1.0 核心構造方法:Queue

  • 核心構造方法:Queue(String name, boolean durable, boolean exclusive, boolean autoDelete)
  1. name參數:name – the name of the queue.
    • 指定創建的消息隊列的名字,參數必傳,即創建隊列必須要有隊列名。
  2. durable參數:durable – true if we are declaring a durable queue (the queue will survive a server restart)
    • 指定創建的消息隊列是否需要持久化,預設是true,如果是true,該隊列支持持久化,自動持久化到磁碟,RabbitMQ服務重啟,隊列仍然是可用的(存活的)。
  3. exclusive參數:true if we are declaring an exclusive queue (the queue will only be used by the declarer's connection)
    • 指定創建的消息隊列是否是排他隊列,預設是false,如果是true,該隊列是排他隊列,只有創建當前隊列的連接才可以使用,連接一旦斷開,隊列會自動刪除。
  4. autoDelete參數:true if the server should delete the queue when it is no longer in use
    • 指定創建的消息隊列是否是自動刪除隊列,預設是false,如果是true,該隊列是自動刪除隊列,一旦沒有消息生產者或者消費者使用當前隊列,會被自動刪除。

3.1.1 配置類

/**
 * Created On : 1/11/2022.
 * <p>
 * Author : huayu
 * <p>
 * Description: Direct直連模式,自動配置類,自動創建隊列,交換機,並將隊列綁定到交換機,指定唯一路由
 */
@Configuration
public class RabbitMQDirectConfig {

    //創建 直連隊列
    @Bean
    public Queue directQueue(){
        //創建 直連隊列
        return new Queue(RabbitMQConstant.RABBITMQ_DIRECT_QUEUE_NAME_KH96,true);
    }

    //創建 直連交換機
    @Bean
    public DirectExchange directExchange(){
        // 創建支持持久化的直連交換機,指定交換機的名稱
        return new DirectExchange(RabbitMQConstant.RABBITMQ_DIRECT_EXCHANGE_KH96);
    }

   	//將直連隊列和直連交換機 進行綁定,並指定綁定的唯一路由鍵
    @Bean
    public Binding directBinding(){
        // 將直連隊列和直連交換機進行綁定,並指定綁定的唯一路由鍵
        return BindingBuilder.bind(directQueue())
                            .to(directExchange())
                            .with(RabbitMQConstant.RABBITMQ_DIRECT_ROUTING_KEY_KH96);
    }


}

3.1.2 消息生產者

/**
 * Created On : 1/11/2022.
 * <p>
 * Author : huayu
 * <p>
 * Description: Direct 直連模式 消息生產者
 */
@Slf4j
@Component
public class RabbitMQDirectProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * @author : huayu
     * @date   : 1/11/2022
     * @param  : [directMsg, directExchange, directRoutingKey]
     * @return : void
     * @description : 使用直連模式,發送消息到直連交換機,通過交換機綁定的唯一路由鍵,將消息發送到綁定的隊列中
     */
    public void sendDirectMsg2DirectExchange(String directExchange,String directRoutingKey,String directMsg){
        log.info("++++++  direct模式消息生產者,發送直連消息:{},到交換機:{},路由鍵:{} ++++++",directMsg,directExchange,directRoutingKey);

        rabbitTemplate.convertAndSend(directExchange,directRoutingKey,directMsg);

    }

}

3.1.3 消費者

3.1.3.1 消費者One
/**
 * Created On : 1/11/2022.
 * <p>
 * Author : huayu
 * <p>
 * Description: Direct 直連模式消費者 One
 */
@Slf4j
@Component
//指定接聽的 消息隊列 名字
@RabbitListener(queues = RabbitMQConstant.RABBITMQ_DIRECT_QUEUE_NAME_KH96)
public class RabbitMQDirectConsumerOne {

    /**
     * @author : huayu
     * @date   : 1/11/2022
     * @param  : [directMsgJson]
     * @return : void
     * @description : Direct 直連模式消費者One,消費信息
     */
    //指定消息隊列中的消息,交給對應的方法處理
    @RabbitHandler
    public void consumeOneDirectMsgFromDirectQueue(String directMsgJson){
        log.info("***** Direct直連模式,消費者One,消費消息:{} ******",directMsgJson);

        // TODO 核心業務邏輯處理

    }


//    @RabbitHandler  //自動根據隊列中的消息類型,自動區分方法
//    public void consumeOtherDirectMsgFromDirectQueue(List<String> directMsgJson){
//        log.info("***** Direct直連模式,消費者Two,消費消息:{} ******",directMsgJson);
//
//        // TODO 核心業務邏輯處理
//
//    }


}
3.1.3.2 消費者Two
/**
 * Created On : 1/11/2022.
 * <p>
 * Author : huayu
 * <p>
 * Description: RabbitMQDirectConsumerTwo
 */
@Slf4j
@Component
//指定監聽的消息隊列 名字
@RabbitListener(queues = RabbitMQConstant.RABBITMQ_DIRECT_QUEUE_NAME_KH96)
public class RabbitMQDirectConsumerTwo {
    /**
     * @author : huayu
     * @date   : 1/11/2022
     * @param  : [directMsgJson]
     * @return : void
     * @description : Direct 直連模式消費者 Two,消費信息
     */
    //指定消息隊列中的消息,交給對應的方法處理
    @RabbitHandler
    public void consumeOneDirectMsgFromDirectQueue(String directMsgJson){
        log.info("***** Direct直連模式,消費者Two,消費消息:{} ******",directMsgJson);

        // TODO 核心業務邏輯處理

    }

}

3.1.4 請求測試方法

/**
 * Created On : 1/11/2022.
 * <p>
 * Author : huayu
 * <p>
 * Description: 測試 RabbitMQ 消息隊列的操作入口
 */
@Slf4j
@RestController
public class RabbitMQController {

    @Autowired
    private RabbitMQDirectProducer rabbitMQDirectProducer;
    

    /**
     * @author : Administrator
     * @date   : 2022/11/1
     * @param  : [directMsg]
     * @return : com.kgc.sct.util.RequestResult<java.lang.String>
     * @description : 測試direct直連模式,發送和消費消息
     */
    @GetMapping("/direct")
    public RequestResult<String> testRabbitMQDirect(@RequestParam String directMsg){
        log.info("direct直連模式,發送消息");
        //模擬發送5條直連消息
        Stream.of(11,22,33,44,55).forEach(directNo ->{
            //模擬創建消息對象
            Map<String,Object> directMap =new HashMap<>();
            directMap.put("directNo",directNo);
            directMap.put("directData",directMsg);
            directMap.put("directTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));

        //調用直連模式消息生產者,發送消息
            rabbitMQDirectProducer.sendDirectMsg2DirectExchange(RabbitMQConstant.RABBITMQ_DIRECT_EXCHANGE_KH96
                                                         ,RabbitMQConstant.RABBITMQ_DIRECT_ROUTING_KEY_KH96
                                                         ,JSON.toJSONString(directMap));

        return ResultBuildUtil.success("使用直連模式。發送消息成功");
    }

}

3.1.5 請求測試

發起請求

3.1.5.1 一個消費者

消費者One消費了隊列中的所有信息(只有一個隊列);

3.1.5.2 兩個消費者

消費者One和消費者Two依次消費了隊列中的所有信息(只有一個隊列);

3.2 Fanout 扇形模式

3.2.1 配置類

/**
 * Created On : 1/11/2022.
 * <p>
 * Author : huayu
 * <p>
 * Description: Fanout扇形模式,自動配置類,自動創建隊列,交換機,並將隊列綁定到交換機
 */
@Configuration
public class RabbitMQFanoutConfig {

    //創建 扇形隊列One
    @Bean
    public Queue fanoutQueueOne(){
        return new Queue(RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_ONE);
    }

    //創建 扇形隊列Two
    @Bean
    public Queue fanoutQueueTwo(){
        return new Queue(RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_TWO);
    }

    // 創建扇形交換機
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(RabbitMQConstant.RABBITMQ_FANOUT_EXCHANGE_KH96);

    }

    //綁定隊列到扇形交換機,不需要 指定 路由鍵
    @Bean
    public Binding fanoutBindingQueueOne(){
        //綁定隊列到扇形交換機,不需要路由鍵,消息是廣播發送,會給多有綁定的隊列群發信息消息(根本沒有提供with方法)
        return BindingBuilder.bind(fanoutQueueOne())
                            .to(fanoutExchange());
    }

    @Bean
    public Binding fanoutBindingQueueTwo(){
        //綁定隊列到扇形交換機,不需要路由鍵,消息是廣播發送,會給多有綁定的隊列群發信息消息(根本沒有提供with方法)
        return BindingBuilder.bind(fanoutQueueTwo())
                            .to(fanoutExchange());
    }

}

3.2.2 消息生產者

/**
 * Created On : 1/11/2022.
 * <p>
 * Author : huayu
 * <p>
 * Description: RabbitMQFanoutProducer
 */
@Slf4j
@Component
public class RabbitMQFanoutProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * @author : huayu
     * @date   : 1/11/2022
     * @param  : [fanoutExchange, fanoutRoutingKey, fanoutMsg]
     * @return : void
     * @description : 使用扇形模式,發送消息到扇形交換機,將消息發送到綁定的隊列中
     */
    public void sendFanoutMsg2FanoutExchange(String fanoutExchange,String fanoutRoutingKey,String fanoutMsg){
        log.info("++++++ Fanout模式消息生產者,發送廣播消息:{},到交換機:{},路由鍵:{} ++++++", fanoutMsg, fanoutExchange, fanoutRoutingKey);
        rabbitTemplate.convertAndSend(fanoutExchange, fanoutRoutingKey, fanoutMsg);
    }

}

3.2.3 消費者

3.2.3.1 消費者One
/**
 * Created On : 1/11/2022.
 * <p>
 * Author : huayu
 * <p>
 * Description: RabbitMQFanoutConsumerOne
 */
@Slf4j
@Component
@RabbitListener(queues = RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_ONE)
public class RabbitMQFanoutConsumerOne {

    @RabbitHandler
    public void fanoutConsumeOneFanoutMsgFromFanoutQueueOne(String fanoutMsgJson){
        log.info("****** Fanout扇形模式,消費One,消費隊列One,消息:{}  ******",fanoutMsgJson);

        // TODO 核心業務邏輯處理

    }

}
3.2.3.2 消費者Two
/**
 * Created On : 1/11/2022.
 * <p>
 * Author : huayu
 * <p>
 * Description: RabbitMQFanoutConsumerTwo
 */
@Slf4j
@Component
//@RabbitListener(queues = RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_TWO)
public class RabbitMQFanoutConsumerTwo {

//    @RabbitHandler
    public void fanoutConsumeTwoFanoutMsgFromFanoutQueueTwo(String fanoutMsgJson){
        log.info("****** Fanout扇形模式,消費Two,消費隊列Two,消息:{}  ******",fanoutMsgJson);

        // TODO 核心業務邏輯處理

    }

}

3.2.4 請求測試方法

/**
 * Created On : 1/11/2022.
 * <p>
 * Author : huayu
 * <p>
 * Description: 測試 RabbitMQ 消息隊列的操作入口
 */
@Slf4j
@RestController
public class RabbitMQController {

    @Autowired
    private RabbitMQFanoutProducer rabbitMQFanoutProducer;
    
 	/**
     * @author : huayu
     * @date   : 1/11/2022
     * @param  : [fanoutMsg]
     * @return : com.kgc.scd.uitl.RequestResult<java.lang.String>
     * @description : 測試扇形(廣播)模式,發送和消費信息
     */
    @GetMapping("/fanout")
    public RequestResult<String> testRabbitMQFanout(@RequestParam String fanoutMsg){
        log.info("------- fanout 扇形模式,發送消息 -------");
        //模擬發送5條直連消息
        Stream.of(66,77,88,99,96).forEach(directNo ->{
            //模擬創建消息對象
            Map<String,Object> fanoutMap =new HashMap<>();
            fanoutMap.put("directNo",directNo);
            fanoutMap.put("directData",fanoutMsg);
            fanoutMap.put("directTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));

            //調用扇形模式消息生產者,發送消息
            rabbitMQFanoutProducer.sendFanoutMsg2FanoutExchange(RabbitMQConstant.RABBITMQ_FANOUT_EXCHANGE_KH96
                                                                ,null
                                                                ,JSON.toJSONString(fanoutMap));

        });

        return ResultBuildUtil.success("使用扇形模式。發送消息成功");
        
    }

}     

3.2.5 請求測試

3.2.5.1 一個消費者

消費者One消費了隊列One中的所有信息;

3.2.5.2 兩個消費者

消費者One消費了隊列One中的所有信息;

消費者Two消費了隊列Two中的所有信息;

3.3 Topic 主題模式

3.3.1 配置類

/**
 * Created On : 2/11/2022.
 * <p>
 * Author : huayu
 * <p>
 * Description: Topic 主題模式,自動配置類
 */
@Configuration
public class RabbitMQTopicConfig {

    //======== 隊列
    //Topic 主題模式 隊列One
    @Bean
    public Queue topicQueueOne(){
        return new Queue(RabbitMQConstant.RABBITMQ_TOPIC_QUEUE_NAME_KH96_ONE,true);
    }

    //Topic 主題模式 隊列Two
    @Bean
    public Queue topicQueueTwo(){
        return new Queue(RabbitMQConstant.RABBITMQ_TOPIC_QUEUE_NAME_KH96_TWO,true);
    }

    //Topic 主題模式 隊列Three
    @Bean
    public Queue topicQueueThree(){
        return new Queue(RabbitMQConstant.RABBITMQ_TOPIC_QUEUE_NAME_KH96_THREE,true);
    }

    //======= 交換機
    //Topic 主題模式 交換機
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(RabbitMQConstant.RABBITMQ_TOPIC_EXCHANGE_KH96);

    }

    //======= 綁定
    // 隊列One 綁定 Topic主題模式交換機  和 路由鍵-唯一匹配規則
    @Bean
    public Binding topicBindingQueueOne(){
        return BindingBuilder.bind(topicQueueOne())
                .to(topicExchange())
                .with(RabbitMQConstant.RABBITMQ_TOPIC_ROUTING_KEY_KH96_ONLY);

    }

    // 隊列Two 綁定 Topic主題模式交換機  和 路由鍵-單個單詞詞匹配規則
    @Bean
    public Binding topicBindingQueueTwo(){
        return BindingBuilder.bind(topicQueueTwo())
                .to(topicExchange())
                .with(RabbitMQConstant.RABBITMQ_TOPIC_ROUTING_KEY_KH96_WORLD);

    }

    // 隊列Two 綁定 Topic主題模式交換機  和 路由鍵-模糊匹配規則
    @Bean
    public Binding topicBindingQueueThree(){
        return BindingBuilder.bind(topicQueueThree())
                .to(topicExchange())
                .with(RabbitMQConstant.RABBITMQ_TOPIC_ROUTING_KEY_KH96_LIKE);

    }


}

3.3.2 消息生產者

/**
 * Created On : 2/11/2022.
 * <p>
 * Author : huayu
 * <p>
 * Description: RabbitMQ 主題模式消息生產者
 */
@Slf4j
@Component
public class RabbitMQTopicProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * @author : huayu
     * @date   : 1/11/2022
     * @param  : [topicExchange, topicRoutingKey, topicMsg]
     * @return : void
     * @description : 使用主題模式,發送消息到主題交換機,主題交換機會根據發送消息的路由鍵 ,根據匹配規則將消息投遞到匹配的隊列中
     */
    public void sendTopicMsg2TopicExchange(String topicExchange,String topicRoutingKey,String topicMsg){
        log.info("++++++  direct模式消息生產者,發送直連消息:{},到交換機:{},路由鍵:{} ++++++",topicMsg,topicExchange,topicRoutingKey);

        rabbitTemplate.convertAndSend(topicExchange,topicRoutingKey,topicMsg);

    }

}

3.3.3 消費者

3.3.3.1 消費者One
/**
 * Created On : 2/11/2022.
 * <p>
 * Author : huayu
 * <p>
 * Description: RabbitMQTopicConsumerOne
 */
@Slf4j
@Component
@RabbitListener(queues = RabbitMQConstant.RABBITMQ_TOPIC_QUEUE_NAME_KH96_ONE)
public class RabbitMQTopicConsumerOne {

    @RabbitHandler
    public void consumeTopicMsgFromTopicQueue(String topicMapJson){
        log.info("****** Topic 主題模式,消費One,消費隊列One,消息:{}  ******",topicMapJson);

        // TODO 核心業務邏輯處理

    }

}
3.3.3.2 消費者Two
/**
 * Created On : 2/11/2022.
 * <p>
 * Author : huayu
 * <p>
 * Description: RabbitMQTopicConsumerTwo
 */
@Slf4j
@Component
@RabbitListener(queues = RabbitMQConstant.RABBITMQ_TOPIC_QUEUE_NAME_KH96_TWO)
public class RabbitMQTopicConsumerTwo {

    @RabbitHandler
    public void consumeTopicMsgFromTopicQueue(String topicMapJson){
        log.info("****** Topic 主題模式,消費 Two,消費隊列 Two,消息:{}  ******",topicMapJson);

        // TODO 核心業務邏輯處理

    }

}
3.3.3.3 消費者Three
/**
 * Created On : 2/11/2022.
 * <p>
 * Author : huayu
 * <p>
 * Description: RabbitMQTopicConsumerThree
 */
@Slf4j
@Component
@RabbitListener(queues = RabbitMQConstant.RABBITMQ_TOPIC_QUEUE_NAME_KH96_THREE)
public class RabbitMQTopicConsumerThree {

    @RabbitHandler
    public void consumeTopicMsgFromTopicQueue(String topicMapJson){
        log.info("****** Topic 主題模式,消費 Three,消費隊列 Three,消息:{}  ******",topicMapJson);

        // TODO 核心業務邏輯處理

    }

}

3.3.4 請求測試方法

/**
 * Created On : 1/11/2022.
 * <p>
 * Author : huayu
 * <p>
 * Description: 測試 RabbitMQ 消息隊列的操作入口
 */
@Slf4j
@RestController
public class RabbitMQController {

    @Autowired
    private RabbitMQTopicProducer rabbitMQTopicProducer;
    
        @GetMapping("/topic")
    public RequestResult<String> testRabbitMQTopic(@RequestParam String topicMsg){
        log.info("------- topic 主題模式,發送消息 -------");
        //模擬發送5條直連消息
        Stream.of(95,96,97,98,99).forEach(directNo ->{
            //模擬創建消息對象
            Map<String,Object> fanoutMap =new HashMap<>();
            fanoutMap.put("directNo",directNo);
            fanoutMap.put("directData",topicMsg);
            fanoutMap.put("directTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));

            //調用主題模式消息生產者,發送消息
            //場景1:使用唯一路由鍵 rabbitmq_topic_routing_key_kh96.only , 發送消息
            rabbitMQTopicProducer.sendTopicMsg2TopicExchange(RabbitMQConstant.RABBITMQ_TOPIC_EXCHANGE_KH96
                                                            ,RabbitMQConstant.RABBITMQ_TOPIC_ROUTING_KEY_KH96_ONLY
                                                            ,JSON.toJSONString(fanoutMap));

            //場景2:使用單詞匹配路由鍵 rabbitmq_topic_routing_key_kh96.* ,發送消息
//            rabbitMQTopicProducer.sendTopicMsg2TopicExchange(RabbitMQConstant.RABBITMQ_TOPIC_EXCHANGE_KH96
//                                    ,"rabbitmq_topic_routing_key_kh96.abc"
//                                    ,JSON.toJSONString(fanoutMap));

            //場景3:0 或多詞匹配 rabbitmq_topic_routing_key_kh96.# ,發送消息
//            rabbitMQTopicProducer.sendTopicMsg2TopicExchange(RabbitMQConstant.RABBITMQ_TOPIC_EXCHANGE_KH96
//                                                            ,"rabbitmq_topic_routing_key_kh96.abc.def"
//                                                            ,JSON.toJSONString(fanoutMap));

        });

        return ResultBuildUtil.success("使用主題模式。發送消息成功");
    }

    
}

3.3.5 請求測試

3.3.5.1 場景1:使用唯一路由鍵
發送消息路由鍵名: rabbitmq_topic_routing_key_kh96.only
發起請求:

請求結果:

隊列One,Two,Three都接收到了信息,所以對應的消費者One,Two,Three都消費了信息;

3.3.5.2 場景2:使用單詞匹配路由鍵
發送消息路由鍵名: rabbitmq_topic_routing_key_kh96.abc
發起請求:

請求結果:

隊列Two,Three都接收到了信息,所以對應的消費者Two,Three都消費了信息;

3.3.5.3 場景3:0 或多詞匹配
發送消息路由鍵名: rabbitmq_topic_routing_key_kh96.abc.def
發起請求:

請求結果:

只有隊列Three接收到了信息,所以只有對應的消費者Three消費了信息;

3.3.6 主題模式小結

  1. 當生產者發送消息到交換機,指定的路由鍵一般都是使用句點(.)作為分隔符,分割多個單詞。

    • 比如:詞1.詞...
  2. 所謂單詞:是由一個或多個單片語成,多個單片語成的路由鍵,就代表某種主題的關鍵信息,路由鍵長度最多不能超過256位元組。

  3. 匹配規則格式:* 或者 #

    • *代表單個單詞。
      • 比如 隊列綁定主題交換機的 路由鍵:KH96.* ,代表發送消息的路由鍵是以KH96開頭,後面只能跟一個單詞,如:KH96.aaa,KH96.bbb等。
      • 再比如:綁定路由鍵為:KH96.*.KGC,代表發送消息路由鍵是以KH96開頭,中間可以帶一個單詞,結尾,如:KH96.aa.KGC,KH96.bb.KGC。
      • #代表0或多個單詞,比如 隊列綁定主題交換機的 路由鍵:KH96.#,代表發送消息的路由鍵是以KH96開頭,後面只能跟0個或者多個單詞,如:KH96,KH96.aaa,KH96.aaa.bbb。
      • 再比如:綁定路由鍵為:KH96.#.KGC,代表發送消息路由鍵是以KH96開頭,中間可以帶一個或多個單詞,結尾,如KH96.KGC,KH96.aa.KGC,KH96.aa.bb.KGC。
    1. 備註:

      • 如果主題交換機,隊列綁定的路由鍵使用的不是模糊匹配符,主題交換機跟直連交換機一致。
      • 如果單獨使用#,代表所有隊列都可以收消息,主題交換機跟扇形交換機一致。
  4. 提醒:

    • 主題模式下,隊列綁定的路由鍵,是允許為多個的。

    • 如果路由鍵被更換,之前的路由鍵是不會刪除,仍然會綁定到當前隊列上。

    • 如果有多個路由鍵匹配,規則為:如果其中一個沒有匹配到,會自動匹配其他路由鍵,如果需要刪除歷史路由鍵,需要在RabbitMQ控制台刪除。

3.4 消息 發送確認 - 交換機,隊列 確認

3.4.1 配置信息

# RabbitMQ配置
spring:
  rabbitmq:
    # 打開發送消息確認配置
    publisher-confirms: true # 發送消息到交換機確認,預設false
    publisher-returns: true # 發送消息到隊列確認,預設是false

3.4.2 消息發送確認配置類

  • 觸發機制
    • ConfirmCallback 函數式介面中的唯一抽象方法 confirm : 是否有交換機都會觸發;
      • 標識:true,發送到交換機正常;
      • 標識:false,發送到交換機失敗,進行特殊處理;
    • ReturnCallback 函數式介面中的唯一抽象方法 returnedMessage :交換機存在且隊列不存在才會觸發;
      • 觸發:發送到隊列失敗,進行特殊處理;
/**
 * Created On : 2/11/2022.
 * <p>
 * Author : huayu
 * <p>
 * Description: RabbitMQ 消息確認機制: 發送確認
 */
@Slf4j
@Configuration
public class RabbitMQSendMsgAck {

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        //發送確認,消息是通過rabbitTemplate發的,所以要重置rabbitTemplate才可以實現
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);

        //開啟觸發回調處理方法,不論消息推送結果是什麼,都會強制觸發回調方法
        rabbitTemplate.setMandatory(true);

        //指定消息發送到RabbitMQ的broker節點,是否正確到達交換機確然
        //是否有交換機都會觸發
        rabbitTemplate.setConfirmCallback( (correlationData, ack, cause) ->{
            log.info("######  發送消息確認回調,數據:{}  ######",correlationData);
            log.info("######  發送消息確認回調,標識:{}  ######",ack);
            log.info("######  發送消息確認回調,原因:{}  ######\n",cause);

            //TODO 如果沒有到交換機,ack返回的是false,可能是交換機被刪除,就需要進行特殊處理的業務,比如給負責人發送信息或郵件

        });

        //消息是否正確到達交換機上綁定的 目標隊列
        //交換機存在且隊列不存在才會觸發
        rabbitTemplate.setReturnCallback( ( message, replyCode, replyText,exchange,routingKey) ->{
            log.info("######  發送消息返回回調,數據:{}  ######",message);
            log.info("######  發送消息返回回調,返回碼:{}  ######",replyCode);
            log.info("######  發送消息返回回調,返回說明:{}  ######",replyText);
            log.info("######  發送消息返回回調,交換機:{}  ######",exchange);
            log.info("######  發送消息返回回調,路由鍵:{}  ######\n",routingKey);

            //TODO 如果沒有到目標隊列,就需要進行特殊處理的業務,比如給負責人發送信息或郵件

        });

        return rabbitTemplate;
    }

}

3.4.3 交換機

/**
 * Created On : 2/11/2022.
 * <p>
 * Author : huayu
 * <p>
 * Description: Ack 測試交換機,沒有綁定隊列
 */
@Configuration
public class RabbitMQAckConfig {


    //ack 測試交換機,沒有綁定隊列
    @Bean
    public DirectExchange directExchange(){

        return new DirectExchange(RabbitMQConstant.RABBITMQ_ACK_EXCHANGE_KH96);

    }

}

3.4.4 請求方法

/**
* @author : huayu
* @date   : 2/11/2022
* @param  : [topicMsg]
* @return : com.kgc.scd.uitl.RequestResult<java.lang.String>
* @description : 直連模式測試 Ack  不存在交換機  和 存在交換機
*/
@GetMapping("/sendMsgAck")
public RequestResult<String> RabbitMQSendMsgAck(@RequestParam String ackMsg){
    log.info("------- 直連 模式 測試Ack,發送消息 -------");
    //模擬發送直連消息

    //調用直連模式消息生產者,發送消息
    //測試1: 不存在的 交換機
    rabbitMQDirectProducer.sendDirectMsg2DirectExchange("test_noExchange"
                                                        ,RabbitMQConstant.RABBITMQ_DIRECT_ROUTING_KEY_KH96
                                                        ,JSON.toJSONString(ackMsg));

    return ResultBuildUtil.success("使用直連模式 測試Ack。交換機不存在");

    //測試2: 存在的交換機,但是沒有綁定 隊列
    //            rabbitMQDirectProducer.sendDirectMsg2DirectExchange(RabbitMQConstant.RABBITMQ_ACK_EXCHANGE_KH96
    //                                    ,RabbitMQConstant.RABBITMQ_DIRECT_ROUTING_KEY_KH96
    //                                    ,JSON.toJSONString(ackMsg));


    //        return ResultBuildUtil.success("使用直連模式 測試Ack。交換機 沒有綁定隊列");

}

3.2.5 請求測試

3.2.5.1 交換機不存在
發起請求:

請求結果:

交換機不存在,

觸發了ConfirmCallback 函數式介面中的唯一抽象方法 confirm ,

返回標識 false,發送到交換機失敗,

原因,該交換機不存在;

註意:如果沒有到交換機,ack返回的是false,可能是交換機被刪除,就需要進行特殊處理的業務,比如給負責人發送信息或郵件;

3.2.5.2 交換機存在,但是沒有綁定 隊列
發起請求:

請求結果:

交換機存在,

觸發了ConfirmCallback 函數式介面中的唯一抽象方法 confirm ,

返回標識 true,發送到交換機成功;

沒有綁定隊列,

觸發了ReturnCallback 函數式介面中的唯一抽象方法 returnedMessage ,

返回說明 NO_ROUT,發送到隊列失敗;

註意:如果沒有到目標隊列,就需要進行特殊處理的業務,比如給負責人發送信息或郵件;

3.2.5.3 交換機存在,且綁定了隊列
發起請求

請求結果:

交換機存在,且綁定了隊列,

觸發了ConfirmCallback 函數式介面中的唯一抽象方法 confirm ,

返回標識 true,發送到交換機成功;

沒有觸發ReturnCallback 函數式介面中的唯一抽象方法 returnedMessage ,

說明發送到隊列成功;

3.5 消息確認

3.5.1 自動確認

3.5.1.1 配置信息
# RabbitMQ配置
spring:
  rabbitmq:
    # 消費消息確認配置-自動
    listener:
      simple:
        retry:
          enabled: true # 開啟消費消息失敗重試機制
          max-attempts: 5 # 指定重試的次數
          max-interval: 10000 # 最大重試間隔時間,單位毫秒,每次重試的間隔時間,不能比當前設置的值大,如果計算間隔時間是6s,最大時間時間5s,會用5秒
          initial-interval: 1000 # 重試間隔初始時間,單位毫秒
          multiplier: 2 #乘子;重試的間隔時間 * 乘子,就是下一次重試的時間間隔市場,即:1s,2s,4s,8s,16...
3.5.1.2 消費者 模擬異常

註意:測試時為了讓消費者One一定接收到消息,所以註釋掉消費者Two,這樣才可以保證消費者One接收消息,然後觸發異常,重試的效果;

/**
 * Created On : 1/11/2022.
 * <p>
 * Author : huayu
 * <p>
 * Description: Direct 直連模式消費者 One
 */
@Slf4j
@Component
//指定接聽的 消息隊列 名字
@RabbitListener(queues = RabbitMQConstant.RABBITMQ_DIRECT_QUEUE_NAME_KH96)
public class RabbitMQDirectConsumerOne {

    /**
     * @author : huayu
     * @date   : 1/11/2022
     * @param  : [directMsgJson]
     * @return : void
     * @description : Direct 直連模式消費者One,消費信息
     */
    //指定消息隊列中的消息,交給對應的方法處理
    @RabbitHandler
    public void consumeOneDirectMsgFromDirectQueue(String directMsgJson){
        log.info("***** Direct直連模式,消費者One,消費消息:{} ******",directMsgJson);

        // TODO 核心業務邏輯處理

        //預設自動確認,模擬消費端消費消息,處理異常,自動重試
        int a = 10 / 0;

    }

}
3.5.1.3 請求方法
/**
* @author : huayu
* @date   : 3/11/2022
* @param  : [ackMsg]
* @return : com.kgc.scd.uitl.RequestResult<java.lang.String>
* @description : 測試 消費者自動 重試 
*/
@GetMapping("/consumeAckAuto")
public RequestResult<String> testRabbitMQConsumeAckAuto(@RequestParam String ackMsg){
    log.info("------- 直連 模式 測試Ack 自動 重試,發送消息 -------");
    //模擬發送直連消息
    //消費消息失敗重試機制
  rabbitMQDirectProducer.sendDirectMsg2DirectExchange(RabbitMQConstant.RABBITMQ_DIRECT_EXCHANGE_KH96
                                                        ,RabbitMQConstant.RABBITMQ_DIRECT_ROUTING_KEY_KH96
                                                        ,JSON.toJSONString(ackMsg));

    return ResultBuildUtil.success("使用直連模式 消費確認-自動消費成功");

}
3.5.1.4 請求測試

發起請求:

請求結果:

一共重試了五次

間隔時間為1,2,4,8

(如果還有一次應該為10,因為最後一次計算時間16大於最大間隔時間10,按最大間隔時間10重試);

3.4.2 手動確認

註意:

  • 手動確認需要先將自動確認的配置註釋掉;
  • 使用手動確認,不能再用@RabbitListener 監聽,手動確認相關隊列,需要我們手動配置消費者;
3.4.2.1 消費消息手動確認的監聽器
  • 獲取消息消費的唯一標識 message.getMessageProperties().getDeliveryTag();

  • 執行業務處理

    • 每個消費者在同一個時間點,最多處理一個message,預設是0(全部) channel.basicQos(1);
    • 獲取message的消息內容 message
    • 獲取消息對應的目標隊列,可以實現一些靈活判斷處理message.getMessageProperties().getConsumerQueue()
      • 比如根據不同的目標隊列進行不同的處理
      • 在消息處理的時候如果出錯會被捕獲(消息確認失敗)
    • 消息確認channel.basicAck(deliveryTag,false);
  • 消息確認失敗處理

    • 根據條件判斷設置是否重回隊列 ,是否支持批量處理 channel.basicNack(deliveryTag,true,false);
/**
 * Created On : 2/11/2022.
 * <p>
 * Author : huayu
 * <p>
 * Description: 消費端 消費消息手動確認的監聽器,註意它也是一個消費者,並可以通過 消息監聽容器工廠,動態配置多個
 */
@Slf4j
@Component
public class RabbitMQConsumerManualAckListener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws IOException {
        //獲取消息消費的唯一標識,rabbitMQ在推送消息時,會給每個消息攜帶一個唯一標識,值是一個遞增的正整數
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        log.info("====== 消費消息的唯一標識:{}  ======",deliveryTag);

        //執行手動確認業務處理
        try{
            //給每個消費者在同一個時間點,最多處理一個message,預設是0(全部),換句話說,在接收到消費者的 ack 確認前,不會分發新的消息給當前的消費者
            //在接收當前消息的ack確認前是不會發送新的消息給它
            channel.basicQos(1);

            //獲取message的消息內容,發送的消息的json字元串
            log.info("====== 消息隊列中完整消息內容:{} ======",message);

            //獲取發送的實際內容,發送消息的json字元串
            log.info("====== 發送的實際內容:{} ======",new String(message.getBody(),"utf-8"));

            //獲取消息對應的目標隊列,可以實現一些靈活判斷
            //TODO 比如根據目標隊列不同,可以做不同的處理
            log.info("======  消息的來源隊列:{} =======",message.getMessageProperties().getConsumerQueue());

            //模擬錯誤 ,當 deliveryTag 為1的時候,進入 報錯 ,捕獲異常,然後(如果設置了重回隊列)將消息重回隊列
            //if(deliveryTag == 1){
            //    int num = 1/0;
            //}

            //消費消息的手動確認,消息確認成功-basicAck
            //第一個參數deliveryTag,消息的唯一標識
            //第二個參數multiple,消息是否支持批量確認,如果是true,代表可以一次性確認標識小於等於當前標識的所有消息
            //如果是false,只會確認當前消息
            channel.basicAck(deliveryTag,false);


        }catch (Exception e){
            //說明消費消息處理失敗,如果不進行確認(自動確認,投遞成功即確認,消費是否正常,不關心),消息就會丟失
            //消息處理失敗確認,代表消息沒有正確消費,註意:此種方式一次只能確認一個消息
            //第一給參數是消息的唯一標識,
            //第二個參數是代表是否重回隊列,如果是true,重新將該消息放入隊列,再次消費
            //註意:第二個參數要謹慎,必須要結合具體業務場景,根據業務判斷是否需要重回隊列,一旦處理不當,機會導致消息迴圈入隊,消息擠壓
            //不重回隊列 require = false
//            channel.basicReject(deliveryTag,false);
            //重回隊列 require = true
            channel.basicReject(deliveryTag,true);

            //消息處理失敗確認,代表消息沒有正確消費,註意,此種方式支持批量
            //第一個參數是消息的唯一標識,
            //第二個參數是代表是否支持批量確認
            //第三給參數代表是否重回隊列
            //不重回隊列 require = false
//            channel.basicNack(deliveryTag,true,false);
            //重回隊列 require = true
//            channel.basicNack(deliveryTag,false,true);

            //TODO 手動消費異常處理

            log.error("====== 消費消息失敗,異常信息:{}  ======",e.getMessage());

        }

    }

}

3.4.2.2 消費消息手動確認配置類
  • 配置消費者的數量 setConcurrentConsumers(2);
  • 最大併發消費者數量 setMaxConcurrentConsumers(5);
  • 消費消息確認機製為手動 setAcknowledgeMode(AcknowledgeMode.MANUAL);
  • 設置監聽消息隊列的名稱,支持多個隊列setQueueNames(RabbitMQConstant.RABBITMQ_DIRECT_QUEUE_NAME_KH96);
  • 設置消息手動確認監聽器 setMessageListener(rabbitMQConsumerManualAckListener);
/**
 * Created On : 2/11/2022.
 * <p>
 * Author : huayu
 * <p>
 * Description: RabbitMQ  消費消息手動確認配置類
 */
@Configuration
public class RabbitMQConsumeManualAckConfig {

    @Autowired
    private RabbitMQConsumerManualAckListener rabbitMQConsumerManualAckListener;

    /**
     * @author : huayu
     * @date   : 2/11/2022
     * @param  : [connectionFactory]
     * @return : org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
     * @description : 自定義消息監聽器工程對象
     */
    @Bean
    public SimpleMessageListenerContainer simpleBrokerMessageHandler(ConnectionFactory connectionFactory){
        //初始化消息監聽容器的工程對象
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);

        //初始化併發消費者的數量,比如是2,代表同時會有 兩個消費者 消費消息
        // ,投遞標識可能會相同
        container.setConcurrentConsumers(2);

        //設置最大的併發消費者數量,數量不能低於初始化併發消費者數量
        //可以動態的設定當前容器的消費者數量,可以實現動態增加和減少消費者的演算法在 SimpleMessageListenerContainer類中實現
        container.setMaxConcurrentConsumers(5);

        //底層動態實現消費者數量的增加減少原理
        // 有consumer已連續十個周期(consecutiveActiveTrigger)處於活動狀態,並且自啟動後最後一個consumer運行至少經過了10秒鐘,則將啟動新的consumer。
        // private static final long DEFAULT_START_CONSUMER_MIN_INTERVAL = 10000;
        // 停止消費者演算法的時間間隔
        // 有consumer已連續10個周期(consecutiveIdleTrigger)連續空閑狀態,並且上一個consumer至少在60秒之前停止,那麼該consumer將停止
        // private static final long DEFAULT_STOP_CONSUMER_MIN_INTERVAL = 60000;
        // 預設連續活動10個周期
        // private static final int DEFAULT_CONSECUTIVE_ACTIVE_TRIGGER = 10;
        // 預設連續空閑10個周期
        // private static final int DEFAULT_CONSECUTIVE_IDLE_TRIGGER = 10;

        //預設的消費消息確認機制是自動,需要改為手動
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        //設置監聽消息隊列的名稱,支持多個隊列(隊列名1,隊列名2...),註意前提是指定的隊列必須是存在的
        //監聽 直連模式的 RabbitMQConstant.RABBITMQ_DIRECT_QUEUE_NAME_KH96 隊列
        container.setQueueNames(RabbitMQConstant.RABBITMQ_DIRECT_QUEUE_NAME_KH96);

        //監聽 扇形模式的 
        //RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_ONE 隊列
        //和 RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_TWO 隊列
//        container.setQueueNames(RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_ONE
//                ,RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_TWO);

        //指定消息確認的處理類,會同時產生多個消費者,參數是上面設置的,
        //註意之前使用直連模式,消息消費者,要註釋掉,防止同類型的監聽器,處理同一隊列
        //如果不是被當前消息確認的處理類消費(使用註解@RabbitListener),會導致消息不執行手動處理
        container.setMessageListener(rabbitMQConsumerManualAckListener);

        // 返回消息監聽容器工廠對象
        return container;

    }


}
3.4.2.3 請求方法
//======================
/**
* @author : huayu
* @date   : 3/11/2022
* @param  : [ackMsg]
* @return : com.kgc.scd.uitl.RequestResult<java.lang.String>
* @description : 測試 消費者手動確認
*/
@GetMapping("/consumeAckManual")
public RequestResult<String> testRabbitMQConsumeAckManual(@RequestParam String ackMsg){
    log.info("------- 測試Ack 手動 確認,發送消息 -------");
    //消息手動確認
    //模擬發送直連消息
    //測試1,2
    rabbitMQDirectProducer.sendDirectMsg2DirectExchange(
        RabbitMQConstant.RABBITMQ_DIRECT_EXCHANGE_KH96
        ,RabbitMQConstant.RABBITMQ_DIRECT_ROUTING_KEY_KH96
        ,JSON.toJSONString(ackMsg));

    return ResultBuildUtil.success("使用直連模式 手動消費確認-消息確認成功");
    

    //測試3
    //模擬發送扇形消息
    //        rabbitMQFanoutProducer.sendFanoutMsg2FanoutExchange(
    //                RabbitMQConstant.RABBITMQ_FANOUT_EXCHANGE_KH96
    //                ,null
    //                ,JSON.toJSONString(ackMsg));
    //
    //
    //        return ResultBuildUtil.success("使用扇形模式 手動消費確認-消息確認成功");

}
3.4.2.4 請求測試
3.4.2.4.1 模擬發送直連消息併成功確認

發送請求:

請求結果:

3.4.2.4.2 模擬發送直連消息,拋出異常,重回隊列

發送請求:

代碼重點:

請求結果:

3.4.2.4.3 模擬發送扇形消息併成功確認

發送請求:

請求結果:





您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 簡單場景舉例 聚合工程創建示例 說明: 創建 Maven Project:表示創建 maven 項目,new Project 方式創建 創建 Maven Module:表示創建 maven 項目,new Module 方式創建 創建 SpringBoot Module:表示創建 SpringBoot ...
  • 作者:張富春(ahfuzhang),轉載時請註明作者和引用鏈接,謝謝! cnblogs博客 zhihu Github 公眾號:一本正經的瞎扯 首先,我希望所有golang中用於http請求響應的結構,都使用proto3來定義。 麻煩的是,有的情況下某個欄位的類型可能是動態的,對應的JSON類型可能是 ...
  • 哈嘍兄弟們,今天我們來複習下變數。 首先嘗試在一個hello_world.py文件中使用變數,在文件開頭添加一行代碼,並對第二行代碼進行修改,如下所示: message="hello world" print(message) # Python源碼/教程領取扣君羊:279199867 運行這個程式, ...
  • 一.subprocess模塊 subprocess是Python 2.4中新增的一個模塊,它允許你生成新的進程,連接到它們的 input/output/error 管道,並獲取它們的返回(狀態)碼。這個模塊的目的在於替換幾個舊的模塊和方法,如: os.system os.spawn* 1.subpr ...
  • Python這門語言很適合用來寫些實用的小腳本,跑個自動化、爬蟲、演算法什麼的,非常方便。 這也是很多人學習Python的樂趣所在,可能只需要花個禮拜入門語法,就能用第三方庫去解決實際問題。我在Github上就看到過不少Python代碼的項目,幾十行代碼就能實現一個場景功能,非常實用。 比方說倉庫Py ...
  • 一、爬蟲的步驟 1、 需求分析(人做) 2、尋找網站(人)3、下載網站的返回內容(requests)4、通過返回的信息找到需要爬取的數據內容(正則表達式-re,xpath-lxml)5、存儲找到的數據內容(mysql) 二、requests import requests url = 'http:/ ...
  • Elasticsearch Analyzer 內置分詞器 篇主要介紹一下 Elasticsearch中 Analyzer 分詞器的構成 和一些Es中內置的分詞器 以及如何使用它們 前置知識 es 提供了 analyze api 可以方便我們快速的指定 某個分詞器 然後對輸入的text文本進行分詞 幫 ...
  • 公司的pyc做了加密, 前段時間研究了一下怎麼解密. 最開始的思路是反彙編pypy的dll, 找到import代碼的實現, 然後寫一個解碼的函數. 但是對反編譯的東西不熟悉, 想要找到解密的地方比較困難. 最後放棄了這個思路. 後面看到了一篇pyc文件格式的文章, 得知pyc文件其實就是文件頭+ma ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...