消息隊列是大型複雜系統解耦利器。本文根據應用廣泛的消息隊列RabbitMQ,介紹Spring Boot應用程式中隊列中間件的開發和應用。 一、RabbitMQ基礎 1、RabbitMQ簡介 RabbitMQ是Spring所在公司Pivotal自己的產品,是基於AMQP高級隊列協議的消息中間件,採用e ...
消息隊列是大型複雜系統解耦利器。本文根據應用廣泛的消息隊列RabbitMQ,介紹Spring Boot應用程式中隊列中間件的開發和應用。
一、RabbitMQ基礎
1、RabbitMQ簡介
RabbitMQ是Spring所在公司Pivotal自己的產品,是基於AMQP高級隊列協議的消息中間件,採用erlang開發,所以你的RabbitMQ隊列伺服器需要erlang環境。
可以直接官方的說法:RabbitMQ is the most widely deployed open source message broker.言簡意賅,一目瞭然。
2、AMQP
高級消息隊列協議(AMQP)是一個非同步消息傳遞所使用的應用層協議規範。作為線路層協議(AMQP是一個抽象的協議,它不負責處理具體的數據),而不是API(例如Java消息系統JMS),AMQP客戶端能夠無視消息的來源任意發送和接受信息。
AMQP的原始用途只是為金融界提供一個可以彼此協作的消息協議,而現在的目標則是為通用消息隊列架構提供通用構建工具。因此,面向消息的中間件(MOM)系統,例如發佈/訂閱隊列,沒有作為基本元素實現。反而通過發送簡化的AMQ實體,用戶被賦予了構建例如這些實體的能力。這些實體也是規範的一部分,形成了線上路層協議頂端的一個層級:AMQP模型。這個模型統一了消息模式,諸如之前提到的發佈/訂閱,隊列,事務以及流數據,並且添加了額外的特性,例如更易於擴展,基於內容的路由。
擴展閱讀:既然有高級的消息協議,必然有簡單的協議,STOMP(Simple (or Streaming) Text Orientated Messaging Protocol),也就是簡單消息文本協議,猛擊這裡
3、MSMQ
這裡附帶介紹一下MSMQ。.NET開發者接觸最多的可能還是這個消息隊列,我知道有兩個以.NET作為主要開發語言的公司都選擇MSMQ來開發公共框架如ESB、日誌組件等。
如果你有.NET下MSMQ(微軟消息隊列)開發和使用經驗,一定不會對隊列常用術語陌生。對比一下,對後面RabbitMQ的學習和理解非常有幫助。
邏輯結構如下:
4、基本術語
安裝好RabbitMQ後,可以啟用插件,打開RabbitMQ自帶的後臺,一圖勝千言,你會看到很多似曾相識的技術術語和名詞。
當然你也可以參考這裡的圖片示例一個一個驗證下麵的名詞。
(1)Broker:消息隊列伺服器實體。
(2)Producer:生產者。
(3)Consumer:消費者。
(4)Queue(隊列):消息隊列載體,每個消息都會被投入到一個或多個隊列。Queue是 RabbitMQ 的內部對象,用於存儲消息;消費者Consumer就是通過訂閱隊列來獲取消息的,RabbitMQ 中的消息都只能存儲在 Queue 中,生產者Producer生產消息並最終投遞到 Queue 中,消費者可以從 Queue 中獲取消息並消費;多個消費者可以訂閱同一個 Queue。
(5)Connection(連接):Producer 和 Consumer 通過TCP 連接到 RabbitMQ Server。
(6)Channel(通道):基於 Connection創建,數據流動都是在 Channel 中進行。
(7)Exchange(交換器):生產者將消息發送到 Exchange(交換器),由Exchange 將消息路由到一個或多個 Queue 中(或者丟棄);Exchange 並不存儲消息;Exchange Types 常用的有 Fanout、Direct、Topic 和Header四種類型,每種類型對應不同的路由規則:
Direct:完全匹配,消息路由到那些
Routing Key 與 Binding Key 完全匹配的 Queue 中。比如 Routing Key
為mq_cleint-key,只會轉發mq_cleint-key,不會轉發mq_cleint-key.1,也不會轉發mq_cleint-key.1.2。
Topic:模式匹配,Exchange 會把消息發送到一個或者多個滿足通配符規則的 routing-key 的 Queue。其中*表號匹配一個
word,#匹配多個 word 和路徑,路徑之間通過.隔開。如滿足a.*.c的 routing-key
有a.hello.c;滿足#.hello的 routing-key 有a.b.c.hello。
Fanout:忽略匹配,把所有發送到該 Exchange 的消息路由到所有與它綁定 的Queue 中。
Header:也根據規則匹配,相較於Direct和Topic固定地使用RoutingKey ,Headers 則是一個自定義匹配規則的類型。在隊列與交換器綁定時, 會設定一組鍵值對(Key-Value)規則, 消息中也包括一組鍵值對( Headers 屬性), 當這些鍵值對有一對,,或全部匹配時, 消息被投送到對應隊列。
(8)Binding(綁定):是 Exchange(交換器)將消息路由給 Queue 所需遵循的規則。
(9)Routing Key(路由鍵):消息發送給 Exchange(交換器)時,消息將擁有一個路由鍵(預設為空), Exchange(交換器)根據這個路由鍵將消息發送到匹配的隊列中。
(10)Binding Key(綁定鍵):指定當前 Exchange(交換器)下,什麼樣的 Routing Key(路由鍵)會被下派到當前綁定的 Queue 中。
5、應用場景
我們使用一個技術或組件或中間件,必須要非常理解它的適用場景,否則很容易誤用。
RabbitMQ的經典應用場景包括:非同步處理、應用解耦、流量削峰、日誌處理、消息通訊。
已經有很多人總結了這5種場景下的RabbitMQ實際應用。
推薦閱讀:猛擊這裡
到這裡,RabbitMQ基礎知識介紹結束,下麵開始動手實踐。
添加依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>RabbitMQ
配置RabbitMQ
## RabbitMQ相關配置 spring.application.name=springbootdemo spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=springbootmq spring.rabbitmq.password=123456application.mq.properties
新增RabbitMQConfig類
package com.power.demo.messaging; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * RabbitMQ消息隊列配置類 * <p> * 註意:如果已在配置文件中聲明瞭Queue對象,就不用在RabbitMQ的管理員頁面創建隊列(Queue)了 */ @Configuration public class RabbitMQConfig { /** * 聲明接收字元串的隊列 Hello 預設 * * @return */ @Bean public Queue stringQueue() { //boolean isDurable = true;//是否持久化 //boolean isExclusive = false; //僅創建者可以使用的私有隊列,斷開後自動刪除 //boolean isAutoDelete = false; //當所有消費客戶端連接斷開後,是否自動刪除隊列 //Queue queue = new Queue(MQField.HELLO_STRING_QUEUE, isDurable, isExclusive, isAutoDelete); //return queue; //return new Queue(MQField.HELLO_STRING_QUEUE); //預設支持持久化 return QueueBuilder.durable(MQField.HELLO_STRING_QUEUE) //.exclusive() //.autoDelete() .build(); } /** * 聲明接收Goods對象的隊列 Hello 支持持久化 * * @return */ @Bean public Queue goodsQueue() { return QueueBuilder.durable(MQField.HELLO_GOODS_QUEUE).build(); } /** * 聲明WorkQueue隊列 competing consumers pattern,多個消費者不會重覆消費隊列的相同消息 * * @return */ @Bean public Queue workQueue() { return QueueBuilder.durable(MQField.MY_WORKER_QUEUE).build(); } /** * 消息隊列中最常見的模式:發佈訂閱模式 * <p> * 聲明發佈訂閱模式隊列 Publish/Subscribe * <p> * exchange類型包括:direct, topic, headers 和 fanout **/ /*fanout(廣播)隊列相關聲明開始*/ @Bean public Queue fanOutAQueue() { return QueueBuilder.durable(MQField.MY_FANOUTA_QUEUE).build(); } @Bean public Queue fanOutBQueue() { return QueueBuilder.durable(MQField.MY_FANOUTB_QUEUE).build(); } @Bean FanoutExchange fanoutExchange() { return (FanoutExchange) ExchangeBuilder.fanoutExchange(MQField.MY_FANOUT_EXCHANGE).build(); //return new FanoutExchange(MQField.MY_FANOUT_EXCHANGE); } @Bean Binding bindingExchangeA(Queue fanOutAQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanOutAQueue).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue fanOutBQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanOutBQueue).to(fanoutExchange); } /*fanout隊列相關聲明結束*/ /*topic隊列相關聲明開始*/ @Bean public Queue topicAQueue() { return QueueBuilder.durable(MQField.MY_TOPICA_QUEUE).build(); } @Bean public Queue topicBQueue() { return QueueBuilder.durable(MQField.MY_TOPICB_QUEUE).build(); } @Bean TopicExchange topicExchange() { return (TopicExchange) ExchangeBuilder.topicExchange(MQField.MY_TOPIC_EXCHANGE).build(); } //綁定時,註意隊列名稱與上述方法名一致 @Bean Binding bindingTopicAExchangeMessage(Queue topicAQueue, TopicExchange topicExchange) { return BindingBuilder.bind(topicAQueue).to(topicExchange).with(MQField.MY_TOPIC_ROUTINGKEYA); } @Bean Binding bindingTopicBExchangeMessages(Queue topicBQueue, TopicExchange topicExchange) { return BindingBuilder.bind(topicBQueue).to(topicExchange).with(MQField.MY_TOPIC_ROUTINGKEYB); } /*topic隊列相關聲明結束*/ /*direct隊列相關聲明開始*/ @Bean public Queue directAQueue() { return QueueBuilder.durable(MQField.MY_DIRECTA_QUEUE).build(); } @Bean public Queue directBQueue() { return QueueBuilder.durable(MQField.MY_DIRECTB_QUEUE).build(); } /** * 聲明Direct交換機 支持持久化. * * @return the exchange */ @Bean DirectExchange directExchange() { return (DirectExchange) ExchangeBuilder.directExchange(MQField.MY_DIRECT_EXCHANGE).durable(true).build(); } @Bean Binding bindingDirectAExchangeMessage(Queue directAQueue, DirectExchange directExchange) { return BindingBuilder.bind(directAQueue).to(directExchange).with(MQField.MY_DIRECT_ROUTINGKEYA); } @Bean Binding bindingDirectBExchangeMessage(Queue directBQueue, DirectExchange directExchange) { return BindingBuilder.bind(directBQueue).to(directExchange) //.with(MQField.MY_DIRECT_ROUTINGKEYB) .with(MQField.MY_DIRECT_ROUTINGKEYB); } /*direct隊列相關聲明結束*/ }RabbitMQConfig
RabbitMQConfig我將常用到的模式都配置在裡面了,註釋已經寫得很清楚,在詳細介紹模式的地方會用到這裡定義的隊列、綁定和交換器。
持久化配置
在RabbitMQConfig類中尤其註意這幾個參數,包括是否可持久化durable;僅創建者可以使用的私有隊列,斷開後自動刪除exclusive;當所有消費客戶端連接斷開後,是否自動刪除隊列autoDelete。其中durable和autoDelete對隊列和交換器都可以配置。
RabbitMQ支持的消息的持久化(durable),也就是將數據寫在磁碟上,為了數據安全考慮,絕大多數場景下我們都會選擇持久化,可能記錄一些不是業務必須的日誌稍微例外。
消息隊列持久化包括3個部分:
(1)、隊列持久化,在聲明時指定Queue.durable為1
(2)、交換器持久化,在聲明時指定Exchange.durable為1
(3)、消息持久化,在投遞時指定消息的delivery_mode為2(而1表示非持久化) 參考:這裡
如果Exchange和Queue都是持久化的,那麼它們之間的Binding也是持久化的;如果Exchange和Queue兩者之間有一個持久化,另一個非持久化,就不允許建立綁定。
二、常見模式
在Spring Boot下使用RabbitMQ非常容易,直接調用AmqpTemplate類封裝好的介面即可。
1、hello world
P為生產者,C為消費者,中間紅色框表示消息隊列。生產者P將消息發送到消息隊列Queue,消費者C對消息進行處理。
生產者:
package com.power.demo.messaging.hello; import com.power.demo.entity.vo.GoodsVO; import com.power.demo.messaging.MQField; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; /** * Hello消息生產者 **/ @Component public class HelloSender { @Autowired private AmqpTemplate rabbitTemplate; public boolean send(String message) throws Exception { boolean isOK = false; if (StringUtils.isEmpty(message)) { System.out.println("消息為空"); return isOK; } rabbitTemplate.convertAndSend(MQField.HELLO_STRING_QUEUE, message); isOK = true; System.out.println(String.format("HelloSender發送字元串消息結果:%s", isOK)); return isOK; } public boolean send(GoodsVO goodsVO) throws Exception { boolean isOK = false; rabbitTemplate.convertAndSend(MQField.HELLO_GOODS_QUEUE, goodsVO); isOK = true; System.out.println(String.format("HelloSender發送對象消息結果:%s", isOK)); return isOK; } }HelloSender
消費者:
package com.power.demo.messaging.hello; import com.power.demo.entity.vo.GoodsVO; import com.power.demo.messaging.MQField; import com.power.demo.util.SerializeUtil; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * Hello消息消費者 **/ @Component public class HelloReceiver { @RabbitListener(queues = MQField.HELLO_STRING_QUEUE) @RabbitHandler public void process(String message) { try { Thread.sleep(5000); } catch (Exception e) { e.printStackTrace(); } System.out.println("HelloReceiver接收到的字元串消息是 => " + message); } @RabbitListener(queues = MQField.HELLO_GOODS_QUEUE) @RabbitHandler public void process(GoodsVO goodsVO) { System.out.println("------ 接收實體對象 ------"); System.out.println("HelloReceiver接收到的實體對象是 => " + SerializeUtil.Serialize(goodsVO)); } }HelloReceiver
這是最簡單的一種模式,這個最簡單示例,可以看到應用場景里的非同步處理的影子。
在Controller中,新增一個介面:
@RequestMapping(value = "/hello/sendmsg", method = RequestMethod.GET) @ApiOperation("簡單字元串消息測試") @ApiImplicitParams({ @ApiImplicitParam(paramType = "query", name = "message", required = true, value = "字元串消息", dataType = "String") }) public String sendMsg(String message) throws Exception { boolean isOK = helloSender.send(message); return String.valueOf(isOK); }sendmsg
按照傳統方式調用RPC介面,通常都是同步等待介面返回,而使用隊列後,消息生產者直接向RabbitMQ伺服器發送一條消息,不需要同步等待這個消息的處理結果。
示例代碼中,消息消費者會刻意等待5秒(Thread.sleep(5000);)後才處理(列印出)消息,但是實際調用這個介面的時候,非常快就返回成功結果了,因為這個發送消息的動作不需要等待消費者消費消息的結果。
發送的消息,除了簡單消息對象如字元串等,示例里你還看到有一個發送商品對象的消息,也就是說明RabbitMQ支持自定義的複雜對象消息。
2、work queues
P為生產者,C1、C2為消費者,中間紅色框表示消息隊列。生產者P將消息發送到消息隊列Queue,消費者C1和C2對消息進行處理。
這種模式比較容易產生誤解的地方是,多個消費者會不會消費隊列里的同一條消息。答案是不會。
官方的說明是因為消費者根據競爭消費模式(competing consumers pattern)分派任務(Distributing tasks among workers (the competing consumers pattern) )。
對於work queues這種模式,同一條消息M1,要麼C1拉取到,要麼C2拉取到,不會出現C1和C2同時拉取到並消費。
當然,這種模式還可以擴展,除了一個生產者,也可以有多個生產者。
生產者:
package com.power.demo.messaging.workqueues; import com.power.demo.messaging.MQField; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @Component public class WorkProducerA { @Autowired private AmqpTemplate rabbitTemplate; public boolean send(String message) throws Exception { boolean isOK = false; if (StringUtils.isEmpty(message)) { System.out.println("消息為空"); return isOK; } rabbitTemplate.convertAndSend(MQField.MY_WORKER_QUEUE, message); isOK = true; System.out.println(String.format("WorkProducerA發送字元串消息結果:%s", isOK)); return isOK; } }WorkProducerA
相同隊列下另一個生產者:
package com.power.demo.messaging.workqueues; import com.power.demo.messaging.MQField; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @Component public class WorkProducerB { @Autowired private AmqpTemplate rabbitTemplate; public boolean send(String message) throws Exception { boolean isOK = false; if (StringUtils.isEmpty(message)) { System.out.println("消息為空"); return isOK; } rabbitTemplate.convertAndSend(MQField.MY_WORKER_QUEUE, message); isOK = true; System.out.println(String.format("WorkProducerB發送字元串消息結果:%s", isOK)); return isOK; } }WorkProducerB
消費者:
package com.power.demo.messaging.workqueues; import com.power.demo.messaging.MQField; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.concurrent.atomic.AtomicInteger; @Component public class WorkConsumerA { private static AtomicInteger atomicInteger = new AtomicInteger(); @RabbitListener(queues = MQField.MY_WORKER_QUEUE) @RabbitHandler public void process(String message) throws Exception { int index = atomicInteger.getAndIncrement(); Thread.sleep(2000); System.out.println("WorkConsumerA接收到的字元串消息是 => " + message); System.out.println("WorkConsumerA自增序號 => " + index); } }WorkConsumerA
另一個消費者:
package com.power.demo.messaging.workqueues; import com.power.demo.messaging.MQField; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.concurrent.atomic.AtomicInteger; @Component public class WorkConsumerB { private static AtomicInteger atomicInteger = new AtomicInteger(); @RabbitListener(queues = MQField.MY_WORKER_QUEUE) @RabbitHandler public void process(String message) throws Exception { int index = atomicInteger.getAndIncrement(); Thread.sleep(10); System.out.println("WorkConsumerB接收到的字元串消息是 => " + message); System.out.println("WorkConsumerB自增序號 => " + index); } }View Code
pub/sub
應用最廣泛的發佈/訂閱模式。
官方的說法是:發送多個消息到多個消費者(Sending messages to many consumers at once.)
這個模式和work queues模式最明顯的區別是,隊列Queue前加了一層,多了Exchange(交換器)。
P為生產者,X為交換器,C1、C2為消費者,中間紅色框表示消息隊列。生產者P將消息不是直接發送到隊列Queue,而是發送到交換器X(註意:交換器Exchange並不存儲消息),然後由交換機X發送給兩個隊列,兩個消費者C1和C2各自監聽一個隊列,來消費消息。
根據交換器類型的不同,又可以分為Fanout、Direct和Topic這三種消費方式,Headers方式實際應用不是非常廣泛,本文暫不討論。
3、fanout
任何發送到Fanout Exchange的消息都會被轉發到與該Exchange綁定(Binding)的所有Queue上。
(1)可以理解為路由表的模式
(2)這種模式不需要RoutingKey,即使配置了也忽略
(3)這種模式需要提前將Exchange與Queue進行綁定,一個Exchange可以綁定多個Queue,一個Queue可以同多個Exchange進行綁定
(4)如果接受到消息的Exchange沒有與任何Queue綁定,則消息會被拋棄
Fanout廣播模式實現同一個消息被多個消費者消費,而work queues是同一個消息只能有一個消費者(競爭去)消費。
生產者:
package com.power.demo.messaging.pubsub.fanout; import com.power.demo.entity.vo.GoodsVO; import com.power.demo.messaging.MQField; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @Component public class FanoutSender { @Autowired private AmqpTemplate rabbitTemplate; public boolean send(GoodsVO goodsVO) throws Exception { boolean isOK = false; if (goodsVO == null) { System.out.println("消息為空"); return isOK; } rabbitTemplate.convertAndSend(MQField.MY_FANOUT_EXCHANGE, "", goodsVO); isOK = true; System.out.println(String.format("FanoutSender發送對象消息結果:%s", isOK)); return isOK; } }FanoutSender
消費者:
package com.power.demo.messaging.pubsub.fanout; import com.power.demo.entity.vo.GoodsVO; import com.power.demo.messaging.MQField; import com.power.demo.util.SerializeUtil; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class FanoutReceiverA { @RabbitListener(queues = MQField.MY_FANOUTA_QUEUE) @RabbitHandler public void process(GoodsVO goodsVO) { System.out.println("FanoutReceiverA接收到的商品消息是 => " + SerializeUtil.Serialize(goodsVO)); } }FanoutReceiverA
另一個消費者:
package com.power.demo.messaging.pubsub.fanout; import com.power.demo.entity.vo.GoodsVO; import com.power.demo.messaging.MQField; import com.power.demo.util.SerializeUtil; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class FanoutReceiverB { @RabbitListener(queues = MQField.MY_FANOUTB_QUEUE) @RabbitHandler public void process(GoodsVO goodsVO) { System.out.println("FanoutReceiverB接收到的商品消息是 => " + SerializeUtil.Serialize(goodsVO)); } }FanoutReceiverB
4、direct
Fanout是1對多以廣播的方式,發送給所有的消費者。
Direct則是創建消息隊列的時候,指定一個BindingKey。當發送者發送消息的時候,指定對應的RoutingKey,當RoutingKey和消息隊列的BindingKey一致的時候,消息將會被髮送到該消息隊列中。
Direct廣播模式最明顯不同於Fanout模式的地方是,消費者可以進行消息過濾,有選擇的進行接收想要消費的消息,也就是隊列綁定關鍵字,發送者將數據根據關鍵字發送到Exchange,Exchange根據關鍵字判定應該將數據發送(路由)到指定隊列。
任何發送到Direct Exchange的消息都會被轉發到RoutingKey中指定的Queue。
(1)消息傳遞時需要一個“RoutingKey”,可以簡單的理解為要發送到的隊列名字
(2)如果vhost中不存在RouteKey中指定的隊列名,則該消息會被拋棄
生產者:
package com.power.demo.messaging.pubsub.direct; import com.power.demo.messaging.MQField; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @Component public class DirectSender { @Autowired private AmqpTemplate rabbitTemplate; public boolean sendDirectA(String message) throws Exception { boolean isOK = false; if (StringUtils.isEmpty(message)) { System.out.println("消息為空"); return isOK; } rabbitTemplate.convertAndSend(MQField.MY_DIRECT_EXCHANGE, MQField.MY_DIRECT_ROUTINGKEYA, message); isOK = true; System.out.println(String.format("DirectSender發送DirectA字元串消息結果:%s", isOK)); return isOK; } public boolean sendDirectB(String message) throws Exception { boolean isOK = false; if (StringUtils.isEmpty(message)) { System.out.println("消息為空"); return isOK; } rabbitTemplate.convertAndSend(MQField.MY_DIRECT_EXCHANGE, MQField.MY_DIRECT_ROUTINGKEYB, message); isOK = true; System.out.println(String.format("DirectSender發送DirectB字元串消息結果:%s", isOK)); return isOK; } }DirectSender
消費者:
package com.power.demo.messaging.pubsub.direct; import com.power.demo.messaging.MQField; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class DirectReceiverA { @RabbitListener(queues = MQField.MY_DIRECTA_QUEUE) @RabbitHandler public void process(String message) { System.out.println("DirectReceiverA接收到的字元串消息是 => " + message); } }DirectReceiverA
另一個消費者:
package com.power.demo.messaging.pubsub.direct; import com.power.demo.messaging.MQField; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class DirectReceiverB { @RabbitListener(queues = MQField.MY_DIRECTB_QUEUE) @RabbitHandler public void process(String message) { System.out.println("DirectReceiverB接收到的字元串消息是 => " + message); } }DirectReceiverB
5、topic
Topic轉發信息主要是依據通配符,隊列和交換機的綁定主要是依據一種模式(通配符+字元串),而當發送消息的時候,只有指定的RoutingKey和該模式相匹配的時候,消息才會被髮送到該消息隊列中。
任何發送到Topic Exchange的消息都會被轉發到所有關心RoutingKey中指定話題的Queue上
(1)每個隊列都有其關心的主題,所有的消息都帶有一個“標題”(RoutingKey),Exchange會將消息轉發到所有關註主題能與RouteKey模糊匹配的隊列
(2)需要RoutingKey,也需要提前綁定Exchange與Queue
(3)在進行綁定時,要提供一個該隊列關心的主題,如“#.log.#”表示該隊列關心所有涉及log的消息(一個RoutingKey為”mq.log.error”的消息會被轉發到該隊列)
(4)“#”表示0個或若幹個關鍵字,“*”表示一個關鍵字。如“log.*”能與“log.warn”匹配,無法與“log.warn.timeout”匹配;但“log.#”能與上述兩者都匹配
(5)如果Exchange沒有發現能夠與RouteKey匹配的Queue,則會拋棄此消息
生產者:
package com.power.demo.messaging.pubsub.topic; import com.power.demo.messaging.MQField; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils;