1、安裝erlang語言環境 1.1 創建 erlang安裝目錄 mkdir erlang 1.2 上傳解壓壓縮包 上傳到: /root/ 解壓縮# tar -zxvf otp_src_22.0.tar.gz 1.3 進入解壓縮目錄,指定目錄並安裝 進入解壓目錄,指定安裝目錄# ./configur ...
1、安裝erlang語言環境
1.1 創建 erlang安裝目錄
mkdir erlang
1.2 上傳解壓壓縮包
上傳到: /root/
解壓縮# tar -zxvf otp_src_22.0.tar.gz
1.3 進入解壓縮目錄,指定目錄並安裝
進入解壓目錄,指定安裝目錄# ./configure --prefix=/usr/local/kh96/erlang
安裝# make install
添加環境變數# echo 'export PATH=$PATH:/usr/local/kh96/erlang/bin' >> /etc/profile
刷新環境變數# source /etc/profile
1.4 測試環境
進入erlang環境#erl
退出# halt().
2、安裝RabbitMQ
2.1上傳解壓壓縮包
第一步xx.tar.xz->xx.tar # /bin/xz -d rabbitmq-server-generic-unix-3.7.15.tar.xz
第二步#tar -xvf rabbitmq-server-generic-unix-3.7.15.tar
2.2 添加環境變數
添加環境變數# echo 'export PATH=$PATH:/usr/local/kh96/rabbitmq/rabbitmq_server-3.7.15/sbin' >> /etc/profile
刷新環境變數# source /etc/profile
2.3 啟動
啟動# rabbitmq-server -detached
查看狀態# rabbitmqctl status
查看防火牆狀態# firewall-cmd --state (建議不開)
2.4 開啟雲服務埠
RabbitMQ 服務埠: 5672
RabbitMQ 監控平臺埠: 15672
開啟web插件允許監控平臺訪問 # rabbitmq-plugins enable rabbitmq_management
2.5 遠程 訪問 15672
公網ip:15672
Username: guest
Password: guest
提示這個這個賬號只允許本地訪問,所以需要添加用戶
2.6 添加用戶
顯示所有用戶# rabbitmqctl list_users
查看guest用戶許可權# rabbitmqctl list_user_permissions guest
添加admin用戶及密碼# rabbitmqctl add_user admin admin
設置限權# rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
授予admin用戶administrator角色# rabbitmqctl set_user_tags admin administrator
查看admin用戶許可權# rabbitmqctl list_user_permissions admin
刪除用戶guest# rabbitmqctl delete_user guest
停止RabbitMQ# rabbitmqctl stop
2.7 登錄成功
Username: admin
Password: admin
3、SpringBoot整合
3.0 項目準備
3.0.1 jar包
<!--rabbitmq依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.0.2 配置信息
# 埠
server:
port: 8104
# RabbitMQ配置
spring:
rabbitmq:
host: x.xxx.xx.xx #伺服器公網ip
port: 5672
username: admin
password: admin
3.0.3 常量類
/**
* Created On : 1/11/2022.
* <p>
* Author : huayu
* <p>
* Description: RabbitMQ 常量類,系統的所有隊列名,交換機名,路由鍵名等,統一進行配置管理
*/
public class RabbitMQConstant {
//========================== 直連模式
/**
* Direct直連模式 隊列名
*/
public static final String RABBITMQ_DIRECT_QUEUE_NAME_KH96 ="rabbitmq_direct_queue_name_kh96";
/**
* Direct直連模式 交換機名
*/
public static final String RABBITMQ_DIRECT_EXCHANGE_KH96 ="rabbitmq_direct_exchange_kh96";
/**
* Direct直連模式 路由鍵
*/
public static final String RABBITMQ_DIRECT_ROUTING_KEY_KH96 ="rabbitmq_direct_routing_key_kh96";
//========================== 扇形模式
/**
* Fanout 扇形模式 隊列名one
*/
public static final String RABBITMQ_FANOUT_QUEUE_NAME_KH96_ONE ="rabbitmq_fanout_queue_name_kh96_one";
/**
* Fanout 扇形模式 隊列名two
*/
public static final String RABBITMQ_FANOUT_QUEUE_NAME_KH96_TWO ="rabbitmq_fanout_queue_name_kh96_two";
/**
* Fanout 扇形模式 交換機名
*/
public static final String RABBITMQ_FANOUT_EXCHANGE_KH96 ="rabbitmq_fanout_exchange_kh96";
//========================== 主題模式
// -- 隊列
/**
* Topic 主題模式 隊列名one
*/
public static final String RABBITMQ_TOPIC_QUEUE_NAME_KH96_ONE ="rabbitmq_topic_queue_name_kh96_one";
/**
* Topic 主題模式 隊列名two
*/
public static final String RABBITMQ_TOPIC_QUEUE_NAME_KH96_TWO ="rabbitmq_topic_queue_name_kh96_two";
/**
* Topic 主題模式 隊列名Three
*/
public static final String RABBITMQ_TOPIC_QUEUE_NAME_KH96_THREE ="rabbitmq_topic_queue_name_kh96_three";
//-- 交換機
/**
* Topic 主題模式 交換機名
*/
public static final String RABBITMQ_TOPIC_EXCHANGE_KH96 ="rabbitmq_topic_exchange_kh96";
//-- 路由鍵
/**
* Topic 主題模式 -路由鍵-唯一匹配規則
*/
public static final String RABBITMQ_TOPIC_ROUTING_KEY_KH96_ONLY="rabbitmq_topic_routing_key_kh96.only";
/**
* Topic 主題模式 -路由鍵-單詞匹配規則 * 單個詞
*/
public static final String RABBITMQ_TOPIC_ROUTING_KEY_KH96_WORLD="rabbitmq_topic_routing_key_kh96.*";
/**
* Topic 主題模式 -路由鍵-模糊匹配規則 # 0 或 多個詞
*/
public static final String RABBITMQ_TOPIC_ROUTING_KEY_KH96_LIKE="rabbitmq_topic_routing_key_kh96.#";
}
3.0.4 手動操作隊列關係
在測試的時候,一定要註意交換機和隊列的綁定關係,只要綁定過的關係就會一直存在需要手動刪除;如果測試結果不正常的時候,看一些交換機和隊列與鍵值的綁定關係;
選擇隊列:
刪除隊列:
3.1 Direct 直連模式
3.1.0 核心構造方法:Queue
- 核心構造方法:Queue(String name, boolean durable, boolean exclusive, boolean autoDelete)
- name參數:name – the name of the queue.
- 指定創建的消息隊列的名字,參數必傳,即創建隊列必須要有隊列名。
- durable參數:durable – true if we are declaring a durable queue (the queue will survive a server restart)
- 指定創建的消息隊列是否需要持久化,預設是true,如果是true,該隊列支持持久化,自動持久化到磁碟,RabbitMQ服務重啟,隊列仍然是可用的(存活的)。
- exclusive參數:true if we are declaring an exclusive queue (the queue will only be used by the declarer's connection)
- 指定創建的消息隊列是否是排他隊列,預設是false,如果是true,該隊列是排他隊列,只有創建當前隊列的連接才可以使用,連接一旦斷開,隊列會自動刪除。
- autoDelete參數:true if the server should delete the queue when it is no longer in use
- 指定創建的消息隊列是否是自動刪除隊列,預設是false,如果是true,該隊列是自動刪除隊列,一旦沒有消息生產者或者消費者使用當前隊列,會被自動刪除。
3.1.1 配置類
/**
* Created On : 1/11/2022.
* <p>
* Author : huayu
* <p>
* Description: Direct直連模式,自動配置類,自動創建隊列,交換機,並將隊列綁定到交換機,指定唯一路由
*/
@Configuration
public class RabbitMQDirectConfig {
//創建 直連隊列
@Bean
public Queue directQueue(){
//創建 直連隊列
return new Queue(RabbitMQConstant.RABBITMQ_DIRECT_QUEUE_NAME_KH96,true);
}
//創建 直連交換機
@Bean
public DirectExchange directExchange(){
// 創建支持持久化的直連交換機,指定交換機的名稱
return new DirectExchange(RabbitMQConstant.RABBITMQ_DIRECT_EXCHANGE_KH96);
}
//將直連隊列和直連交換機 進行綁定,並指定綁定的唯一路由鍵
@Bean
public Binding directBinding(){
// 將直連隊列和直連交換機進行綁定,並指定綁定的唯一路由鍵
return BindingBuilder.bind(directQueue())
.to(directExchange())
.with(RabbitMQConstant.RABBITMQ_DIRECT_ROUTING_KEY_KH96);
}
}
3.1.2 消息生產者
/**
* Created On : 1/11/2022.
* <p>
* Author : huayu
* <p>
* Description: Direct 直連模式 消息生產者
*/
@Slf4j
@Component
public class RabbitMQDirectProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* @author : huayu
* @date : 1/11/2022
* @param : [directMsg, directExchange, directRoutingKey]
* @return : void
* @description : 使用直連模式,發送消息到直連交換機,通過交換機綁定的唯一路由鍵,將消息發送到綁定的隊列中
*/
public void sendDirectMsg2DirectExchange(String directExchange,String directRoutingKey,String directMsg){
log.info("++++++ direct模式消息生產者,發送直連消息:{},到交換機:{},路由鍵:{} ++++++",directMsg,directExchange,directRoutingKey);
rabbitTemplate.convertAndSend(directExchange,directRoutingKey,directMsg);
}
}
3.1.3 消費者
3.1.3.1 消費者One
/**
* Created On : 1/11/2022.
* <p>
* Author : huayu
* <p>
* Description: Direct 直連模式消費者 One
*/
@Slf4j
@Component
//指定接聽的 消息隊列 名字
@RabbitListener(queues = RabbitMQConstant.RABBITMQ_DIRECT_QUEUE_NAME_KH96)
public class RabbitMQDirectConsumerOne {
/**
* @author : huayu
* @date : 1/11/2022
* @param : [directMsgJson]
* @return : void
* @description : Direct 直連模式消費者One,消費信息
*/
//指定消息隊列中的消息,交給對應的方法處理
@RabbitHandler
public void consumeOneDirectMsgFromDirectQueue(String directMsgJson){
log.info("***** Direct直連模式,消費者One,消費消息:{} ******",directMsgJson);
// TODO 核心業務邏輯處理
}
// @RabbitHandler //自動根據隊列中的消息類型,自動區分方法
// public void consumeOtherDirectMsgFromDirectQueue(List<String> directMsgJson){
// log.info("***** Direct直連模式,消費者Two,消費消息:{} ******",directMsgJson);
//
// // TODO 核心業務邏輯處理
//
// }
}
3.1.3.2 消費者Two
/**
* Created On : 1/11/2022.
* <p>
* Author : huayu
* <p>
* Description: RabbitMQDirectConsumerTwo
*/
@Slf4j
@Component
//指定監聽的消息隊列 名字
@RabbitListener(queues = RabbitMQConstant.RABBITMQ_DIRECT_QUEUE_NAME_KH96)
public class RabbitMQDirectConsumerTwo {
/**
* @author : huayu
* @date : 1/11/2022
* @param : [directMsgJson]
* @return : void
* @description : Direct 直連模式消費者 Two,消費信息
*/
//指定消息隊列中的消息,交給對應的方法處理
@RabbitHandler
public void consumeOneDirectMsgFromDirectQueue(String directMsgJson){
log.info("***** Direct直連模式,消費者Two,消費消息:{} ******",directMsgJson);
// TODO 核心業務邏輯處理
}
}
3.1.4 請求測試方法
/**
* Created On : 1/11/2022.
* <p>
* Author : huayu
* <p>
* Description: 測試 RabbitMQ 消息隊列的操作入口
*/
@Slf4j
@RestController
public class RabbitMQController {
@Autowired
private RabbitMQDirectProducer rabbitMQDirectProducer;
/**
* @author : Administrator
* @date : 2022/11/1
* @param : [directMsg]
* @return : com.kgc.sct.util.RequestResult<java.lang.String>
* @description : 測試direct直連模式,發送和消費消息
*/
@GetMapping("/direct")
public RequestResult<String> testRabbitMQDirect(@RequestParam String directMsg){
log.info("direct直連模式,發送消息");
//模擬發送5條直連消息
Stream.of(11,22,33,44,55).forEach(directNo ->{
//模擬創建消息對象
Map<String,Object> directMap =new HashMap<>();
directMap.put("directNo",directNo);
directMap.put("directData",directMsg);
directMap.put("directTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
//調用直連模式消息生產者,發送消息
rabbitMQDirectProducer.sendDirectMsg2DirectExchange(RabbitMQConstant.RABBITMQ_DIRECT_EXCHANGE_KH96
,RabbitMQConstant.RABBITMQ_DIRECT_ROUTING_KEY_KH96
,JSON.toJSONString(directMap));
return ResultBuildUtil.success("使用直連模式。發送消息成功");
}
}
3.1.5 請求測試
發起請求
3.1.5.1 一個消費者
消費者One消費了隊列中的所有信息(只有一個隊列);
3.1.5.2 兩個消費者
消費者One和消費者Two依次消費了隊列中的所有信息(只有一個隊列);
3.2 Fanout 扇形模式
3.2.1 配置類
/**
* Created On : 1/11/2022.
* <p>
* Author : huayu
* <p>
* Description: Fanout扇形模式,自動配置類,自動創建隊列,交換機,並將隊列綁定到交換機
*/
@Configuration
public class RabbitMQFanoutConfig {
//創建 扇形隊列One
@Bean
public Queue fanoutQueueOne(){
return new Queue(RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_ONE);
}
//創建 扇形隊列Two
@Bean
public Queue fanoutQueueTwo(){
return new Queue(RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_TWO);
}
// 創建扇形交換機
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange(RabbitMQConstant.RABBITMQ_FANOUT_EXCHANGE_KH96);
}
//綁定隊列到扇形交換機,不需要 指定 路由鍵
@Bean
public Binding fanoutBindingQueueOne(){
//綁定隊列到扇形交換機,不需要路由鍵,消息是廣播發送,會給多有綁定的隊列群發信息消息(根本沒有提供with方法)
return BindingBuilder.bind(fanoutQueueOne())
.to(fanoutExchange());
}
@Bean
public Binding fanoutBindingQueueTwo(){
//綁定隊列到扇形交換機,不需要路由鍵,消息是廣播發送,會給多有綁定的隊列群發信息消息(根本沒有提供with方法)
return BindingBuilder.bind(fanoutQueueTwo())
.to(fanoutExchange());
}
}
3.2.2 消息生產者
/**
* Created On : 1/11/2022.
* <p>
* Author : huayu
* <p>
* Description: RabbitMQFanoutProducer
*/
@Slf4j
@Component
public class RabbitMQFanoutProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* @author : huayu
* @date : 1/11/2022
* @param : [fanoutExchange, fanoutRoutingKey, fanoutMsg]
* @return : void
* @description : 使用扇形模式,發送消息到扇形交換機,將消息發送到綁定的隊列中
*/
public void sendFanoutMsg2FanoutExchange(String fanoutExchange,String fanoutRoutingKey,String fanoutMsg){
log.info("++++++ Fanout模式消息生產者,發送廣播消息:{},到交換機:{},路由鍵:{} ++++++", fanoutMsg, fanoutExchange, fanoutRoutingKey);
rabbitTemplate.convertAndSend(fanoutExchange, fanoutRoutingKey, fanoutMsg);
}
}
3.2.3 消費者
3.2.3.1 消費者One
/**
* Created On : 1/11/2022.
* <p>
* Author : huayu
* <p>
* Description: RabbitMQFanoutConsumerOne
*/
@Slf4j
@Component
@RabbitListener(queues = RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_ONE)
public class RabbitMQFanoutConsumerOne {
@RabbitHandler
public void fanoutConsumeOneFanoutMsgFromFanoutQueueOne(String fanoutMsgJson){
log.info("****** Fanout扇形模式,消費One,消費隊列One,消息:{} ******",fanoutMsgJson);
// TODO 核心業務邏輯處理
}
}
3.2.3.2 消費者Two
/**
* Created On : 1/11/2022.
* <p>
* Author : huayu
* <p>
* Description: RabbitMQFanoutConsumerTwo
*/
@Slf4j
@Component
//@RabbitListener(queues = RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_TWO)
public class RabbitMQFanoutConsumerTwo {
// @RabbitHandler
public void fanoutConsumeTwoFanoutMsgFromFanoutQueueTwo(String fanoutMsgJson){
log.info("****** Fanout扇形模式,消費Two,消費隊列Two,消息:{} ******",fanoutMsgJson);
// TODO 核心業務邏輯處理
}
}
3.2.4 請求測試方法
/**
* Created On : 1/11/2022.
* <p>
* Author : huayu
* <p>
* Description: 測試 RabbitMQ 消息隊列的操作入口
*/
@Slf4j
@RestController
public class RabbitMQController {
@Autowired
private RabbitMQFanoutProducer rabbitMQFanoutProducer;
/**
* @author : huayu
* @date : 1/11/2022
* @param : [fanoutMsg]
* @return : com.kgc.scd.uitl.RequestResult<java.lang.String>
* @description : 測試扇形(廣播)模式,發送和消費信息
*/
@GetMapping("/fanout")
public RequestResult<String> testRabbitMQFanout(@RequestParam String fanoutMsg){
log.info("------- fanout 扇形模式,發送消息 -------");
//模擬發送5條直連消息
Stream.of(66,77,88,99,96).forEach(directNo ->{
//模擬創建消息對象
Map<String,Object> fanoutMap =new HashMap<>();
fanoutMap.put("directNo",directNo);
fanoutMap.put("directData",fanoutMsg);
fanoutMap.put("directTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
//調用扇形模式消息生產者,發送消息
rabbitMQFanoutProducer.sendFanoutMsg2FanoutExchange(RabbitMQConstant.RABBITMQ_FANOUT_EXCHANGE_KH96
,null
,JSON.toJSONString(fanoutMap));
});
return ResultBuildUtil.success("使用扇形模式。發送消息成功");
}
}
3.2.5 請求測試
3.2.5.1 一個消費者
消費者One消費了隊列One中的所有信息;
3.2.5.2 兩個消費者
消費者One消費了隊列One中的所有信息;
消費者Two消費了隊列Two中的所有信息;
3.3 Topic 主題模式
3.3.1 配置類
/**
* Created On : 2/11/2022.
* <p>
* Author : huayu
* <p>
* Description: Topic 主題模式,自動配置類
*/
@Configuration
public class RabbitMQTopicConfig {
//======== 隊列
//Topic 主題模式 隊列One
@Bean
public Queue topicQueueOne(){
return new Queue(RabbitMQConstant.RABBITMQ_TOPIC_QUEUE_NAME_KH96_ONE,true);
}
//Topic 主題模式 隊列Two
@Bean
public Queue topicQueueTwo(){
return new Queue(RabbitMQConstant.RABBITMQ_TOPIC_QUEUE_NAME_KH96_TWO,true);
}
//Topic 主題模式 隊列Three
@Bean
public Queue topicQueueThree(){
return new Queue(RabbitMQConstant.RABBITMQ_TOPIC_QUEUE_NAME_KH96_THREE,true);
}
//======= 交換機
//Topic 主題模式 交換機
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(RabbitMQConstant.RABBITMQ_TOPIC_EXCHANGE_KH96);
}
//======= 綁定
// 隊列One 綁定 Topic主題模式交換機 和 路由鍵-唯一匹配規則
@Bean
public Binding topicBindingQueueOne(){
return BindingBuilder.bind(topicQueueOne())
.to(topicExchange())
.with(RabbitMQConstant.RABBITMQ_TOPIC_ROUTING_KEY_KH96_ONLY);
}
// 隊列Two 綁定 Topic主題模式交換機 和 路由鍵-單個單詞詞匹配規則
@Bean
public Binding topicBindingQueueTwo(){
return BindingBuilder.bind(topicQueueTwo())
.to(topicExchange())
.with(RabbitMQConstant.RABBITMQ_TOPIC_ROUTING_KEY_KH96_WORLD);
}
// 隊列Two 綁定 Topic主題模式交換機 和 路由鍵-模糊匹配規則
@Bean
public Binding topicBindingQueueThree(){
return BindingBuilder.bind(topicQueueThree())
.to(topicExchange())
.with(RabbitMQConstant.RABBITMQ_TOPIC_ROUTING_KEY_KH96_LIKE);
}
}
3.3.2 消息生產者
/**
* Created On : 2/11/2022.
* <p>
* Author : huayu
* <p>
* Description: RabbitMQ 主題模式消息生產者
*/
@Slf4j
@Component
public class RabbitMQTopicProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* @author : huayu
* @date : 1/11/2022
* @param : [topicExchange, topicRoutingKey, topicMsg]
* @return : void
* @description : 使用主題模式,發送消息到主題交換機,主題交換機會根據發送消息的路由鍵 ,根據匹配規則將消息投遞到匹配的隊列中
*/
public void sendTopicMsg2TopicExchange(String topicExchange,String topicRoutingKey,String topicMsg){
log.info("++++++ direct模式消息生產者,發送直連消息:{},到交換機:{},路由鍵:{} ++++++",topicMsg,topicExchange,topicRoutingKey);
rabbitTemplate.convertAndSend(topicExchange,topicRoutingKey,topicMsg);
}
}
3.3.3 消費者
3.3.3.1 消費者One
/**
* Created On : 2/11/2022.
* <p>
* Author : huayu
* <p>
* Description: RabbitMQTopicConsumerOne
*/
@Slf4j
@Component
@RabbitListener(queues = RabbitMQConstant.RABBITMQ_TOPIC_QUEUE_NAME_KH96_ONE)
public class RabbitMQTopicConsumerOne {
@RabbitHandler
public void consumeTopicMsgFromTopicQueue(String topicMapJson){
log.info("****** Topic 主題模式,消費One,消費隊列One,消息:{} ******",topicMapJson);
// TODO 核心業務邏輯處理
}
}
3.3.3.2 消費者Two
/**
* Created On : 2/11/2022.
* <p>
* Author : huayu
* <p>
* Description: RabbitMQTopicConsumerTwo
*/
@Slf4j
@Component
@RabbitListener(queues = RabbitMQConstant.RABBITMQ_TOPIC_QUEUE_NAME_KH96_TWO)
public class RabbitMQTopicConsumerTwo {
@RabbitHandler
public void consumeTopicMsgFromTopicQueue(String topicMapJson){
log.info("****** Topic 主題模式,消費 Two,消費隊列 Two,消息:{} ******",topicMapJson);
// TODO 核心業務邏輯處理
}
}
3.3.3.3 消費者Three
/**
* Created On : 2/11/2022.
* <p>
* Author : huayu
* <p>
* Description: RabbitMQTopicConsumerThree
*/
@Slf4j
@Component
@RabbitListener(queues = RabbitMQConstant.RABBITMQ_TOPIC_QUEUE_NAME_KH96_THREE)
public class RabbitMQTopicConsumerThree {
@RabbitHandler
public void consumeTopicMsgFromTopicQueue(String topicMapJson){
log.info("****** Topic 主題模式,消費 Three,消費隊列 Three,消息:{} ******",topicMapJson);
// TODO 核心業務邏輯處理
}
}
3.3.4 請求測試方法
/**
* Created On : 1/11/2022.
* <p>
* Author : huayu
* <p>
* Description: 測試 RabbitMQ 消息隊列的操作入口
*/
@Slf4j
@RestController
public class RabbitMQController {
@Autowired
private RabbitMQTopicProducer rabbitMQTopicProducer;
@GetMapping("/topic")
public RequestResult<String> testRabbitMQTopic(@RequestParam String topicMsg){
log.info("------- topic 主題模式,發送消息 -------");
//模擬發送5條直連消息
Stream.of(95,96,97,98,99).forEach(directNo ->{
//模擬創建消息對象
Map<String,Object> fanoutMap =new HashMap<>();
fanoutMap.put("directNo",directNo);
fanoutMap.put("directData",topicMsg);
fanoutMap.put("directTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
//調用主題模式消息生產者,發送消息
//場景1:使用唯一路由鍵 rabbitmq_topic_routing_key_kh96.only , 發送消息
rabbitMQTopicProducer.sendTopicMsg2TopicExchange(RabbitMQConstant.RABBITMQ_TOPIC_EXCHANGE_KH96
,RabbitMQConstant.RABBITMQ_TOPIC_ROUTING_KEY_KH96_ONLY
,JSON.toJSONString(fanoutMap));
//場景2:使用單詞匹配路由鍵 rabbitmq_topic_routing_key_kh96.* ,發送消息
// rabbitMQTopicProducer.sendTopicMsg2TopicExchange(RabbitMQConstant.RABBITMQ_TOPIC_EXCHANGE_KH96
// ,"rabbitmq_topic_routing_key_kh96.abc"
// ,JSON.toJSONString(fanoutMap));
//場景3:0 或多詞匹配 rabbitmq_topic_routing_key_kh96.# ,發送消息
// rabbitMQTopicProducer.sendTopicMsg2TopicExchange(RabbitMQConstant.RABBITMQ_TOPIC_EXCHANGE_KH96
// ,"rabbitmq_topic_routing_key_kh96.abc.def"
// ,JSON.toJSONString(fanoutMap));
});
return ResultBuildUtil.success("使用主題模式。發送消息成功");
}
}
3.3.5 請求測試
3.3.5.1 場景1:使用唯一路由鍵
發送消息路由鍵名: rabbitmq_topic_routing_key_kh96.only
發起請求:
請求結果:
隊列One,Two,Three都接收到了信息,所以對應的消費者One,Two,Three都消費了信息;
3.3.5.2 場景2:使用單詞匹配路由鍵
發送消息路由鍵名: rabbitmq_topic_routing_key_kh96.abc
發起請求:
請求結果:
隊列Two,Three都接收到了信息,所以對應的消費者Two,Three都消費了信息;
3.3.5.3 場景3:0 或多詞匹配
發送消息路由鍵名: rabbitmq_topic_routing_key_kh96.abc.def
發起請求:
請求結果:
只有隊列Three接收到了信息,所以只有對應的消費者Three消費了信息;
3.3.6 主題模式小結
-
當生產者發送消息到交換機,指定的路由鍵一般都是使用句點(.)作為分隔符,分割多個單詞。
- 比如:詞1.詞...
-
所謂單詞:是由一個或多個單片語成,多個單片語成的路由鍵,就代表某種主題的關鍵信息,路由鍵長度最多不能超過256位元組。
-
匹配規則格式:* 或者 #
- *代表單個單詞。
- 比如 隊列綁定主題交換機的 路由鍵:KH96.* ,代表發送消息的路由鍵是以KH96開頭,後面只能跟一個單詞,如:KH96.aaa,KH96.bbb等。
- 再比如:綁定路由鍵為:KH96.*.KGC,代表發送消息路由鍵是以KH96開頭,中間可以帶一個單詞,結尾,如:KH96.aa.KGC,KH96.bb.KGC。
- #代表0或多個單詞,比如 隊列綁定主題交換機的 路由鍵:KH96.#,代表發送消息的路由鍵是以KH96開頭,後面只能跟0個或者多個單詞,如:KH96,KH96.aaa,KH96.aaa.bbb。
- 再比如:綁定路由鍵為:KH96.#.KGC,代表發送消息路由鍵是以KH96開頭,中間可以帶一個或多個單詞,結尾,如KH96.KGC,KH96.aa.KGC,KH96.aa.bb.KGC。
-
備註:
- 如果主題交換機,隊列綁定的路由鍵使用的不是模糊匹配符,主題交換機跟直連交換機一致。
- 如果單獨使用#,代表所有隊列都可以收消息,主題交換機跟扇形交換機一致。
- *代表單個單詞。
-
提醒:
-
主題模式下,隊列綁定的路由鍵,是允許為多個的。
-
如果路由鍵被更換,之前的路由鍵是不會刪除,仍然會綁定到當前隊列上。
-
如果有多個路由鍵匹配,規則為:如果其中一個沒有匹配到,會自動匹配其他路由鍵,如果需要刪除歷史路由鍵,需要在RabbitMQ控制台刪除。
-
3.4 消息 發送確認 - 交換機,隊列 確認
3.4.1 配置信息
# RabbitMQ配置
spring:
rabbitmq:
# 打開發送消息確認配置
publisher-confirms: true # 發送消息到交換機確認,預設false
publisher-returns: true # 發送消息到隊列確認,預設是false
3.4.2 消息發送確認配置類
- 觸發機制
- ConfirmCallback 函數式介面中的唯一抽象方法 confirm : 是否有交換機都會觸發;
- 標識:true,發送到交換機正常;
- 標識:false,發送到交換機失敗,進行特殊處理;
- ReturnCallback 函數式介面中的唯一抽象方法 returnedMessage :交換機存在且隊列不存在才會觸發;
- 觸發:發送到隊列失敗,進行特殊處理;
- ConfirmCallback 函數式介面中的唯一抽象方法 confirm : 是否有交換機都會觸發;
/**
* Created On : 2/11/2022.
* <p>
* Author : huayu
* <p>
* Description: RabbitMQ 消息確認機制: 發送確認
*/
@Slf4j
@Configuration
public class RabbitMQSendMsgAck {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
//發送確認,消息是通過rabbitTemplate發的,所以要重置rabbitTemplate才可以實現
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//開啟觸發回調處理方法,不論消息推送結果是什麼,都會強制觸發回調方法
rabbitTemplate.setMandatory(true);
//指定消息發送到RabbitMQ的broker節點,是否正確到達交換機確然
//是否有交換機都會觸發
rabbitTemplate.setConfirmCallback( (correlationData, ack, cause) ->{
log.info("###### 發送消息確認回調,數據:{} ######",correlationData);
log.info("###### 發送消息確認回調,標識:{} ######",ack);
log.info("###### 發送消息確認回調,原因:{} ######\n",cause);
//TODO 如果沒有到交換機,ack返回的是false,可能是交換機被刪除,就需要進行特殊處理的業務,比如給負責人發送信息或郵件
});
//消息是否正確到達交換機上綁定的 目標隊列
//交換機存在且隊列不存在才會觸發
rabbitTemplate.setReturnCallback( ( message, replyCode, replyText,exchange,routingKey) ->{
log.info("###### 發送消息返回回調,數據:{} ######",message);
log.info("###### 發送消息返回回調,返回碼:{} ######",replyCode);
log.info("###### 發送消息返回回調,返回說明:{} ######",replyText);
log.info("###### 發送消息返回回調,交換機:{} ######",exchange);
log.info("###### 發送消息返回回調,路由鍵:{} ######\n",routingKey);
//TODO 如果沒有到目標隊列,就需要進行特殊處理的業務,比如給負責人發送信息或郵件
});
return rabbitTemplate;
}
}
3.4.3 交換機
/**
* Created On : 2/11/2022.
* <p>
* Author : huayu
* <p>
* Description: Ack 測試交換機,沒有綁定隊列
*/
@Configuration
public class RabbitMQAckConfig {
//ack 測試交換機,沒有綁定隊列
@Bean
public DirectExchange directExchange(){
return new DirectExchange(RabbitMQConstant.RABBITMQ_ACK_EXCHANGE_KH96);
}
}
3.4.4 請求方法
/**
* @author : huayu
* @date : 2/11/2022
* @param : [topicMsg]
* @return : com.kgc.scd.uitl.RequestResult<java.lang.String>
* @description : 直連模式測試 Ack 不存在交換機 和 存在交換機
*/
@GetMapping("/sendMsgAck")
public RequestResult<String> RabbitMQSendMsgAck(@RequestParam String ackMsg){
log.info("------- 直連 模式 測試Ack,發送消息 -------");
//模擬發送直連消息
//調用直連模式消息生產者,發送消息
//測試1: 不存在的 交換機
rabbitMQDirectProducer.sendDirectMsg2DirectExchange("test_noExchange"
,RabbitMQConstant.RABBITMQ_DIRECT_ROUTING_KEY_KH96
,JSON.toJSONString(ackMsg));
return ResultBuildUtil.success("使用直連模式 測試Ack。交換機不存在");
//測試2: 存在的交換機,但是沒有綁定 隊列
// rabbitMQDirectProducer.sendDirectMsg2DirectExchange(RabbitMQConstant.RABBITMQ_ACK_EXCHANGE_KH96
// ,RabbitMQConstant.RABBITMQ_DIRECT_ROUTING_KEY_KH96
// ,JSON.toJSONString(ackMsg));
// return ResultBuildUtil.success("使用直連模式 測試Ack。交換機 沒有綁定隊列");
}
3.2.5 請求測試
3.2.5.1 交換機不存在
發起請求:
請求結果:
交換機不存在,
觸發了ConfirmCallback 函數式介面中的唯一抽象方法 confirm ,
返回標識 false,發送到交換機失敗,
原因,該交換機不存在;
註意:如果沒有到交換機,ack返回的是false,可能是交換機被刪除,就需要進行特殊處理的業務,比如給負責人發送信息或郵件;
3.2.5.2 交換機存在,但是沒有綁定 隊列
發起請求:
請求結果:
交換機存在,
觸發了ConfirmCallback 函數式介面中的唯一抽象方法 confirm ,
返回標識 true,發送到交換機成功;
沒有綁定隊列,
觸發了ReturnCallback 函數式介面中的唯一抽象方法 returnedMessage ,
返回說明 NO_ROUT,發送到隊列失敗;
註意:如果沒有到目標隊列,就需要進行特殊處理的業務,比如給負責人發送信息或郵件;
3.2.5.3 交換機存在,且綁定了隊列
發起請求
請求結果:
交換機存在,且綁定了隊列,
觸發了ConfirmCallback 函數式介面中的唯一抽象方法 confirm ,
返回標識 true,發送到交換機成功;
沒有觸發ReturnCallback 函數式介面中的唯一抽象方法 returnedMessage ,
說明發送到隊列成功;
3.5 消息確認
3.5.1 自動確認
3.5.1.1 配置信息
# RabbitMQ配置
spring:
rabbitmq:
# 消費消息確認配置-自動
listener:
simple:
retry:
enabled: true # 開啟消費消息失敗重試機制
max-attempts: 5 # 指定重試的次數
max-interval: 10000 # 最大重試間隔時間,單位毫秒,每次重試的間隔時間,不能比當前設置的值大,如果計算間隔時間是6s,最大時間時間5s,會用5秒
initial-interval: 1000 # 重試間隔初始時間,單位毫秒
multiplier: 2 #乘子;重試的間隔時間 * 乘子,就是下一次重試的時間間隔市場,即:1s,2s,4s,8s,16...
3.5.1.2 消費者 模擬異常
註意:測試時為了讓消費者One一定接收到消息,所以註釋掉消費者Two,這樣才可以保證消費者One接收消息,然後觸發異常,重試的效果;
/**
* Created On : 1/11/2022.
* <p>
* Author : huayu
* <p>
* Description: Direct 直連模式消費者 One
*/
@Slf4j
@Component
//指定接聽的 消息隊列 名字
@RabbitListener(queues = RabbitMQConstant.RABBITMQ_DIRECT_QUEUE_NAME_KH96)
public class RabbitMQDirectConsumerOne {
/**
* @author : huayu
* @date : 1/11/2022
* @param : [directMsgJson]
* @return : void
* @description : Direct 直連模式消費者One,消費信息
*/
//指定消息隊列中的消息,交給對應的方法處理
@RabbitHandler
public void consumeOneDirectMsgFromDirectQueue(String directMsgJson){
log.info("***** Direct直連模式,消費者One,消費消息:{} ******",directMsgJson);
// TODO 核心業務邏輯處理
//預設自動確認,模擬消費端消費消息,處理異常,自動重試
int a = 10 / 0;
}
}
3.5.1.3 請求方法
/**
* @author : huayu
* @date : 3/11/2022
* @param : [ackMsg]
* @return : com.kgc.scd.uitl.RequestResult<java.lang.String>
* @description : 測試 消費者自動 重試
*/
@GetMapping("/consumeAckAuto")
public RequestResult<String> testRabbitMQConsumeAckAuto(@RequestParam String ackMsg){
log.info("------- 直連 模式 測試Ack 自動 重試,發送消息 -------");
//模擬發送直連消息
//消費消息失敗重試機制
rabbitMQDirectProducer.sendDirectMsg2DirectExchange(RabbitMQConstant.RABBITMQ_DIRECT_EXCHANGE_KH96
,RabbitMQConstant.RABBITMQ_DIRECT_ROUTING_KEY_KH96
,JSON.toJSONString(ackMsg));
return ResultBuildUtil.success("使用直連模式 消費確認-自動消費成功");
}
3.5.1.4 請求測試
發起請求:
請求結果:
一共重試了五次
間隔時間為1,2,4,8
(如果還有一次應該為10,因為最後一次計算時間16大於最大間隔時間10,按最大間隔時間10重試);
3.4.2 手動確認
註意:
- 手動確認需要先將自動確認的配置註釋掉;
- 使用手動確認,不能再用@RabbitListener 監聽,手動確認相關隊列,需要我們手動配置消費者;
3.4.2.1 消費消息手動確認的監聽器
-
獲取消息消費的唯一標識 message.getMessageProperties().getDeliveryTag();
-
執行業務處理
- 每個消費者在同一個時間點,最多處理一個message,預設是0(全部) channel.basicQos(1);
- 獲取message的消息內容 message
- 獲取消息對應的目標隊列,可以實現一些靈活判斷處理message.getMessageProperties().getConsumerQueue()
- 比如根據不同的目標隊列進行不同的處理
- 在消息處理的時候如果出錯會被捕獲(消息確認失敗)
- 消息確認channel.basicAck(deliveryTag,false);
-
消息確認失敗處理
- 根據條件判斷設置是否重回隊列 ,是否支持批量處理 channel.basicNack(deliveryTag,true,false);
/**
* Created On : 2/11/2022.
* <p>
* Author : huayu
* <p>
* Description: 消費端 消費消息手動確認的監聽器,註意它也是一個消費者,並可以通過 消息監聽容器工廠,動態配置多個
*/
@Slf4j
@Component
public class RabbitMQConsumerManualAckListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws IOException {
//獲取消息消費的唯一標識,rabbitMQ在推送消息時,會給每個消息攜帶一個唯一標識,值是一個遞增的正整數
long deliveryTag = message.getMessageProperties().getDeliveryTag();
log.info("====== 消費消息的唯一標識:{} ======",deliveryTag);
//執行手動確認業務處理
try{
//給每個消費者在同一個時間點,最多處理一個message,預設是0(全部),換句話說,在接收到消費者的 ack 確認前,不會分發新的消息給當前的消費者
//在接收當前消息的ack確認前是不會發送新的消息給它
channel.basicQos(1);
//獲取message的消息內容,發送的消息的json字元串
log.info("====== 消息隊列中完整消息內容:{} ======",message);
//獲取發送的實際內容,發送消息的json字元串
log.info("====== 發送的實際內容:{} ======",new String(message.getBody(),"utf-8"));
//獲取消息對應的目標隊列,可以實現一些靈活判斷
//TODO 比如根據目標隊列不同,可以做不同的處理
log.info("====== 消息的來源隊列:{} =======",message.getMessageProperties().getConsumerQueue());
//模擬錯誤 ,當 deliveryTag 為1的時候,進入 報錯 ,捕獲異常,然後(如果設置了重回隊列)將消息重回隊列
//if(deliveryTag == 1){
// int num = 1/0;
//}
//消費消息的手動確認,消息確認成功-basicAck
//第一個參數deliveryTag,消息的唯一標識
//第二個參數multiple,消息是否支持批量確認,如果是true,代表可以一次性確認標識小於等於當前標識的所有消息
//如果是false,只會確認當前消息
channel.basicAck(deliveryTag,false);
}catch (Exception e){
//說明消費消息處理失敗,如果不進行確認(自動確認,投遞成功即確認,消費是否正常,不關心),消息就會丟失
//消息處理失敗確認,代表消息沒有正確消費,註意:此種方式一次只能確認一個消息
//第一給參數是消息的唯一標識,
//第二個參數是代表是否重回隊列,如果是true,重新將該消息放入隊列,再次消費
//註意:第二個參數要謹慎,必須要結合具體業務場景,根據業務判斷是否需要重回隊列,一旦處理不當,機會導致消息迴圈入隊,消息擠壓
//不重回隊列 require = false
// channel.basicReject(deliveryTag,false);
//重回隊列 require = true
channel.basicReject(deliveryTag,true);
//消息處理失敗確認,代表消息沒有正確消費,註意,此種方式支持批量
//第一個參數是消息的唯一標識,
//第二個參數是代表是否支持批量確認
//第三給參數代表是否重回隊列
//不重回隊列 require = false
// channel.basicNack(deliveryTag,true,false);
//重回隊列 require = true
// channel.basicNack(deliveryTag,false,true);
//TODO 手動消費異常處理
log.error("====== 消費消息失敗,異常信息:{} ======",e.getMessage());
}
}
}
3.4.2.2 消費消息手動確認配置類
- 配置消費者的數量 setConcurrentConsumers(2);
- 最大併發消費者數量 setMaxConcurrentConsumers(5);
- 消費消息確認機製為手動 setAcknowledgeMode(AcknowledgeMode.MANUAL);
- 設置監聽消息隊列的名稱,支持多個隊列setQueueNames(RabbitMQConstant.RABBITMQ_DIRECT_QUEUE_NAME_KH96);
- 設置消息手動確認監聽器 setMessageListener(rabbitMQConsumerManualAckListener);
/**
* Created On : 2/11/2022.
* <p>
* Author : huayu
* <p>
* Description: RabbitMQ 消費消息手動確認配置類
*/
@Configuration
public class RabbitMQConsumeManualAckConfig {
@Autowired
private RabbitMQConsumerManualAckListener rabbitMQConsumerManualAckListener;
/**
* @author : huayu
* @date : 2/11/2022
* @param : [connectionFactory]
* @return : org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
* @description : 自定義消息監聽器工程對象
*/
@Bean
public SimpleMessageListenerContainer simpleBrokerMessageHandler(ConnectionFactory connectionFactory){
//初始化消息監聽容器的工程對象
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
//初始化併發消費者的數量,比如是2,代表同時會有 兩個消費者 消費消息
// ,投遞標識可能會相同
container.setConcurrentConsumers(2);
//設置最大的併發消費者數量,數量不能低於初始化併發消費者數量
//可以動態的設定當前容器的消費者數量,可以實現動態增加和減少消費者的演算法在 SimpleMessageListenerContainer類中實現
container.setMaxConcurrentConsumers(5);
//底層動態實現消費者數量的增加減少原理
// 有consumer已連續十個周期(consecutiveActiveTrigger)處於活動狀態,並且自啟動後最後一個consumer運行至少經過了10秒鐘,則將啟動新的consumer。
// private static final long DEFAULT_START_CONSUMER_MIN_INTERVAL = 10000;
// 停止消費者演算法的時間間隔
// 有consumer已連續10個周期(consecutiveIdleTrigger)連續空閑狀態,並且上一個consumer至少在60秒之前停止,那麼該consumer將停止
// private static final long DEFAULT_STOP_CONSUMER_MIN_INTERVAL = 60000;
// 預設連續活動10個周期
// private static final int DEFAULT_CONSECUTIVE_ACTIVE_TRIGGER = 10;
// 預設連續空閑10個周期
// private static final int DEFAULT_CONSECUTIVE_IDLE_TRIGGER = 10;
//預設的消費消息確認機制是自動,需要改為手動
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//設置監聽消息隊列的名稱,支持多個隊列(隊列名1,隊列名2...),註意前提是指定的隊列必須是存在的
//監聽 直連模式的 RabbitMQConstant.RABBITMQ_DIRECT_QUEUE_NAME_KH96 隊列
container.setQueueNames(RabbitMQConstant.RABBITMQ_DIRECT_QUEUE_NAME_KH96);
//監聽 扇形模式的
//RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_ONE 隊列
//和 RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_TWO 隊列
// container.setQueueNames(RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_ONE
// ,RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_TWO);
//指定消息確認的處理類,會同時產生多個消費者,參數是上面設置的,
//註意之前使用直連模式,消息消費者,要註釋掉,防止同類型的監聽器,處理同一隊列
//如果不是被當前消息確認的處理類消費(使用註解@RabbitListener),會導致消息不執行手動處理
container.setMessageListener(rabbitMQConsumerManualAckListener);
// 返回消息監聽容器工廠對象
return container;
}
}
3.4.2.3 請求方法
//======================
/**
* @author : huayu
* @date : 3/11/2022
* @param : [ackMsg]
* @return : com.kgc.scd.uitl.RequestResult<java.lang.String>
* @description : 測試 消費者手動確認
*/
@GetMapping("/consumeAckManual")
public RequestResult<String> testRabbitMQConsumeAckManual(@RequestParam String ackMsg){
log.info("------- 測試Ack 手動 確認,發送消息 -------");
//消息手動確認
//模擬發送直連消息
//測試1,2
rabbitMQDirectProducer.sendDirectMsg2DirectExchange(
RabbitMQConstant.RABBITMQ_DIRECT_EXCHANGE_KH96
,RabbitMQConstant.RABBITMQ_DIRECT_ROUTING_KEY_KH96
,JSON.toJSONString(ackMsg));
return ResultBuildUtil.success("使用直連模式 手動消費確認-消息確認成功");
//測試3
//模擬發送扇形消息
// rabbitMQFanoutProducer.sendFanoutMsg2FanoutExchange(
// RabbitMQConstant.RABBITMQ_FANOUT_EXCHANGE_KH96
// ,null
// ,JSON.toJSONString(ackMsg));
//
//
// return ResultBuildUtil.success("使用扇形模式 手動消費確認-消息確認成功");
}
3.4.2.4 請求測試
3.4.2.4.1 模擬發送直連消息併成功確認
發送請求:
請求結果:
3.4.2.4.2 模擬發送直連消息,拋出異常,重回隊列
發送請求:
代碼重點:
請求結果:
3.4.2.4.3 模擬發送扇形消息併成功確認
發送請求:
請求結果: