1. 認識RabbitMQ 1.1介紹RabbitMQ RabbitMQ 是開源的高級消息隊列協議(Advanced Message Queueing Protocol, AMQP) 的實現,用Erlang 語言編寫,支持多種客戶端。 RabbitMQ是目前應用相當廣泛的消息中間件(其他同類的消息處 ...
1. 認識RabbitMQ
1.1介紹RabbitMQ
RabbitMQ 是開源的高級消息隊列協議(Advanced Message Queueing Protocol, AMQP) 的實現,用Erlang 語言編寫,支持多種客戶端。
RabbitMQ是目前應用相當廣泛的消息中間件(其他同類的消息處理中間件有ActiveMQ、Kafka等)。在企業級應用、微服務應用中,RabbitMQ擔當著十分重要的角色。例如,在業務服務模塊中解耦、非同步通信、高併發限流、超時業務、數據延遲處理等都可以使用RabbitMQ。
RabbitMQ的處理流程如圖12-1所示
圖 12-1
1.2 使用場景
(1)推送通知
“發佈/訂閱”是RabbitMQ的重要功能。可以用"發佈/訂閱"功能來實現通知功能。消費者 (consumer) 一直監聽RabbitMQ的數據。如果RabbitMQ有數據,則消費者會按照“先進先岀” 規則逐條進行消費。而生產者(producer)只需要將數據存入RabbitMQ。這樣既降低了不同系統之間的耦合度,也確保了消息通知的及時性,且不影響系統的性能。
"發佈/訂閱”功能支持三種模式:一對一、一對多、廣播。這三種模式都可以根據規則選擇分發的對象。眾多消費者(consumer)可以根據規則選擇是否接收這些數據,擴展性非常強。
(2)非同步任務
後臺系統接到任務後,將其分解成多個小任務,只要分別完成這些小任務,整個任務便可以完成。但是,如果某個小任務很費時,且延遲執行並不影響整個任務,則可以將該任務放入消息隊列中去處理,以便加快請求響應時間。
如果用戶註冊會員時有一項需求一發送驗證郵件或簡訊驗證碼以完成驗證,則可以使用 RabbitMQ的消息隊列來實現,這樣可以及時提醒用戶操作已經成功。等待收到郵件或驗證碼,然後進行相應的確認,即完成驗證。
(3)多平臺應用的通信
RabbitMQ可以用於不同開發語言開發的應用間的通信(如Java開發的應用程式需要與C++ 開發的應用程式進行通信),實現企業應用集成。由於消息隊列是無關平臺和語言的,而且語義上也不是函數調用,因此RabbitMQ適合作為多個應用之間的松耦合的介面,且不需要發送方和接收方同時線上。
不同語言的軟體解耦,可以最大限度地減少程式之間的相互依賴,提高系統可用性及可擴展性, 同時還增加了消息的可靠傳輸和事務管理功能。
RabbitMQ提供兩種事務模式:
- AMQP事務模式。
- Confirm事務模式。
(4)消息延遲
利用RabbitMQ消息隊列演出功能,可以實現訂單、支付過期定時取消功能。因為延遲隊列存儲延時消息,所以,當消息被髮送以後,消費者不是立即拿到消息,而是等待指定時間後才拿到這個消息進行消費。
當然,死信、計時器、定時任務也可以實現延退或定時功能,但是需要開發者去處理。
要實現消息隊列延遲功能,一般釆用官方提供的插件“rabbitmq_delayed_message_ exchange"來實現,但RabbitMQ版本必須是3.5.8版本以上才支持該插件。如果低於這個版本, 則可以利用“死信”來完成。
(5)遠程過程調用
在實際的應用場景中,有時需要一些同步處理,以等待伺服器端將消息處理完成後再進行下一 步處理,這相當於RPC ( Remote Procedure Call,遠程過程調用)。RabbitMQ也支持RPC。
1.3 特性
RabbitMQ具有以下特性。
- 信息確認:RabbitMQ有以下兩種應答模式。
- 自動應答:當RabbitMQ把消息發送到接收端,接收端從隊列接收消息時,會自動發送應答消息給伺服器端。
- 手動應答:需要開發人員手動調用方法告訴服務端已經收到。
- 隊列持久化:隊列可以被持久化,但是否為持久化,要看持久化設置。
- 信息持久化:設置properties.DeliveryMode值即可。預設值為1,代表不是持久的,2代表持久化。
- 消息拒收:接收端可以拒收消息,而且在發送"reject”命令時,可以選擇是否要把拒收的消息重新放回隊列中。
- 消息的QoS:在接收端設置的。發送端沒有任何變化,接收端的代碼也比較簡單,只需要加上如 "channel.BasicQos(0,1, false);"的代碼即可。
如果實際場景中對個別消息的丟失不是很敏感,則選用自動應答比較理想。
如果是一個消息都不能丟的場景,則需要選用手動應答,在正確處理完以後才應答。 如果選擇了自動應答,那消息重發這個功能就沒有了。
2. RabbitMQ的基本概念
2.1 生產者、消費者和代理
RabbitMQ的角色有以下三種。
- 生產者:消息的創建者,負責創建和推送數據到消息伺服器。
- 消費者:消息的接收方,用於處理數據和確認消息。
- 代理:RabbitMQ本身,扮演“快遞”的角色,本身不生產消息。
生產者和消費者並不屬於RabbitMQ。RabbitMQ只是為生產者和消費者提供發送和接收消息的API。
2.2 消息隊列
Queue (隊列)是RabbitMQ的內部對象,用於存儲生產者的消息直到發送給消費者,也是消費者接收消息的地方。RabbitMQ中的消息也都只能存儲在Queue中,多個消費者可以訂閱同一 個 Queue。
Queue有以下一些重要的屬性。
- 持久性:如果啟用,則隊列將會在消息協商器(broker)重啟前都有效。
- 自動刪除:如果啟用,則隊列將會在所有的消費者停止使用之後自動刪除掉。
- 惰性:如果沒有聲明隊列,則應用程式調用隊列時會導致異常,並不會主動聲明。
- 排他性:如果啟用,則聲明它的消費者才能使用。
2.3 交換機
Exchange (交換機)用於接收、分配消息。生產者先要指定一個“routing key”,然後將消息發送到交換機。這個"routing key"需要與"Exchange Type"及"binding key"聯合使用才能最終生效,然後,交換機將消息路由到一個或多個Queue中,或丟棄。
在虛擬主機的消息協商器(broker)中,每個Exchange都有睢一的名字。
Exchange包含4種類型:direct、topic、fanout、headers。不同的類型代表綁定到隊列的行為不同。
(1)direct
direct類型的行為是“先匹配,再投送”。在綁定隊列時會設定一個routing key,只有在消息的routing key與隊列匹配時,消息才會被交換機投送到綁定的隊列中。允許一個隊列通過一個固定的routing key (通常是隊列的名字)進行綁定。Direct交換機將消息根據其routing key屬性投遞到包含對應key屬性的綁定器上。
Direct Exchange是RabbitMQ預設的交換機模式,也是最簡單的模式。它根據routing key 全文匹配去尋找隊列。
(2)topic
按規則轉發消息(最靈活)。主題交換機(topic exchange )轉發消息主要根據通配符。隊列和交換機的綁定會定義一種路由模式,通配符就要在這種路由模式和路由鍵之間匹配後,交換機才能轉發消息。
在這種交換機模式下,路由鍵必須是一串字元,用”.“隔開。
路由模式必須包含一個星號“*”,主要用於匹配路由鍵指定位置的一個單詞。
topic還支持消息的routing key,用”*“或”#“的模式進行綁定。“*”匹配一個單詞,“#” 匹配0個或多個單詞。例如 “/binding key *.user.#”匹配 routing key 為“cn.user“和“us.user.db”, 但是不匹配“user.hello”
(3)headers
它根據應用程式消息的特定屬性進行匹配,可以在binding key中標記消息為可選或必選。在隊列與交換機綁定時,會設定一組鍵值對規則。消息中也包括一組鍵值對(headers屬性),當這些鍵值対中有一對,或全部匹配時,消息被投送到對應隊列。
(4)fanout
消息廣播的模式,即將消息廣播到所有綁定到它的隊列中,而不考慮routing key的值(不管路由鍵或是路由模式)。如果配置了 routing key。則routing key依然會被忽略。
2.4 綁定
RabbitMQ中通過綁定(binding ),將Exchange與Queue關聯起來。這樣 RabbitMQ 就知道如何正確地將消息路由到指定的 Queue 了。
在綁定 Exchange與 Queue時,—般會指定一個binding key。消賽者將消息發送給Exchange 時,一般會指定一個routing key。如果 binding key 與 routing key 相匹配,則消息將會被路由到對應的Queue中。
綁定是生產者和消費者消息傳遞的連接。生產者發送消息到 Exchange,消費者從Queue接收消息,都是根據綁定來執行的。
2.5 通道
有些應用需要與AMQP代理建立多個連接。但同時開啟多個TCP ( Transmission Control Protocol,傳輸控制協議)連接會消耗過多的系統資源,並使得防火堵的配置變得更加困難。“AMQP 0-9-1“協議用通道(channel)來處理多連接,可以把通道理解成“共用一個TCP連接的多個輕量化連接”。
一個特定通道上的通信與其他通道上的通信是完全隔離的,因此,每個AMQP方法都需要攜帶一個通道號。這樣客戶端就可以指定此方法是為哪個通道準備的。
2.6 消息確認
消息確認(message acknowledgement )是指:當一個消息從隊列中投遞給消費者 (consumer)後,消費者會通知一下消息代理(broker),這個過程可以是自動的,也可以由處理消息的應用的開發者執行。當“消息確認”啟用時,消息代理需要收到來自消費者的確認回執後, 才完全將消息從隊列中刪除。
如果消息無法被成功路由,或被返給發送者並被丟棄,或消息代理執行了延期操作,則消息會 被放入一個“死信”隊列中。此時,消息發送者可以選擇某些參數來處理這些特殊情況。
3. RabbitMQ的六種工作模式
3.1 簡單模式
生產者把消息放入隊列,消費者獲得消息,如圖12-2所示。這個模式只有一個消費者、一個生產者、一個隊列,只需要配置主機參數,其他參數使用預設值即可通信。
圖 12-2
3.2 工作隊列模式
這種模式出現了多個消費者,如圖12-3所示。為了保證消費者之間的負載均衡和同步,需要在消息隊列之間加上同步功能。
工作隊列(任務隊列)背後的主要思想是:避免立即執行資源密集型任務(耗時),以便下一個任務執行時不用等待它完成。工作隊列將任務封裝為消息並將其發送到隊列中。
圖 12-3
3.3 交換機模式
實際上,前兩種模式也使用了交換機,只是使用了採用預設設置的交換機。交換機參數是可以配置的,如果消息配置的交換機參數和RabbitMQ隊列綁定(binding )的交換機名稱相同,則轉發,否則丟棄,如圖12-4所示。
圖 12-4
3.4 Routing 轉發模式
交換機要配置為direct類型,轉發的規則變為檢查隊列的routing key值。如果routing key 值相同,則轉發,否則丟棄,如圖12-5所示。
圖 12-5
3.5 主題轉發模式
這種模式下交換機要配置為topic類型,routing key配置失效。發送到主題交換機的信息, 不能是任意routing key,它必須是一個單詞的列表,用逗號分隔。特點是可以模糊匹配,匹配規則為:*(星號)可以代替一個詞;#(#號)可以代替零個或更多的單詞,其模式情況如圖12-6 所示。
圖 12-6
3.6 RPC 模式
這種模式主要使用在遠程調用的場景下。如果一個應用程式需要另外一個應用程式來最終返回運行結果,那這個過程可能是比較耗時的操作,使用RPC模式是最合適的。其模式情況如圖12-7 所示。
圖 12-7
6種工作模式的主要特點如下。
- 簡單模式:只有一個生產者,一個消費者
- 工作隊列模式:一個生產者,多個消費者,每個消費者獲取到的消息唯一。
- 訂閱模式:一個生產者發送的消息會被多個消費者獲取。
- 路由模式:發送消息到交換機,並且要指定路由key,消費者在將隊列綁定到交換機時需要指定路由key。
- topic模式:根據主題進行匹配,此時隊列需要綁定在一個模式上,“#”匹配一個詞或多個詞,”*“只匹配一個詞。
4. 認識AmqpTemplate介面
Spring AMQP提供了操作AMQP協議的模板類AmqpTemplate,用於發送和接收消息, 它定義發送和接收消息等操作,還提供了 RabbitTemplate用於實現AmqpTemplate介面, 而且還提供了錯誤拋岀類AmqpException,RabbitTemplate支持消息的確認與返回(預設禁用)
4.1 發送消息
(1)send方法
AmqpTemplate模板提供了 send方法用來發送消息,它有以下3個重載:
- void send(Message message) throws AmqpException
- void send(String routingKey, Message message) throws AmqpException
- void send(String exchange, String routingKey, Message message)throws AmqpException
(2)convertAndSend 方法
AmqpTemplate模板還提供了 convertAndSend方法用來發送消息。convertAndSend 方法相當於簡化了的send方法,可以自動處理消息的序列化。下麵通過兩個功能一樣的代碼來比較兩者的區別:
@Test
void contextLoads() {
Message message = MessageBuilder.withBody("body content".getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("1")
.setHeader("header","header")
.build();
amqpTemplate.send("QueueHello",message);
}
@Test
void send(){
amqpTemplate.convertAndSend("QueueHello","body content");
}
上面代碼和下麵代碼的效果一樣。
4.2 接收消息
接收消息可以有兩種方式。
- 直接去查詢獲取消息,即調用receive方法。如果該方法沒有獲得消息,則直接返回null, 因為receive方法不阻塞。
- 非同步接收,通過註冊一個Listener (監聽器)來實現消息接收。接收消息需要指定隊列 (Queue),或設置預設的隊列。
AmqpTemplate提供的直接獲得消息的方法是receive
另外,AmqpTemplate也提供了直接接收POJO (代替消息對象)的方法receiveAndConvert,並提供了各種的 Messageconverter 用來處理返回的Object (對象)。
從 Spring-Rabbit 1.3 版本開始,AmqpTemplate 也提供了 receiveAndReply 方法來非同步接收、處理及回覆消息。
4.3 非同步接收消息
Spring AMQP也提供了多種不同的方式來實現非同步接收消息,比如常用的通過 MessageListener (消息監聽器)的方式來實現。
從Spring-rabbit 1.4版本開始,可使用註解@RabbitListener來非同步接收消息,它更為簡便。 使用方法見以下代碼:
@Component
@RabbitListener(queues = "object")
public class ObjectReceiver {
@RabbitHandler
public void process(User user){
System.out.println("Receiver object"+user);
}
}
5. 在Spring Boot中集成RabbitMQ
5.1 安裝RabbitMQ
RabbitMQ是用Erlang語言開發的。所以,需要先安裝Erlang環境,再安裝RabbitMQ。
(1)下載 Erlang 環境和 RabbitMQ
到Erlang官網下載Erlang環境。
到 RabbitMQ 官網下載 RabbitMQ。
(2)安裝
下載完成後,先單擊Erlang安裝文件進行安裝,然後單擊RabbitMQ安裝文件進行安裝。在安裝過程中,按照提示一步一步操作即可。在RabbitMQ成功安裝後,會自動啟動伺服器。
(3)開啟網頁管理界面
雖然可以在命令行管理RabbitMQ,但稍微麻煩。RabbitMQ提供了可視化的網頁管理平臺, 可以使用“rabbitmq-plugms.bat enable rabbitmq_management”命令開啟網頁管理界面。
5.2 界面化管理RabbitMQ
(1)概覽
在安裝配置完成後,開啟網頁管理,然後可以通過"http://localhost:15672"進行查看和管理, 輸入預設的用戶名"guest"和密碼"guest"進行登錄。RabbitMQ的後臺界面如圖12-8所示。
圖 12-8
(2)管理交換機
進入交換機管理頁面後,單擊“Add exchange (添加交換機)”按鈕,彈出添加界面,可以看到列出了 RabbitMQ 預設的4種類型,由於筆者已經添加了消息延遲插件,所以會有 “x-delayed-message”類型,如圖 12-9 所示。
圖 12-9
(3)管理管理員
消息中間件的安全配置也是必不可少的。在RabbitMQ中,可以通過命令行創建用戶、設置密碼、綁定角色。常用的命令如下。
- rabbitmqctl.bat list_users:查看現有用戶。
- rabbitmqctl.bat add_user username password:新増用戶。新増的用戶只有用戶名、密碼,沒有管理員、超級管理員等角色。
- rabbitmqctl.bat set_user_tags username administrator:設置角色。角色分為 none、 management、policymaker、monitoring、administrator。
- rabbitmqctl change_password userName newPassword:修改密碼命令。
- rabbitmqctl.bat delete_user username:刪除用戶命令。
還可以在開啟RabbitMQ網頁管理界面之後,用可視化界面進行操作,如圖12-10所示。其 中“Tags”是管理員類型。
在創建用戶後,需要指定用戶訪問一個虛擬機(如圖12-11所示),並且該用戶只能訪問該虛擬機下的隊列和交換機。如果沒有指定,則預設是"No access",而不是“/"(所有)。在一個 RabbitMQ伺服器上可以運行多個vhost,以適應不同的業務需要。這樣做既可以滿足許可權配置的要求,也可以避免不同業務之間隊列、交換機的命名衝突問題,因為不同vhost之間是隔離的,許可權設置可以細化到主題。
圖 12-10 管理管理員
圖 12-11 設置許可權
5.3 在 Spring Boot 中配置 RabbitMQ
(1)添加依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(2)配置 application.properties 文件
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
6. 在SpringBoot中實現RabbitMQ的四種發送/接收模式
6.1 實例:實現發送和接收隊列
(1)配置隊列
@Bean
public Queue queue(){
return new Queue("Queue1");
}
(2)創建接收者
註意,發送者和接收者的 Queue 名稱必須一致,否則不能接收,見以下代碼:
@Component
//監聽QueueHello的消息隊列
@RabbitListener(queues = "Queue1")
public class ReceiveA {
//@RabbitHandler來實現具體消費
@RabbitHandler
public void QueueReceiver(String Queue1){
System.out.println("Receive A:"+Queue1);
}
}
(3)創建發送者
@Component
public class SendA {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String content){
System.out.println("Sender:"+content);
//使用AmqpTemplate將消息發送到消息隊列中
rabbitTemplate.convertAndSend("Queue1",content);
}
}
(4)測試發送和接收情況
@Test
void QueueSend(){
int i = 2;
for (int j = 0; j < i; j++) {
String msg = "Queue1 msg"+j+new Date();
try {
sendA.send(msg);
}catch (Exception e){
e.printStackTrace();
}
}
}
運行測試,可以看到控制台輸岀如下結果:
Receive A:Queue1 msg0Sun Aug 14 11:16:45 CST 2022
Receive A:Queue1 msg1Sun Aug 14 11:16:45 CST 2022
上述信息表示發送成功,且接收成功。
如果是多個接收者,則會均勻地將消息發送到 N 個接收者中,並不是全部發送一遍, 也會和"一對多” 一樣,接收端仍然會均勻地接收到消息。
6.2 實現發送和接收對象
(1)編輯配置類
package com.intehel.demo.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
@Configuration
@Component
public class RabbitMQConfig {
@Bean
public Queue objectQueue(){
return new Queue("object");
}
}
(2)編寫接收類
@Component
@RabbitListener(queues = "object")
public class ObjectReceiver {
@RabbitHandler
public void process(User user){
System.out.println("Receiver object"+user);
}
}
(3)編寫發送類
@Component
public class ObjectSender {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(User user){
System.out.println("Sender object"+user);
this.amqpTemplate.convertAndSend("object",user);
}
}
(4)編寫測試
@Test
void objectSend(){
try {
User user = new User();
user.setId(1);
user.setMsg("username");
objectSender.send(user);
}catch (Exception e){
e.printStackTrace();
}
}
運行測試,可以看到控制台輸岀如下結果:
Sender objectUser(id=1, msg=username)
Receiver objectUser(id=1, msg=username)
6.3 實例:實現用接收器接收多個主題
(1)配置topic
@Configuration
@Component
public class RabbitMQConfig {
@Bean
public Queue topicA(){
return new Queue("topic.a");
}
@Bean
public Queue topicB(){
return new Queue("topic.b");
}
@Bean
TopicExchange exchange(){
return new TopicExchange("topicExchange");
}
@Bean
Binding bingTopicA(Queue topicA,TopicExchange exchange){
return BindingBuilder.bind(topicA).to(exchange).with("topic.a");
}
@Bean
Binding bingTopicB(Queue topicB,TopicExchange exchange){
return BindingBuilder.bind(topicB).to(exchange).with("topic.#");
}
}
(2)編寫接收者A
@Component
@RabbitListener(queues = "topic.a")
public class TopicReceiveA {
@RabbitHandler
public void process(String message) {
System.out.println("topicReceiveA: " + message);
}
}
(3)編寫接收者B
@Component
@RabbitListener(queues = "topic.b")
public class TopicReceiveB {
@RabbitHandler
public void process(String message) {
System.out.println("topicReceiveB: " + message);
}
}
(4)編寫發送者
@Component
public class TopicSender {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(){
String context = "topic";
System.out.println("Sender: "+context);
this.amqpTemplate.convertAndSend("topicExchange","topic.1",context);
}
public void sendToA(){
String context = "topicToA";
System.out.println("Sender: "+context);
this.amqpTemplate.convertAndSend("topicExchange","topic.a",context);
}
public void sendToB(){
String context = "topicToB";
System.out.println("Sender: "+context);
this.amqpTemplate.convertAndSend("topicExchange","topic.b",context);
}
}
(5)編寫測試
@Test
public void topic(){
topicSender.send();
}
@Test
public void topicA(){
topicSender.sendToA();
}
@Test
public void topicB(){
topicSender.sendToB();
}
6.4 實現廣播模式
(1)配置fanout
@Configuration
@Component
public class RabbitMQConfig {
@Bean
public Queue fanoutA(){
return new Queue("fanout.A");
}
@Bean
public Queue fanoutB(){
return new Queue("fanout.B");
}
@Bean
FanoutExchange fanoutExchange(){
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bingFanoutA(Queue fanoutA,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutA).to(fanoutExchange);
}
@Bean
Binding bingFanoutB(Queue fanoutB,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutB).to(fanoutExchange);
}
}
(2)編寫發送者
@Component
public class FanoutSender {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(){
String context = "Fanout";
System.out.println("Sender: " + context);
this.amqpTemplate.convertAndSend("fanoutExchange", "",context);
}
}
(3)編寫接收者A
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiveA {
@RabbitHandler
public void process(String message){
System.out.println("fanout ReceiveA: " + message);
}
}
(4)編寫接收者B
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiveB {
@RabbitHandler
public void process(String message){
System.out.println("fanout ReceiveB: " + message);
}
}
(5)編寫測試
@SpringBootTest
public class FanoutSendControllerTest {
@Autowired
private FanoutSender sender;
public void fanoutSend(){
sender.send();
}
}
運行測試,可以看到控制台輸出如下結果:
fanout ReceiveB: Fanout
fanout ReceiveA: Fanout
6.5 實例:實現消息隊列延遲功能
要實現這個功能,一般使用RabbitMQ的消息隊列延遲功能,即採用官方提供的插件 "rabbitmq_delayed_message_exchange”來實現。但 RabbitMQ 版本必須是 3.5.8 以上才支持該插件,否則得用其“死信”功能。
(1)安裝延遲插件
用rabbitmq-plugins list命令可以查看安裝的插件。如果沒有,則直接訪問官網進行下載,下載完成後,將其解壓到RabbitMQ的plugins目錄。
然後執行下麵的命令進行安裝:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
(2)配置交換機
@Bean
public Queue queueDelay(){
return new Queue("delay_queue_1");
}
@Bean
public CustomExchange delayExchange(){
Map<String,Object> args = new HashMap<String,Object>();
args.put("x-delayed-type","direct");
return new CustomExchange("delayed_exchange","x-delayed-message",true,false,args);
}
@Bean
Binding bingDelayB(Queue queueDelay,CustomExchange delayExchange){
return BindingBuilder.bind(queueDelay).to(delayExchange).with("delay_queue_1").noargs();
}
這裡要使用 CustomExchange,而不是 DirectExchange。CustomExchange 的類型必須是 x-delayed-message
(3)實現消息發送
這裡設置消息延遲5s
@Service
public class CustomSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg(String queueName, String msg){
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("消息發送時間:"+sdf.format(new Date()));
rabbitTemplate.convertAndSend("delayed_exchange", queueName, msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//消息延遲5s
message.getMessageProperties().setHeader("x-delay",5000);
return message;
}
});
}
}
(4)實現消息接收
@Component
public class CustomReceiver {
@RabbitListener(queues = "delay_queue_1")
public void receive(String msg) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(sdf.format(new Date()));
System.out.println("Received: 執行取消訂單" + msg);
}
}
(5)測試發送延遲消息
@SpringBootTest
public class FanoutSendControllerTest {
@Autowired
private CustomSender customSender;
@Test
public void send(){
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
customSender.sendMsg("delay_queue_1","支付超時,取消訂單通知!");
}
}
運行測試,可以看到控制台輸岀如下結果:
消息發送時間:2022-08-14 14:48:41
2022-08-14 14:48:46
Received: 執行取消訂單支付超時,取消訂單通知!
至此,消息隊列延遲功能成功實現。在rabbitmq_delayed_message_exchange插件產生之前,我們大都是使用“死信”功能來達到延遲隊列的效果。
“死信”在創建Queue(隊列)時,要聲明“死信”隊列。隊列里的消息到一定時間沒被消費, 就會變成死信轉發到死信相應的Exchange或Queue中。
延退消息是Exchange到Queue或其他Exchange的延遲。但如果消息延遲到期了,或消息不能被分配給其他的Exchange或Queue,則消息會被丟棄。