一、AMQP 概述 AMQP(Advanced Message Queuing Protocol),高級消息隊列協議。 簡單回憶一下JMS的消息模型,可能會有助於理解AMQP的消息模型。在JMS中,有三個主要的參與者:消息的生產者、消息的消費者以及在生產者和消費者之間傳遞消息的通道(隊列或主題)。在 ...
一、AMQP 概述
AMQP(Advanced Message Queuing Protocol),高級消息隊列協議。
簡單回憶一下JMS的消息模型,可能會有助於理解AMQP的消息模型。在JMS中,有三個主要的參與者:消息的生產者、消息的消費者以及在生產者和消費者之間傳遞消息的通道(隊列或主題)。在JMS中,通道有助於解耦消息的生產者和消費者,但是這兩者依然會與通道相耦合。與之不同的是,AMQP的生產者並不會直接將消息發佈到隊列中。AMQP在消息的生產者以及傳遞信息的隊列之間引入了一種間接的機制:Exchange。如下圖:
哈哈,筆主從今天開始也要學著自己畫圖了。
來看看 AMQP 消息的通信過程。首先,生產者把消息發給 Exchange,並帶有一個 routing key。其次,Exchange 和 隊列 之間 通過 binging 通信,binging 上也有 一個 routing key,AMQP定義了四種不同類型的Exchange,每一種都有不同的路由演算法,根據Exchange的演算法不同,它可能會使用消息的routing key或參數,並與 binding 的routing key或參數進行對比,來決定是否要將信息放到隊列中。然後,消費者從每個隊列中取出消息。
Exchange 的路由演算法:
- Direct:如果 消息的routing key 與 binding的routing key 直接匹配的話,消息將會路由到該隊列上;
- Topic:如果 消息的routing key 與 binding的routing key 符合通配符匹配的話,消息將會路由到該隊列上;
- Headers:如果 消息參數表中的頭信息和值 都與 bingding參數表中 相匹配,消息將會路由到該隊列上;
- Fanout:不管消息的routing key和參數表的頭信息/值是什麼,消息將會路由到所有隊列上。
AMQP 與 JMS 的區別:
1、AMQP為消息定義了線路層(wire-level protocol)的協議,而JMS所定義的是API規範。JMS的API協議能夠確保所有的實現都能通過通用的API來使用,但是並不能保證某個JMS實現所發送的消息能夠被另外不同的JMS實現所使用。而AMQP的線路層協議規範了消息的格式,消息在生產者和消費者間傳送的時候會遵循這個格式。這樣AMQP在互相協作方面就要優於JMS——它不僅能跨不同的AMQP實現,還能跨語言和平臺。
2、JMS 支持TextMessage、MapMessage 等複雜的消息類型;而AMQP 僅支持 byte[] 消息類型(複雜的類型可序列化後發送),個人認為這也是它能夠跨平臺和跨語言使用的原因之一。
3、由於Exchange 提供的路由演算法,AMQP可以提供多樣化的路由方式來傳遞消息到消息隊列,而 JMS 僅支持 隊列 和 主題/訂閱 方式兩種。
二、Spring 集成 RabbitMQ
RabbitMQ是一個流行的開源消息代理,它實現了AMQP。Spring AMQP為RabbitMQ提供了支持,包括RabbitMQ連接工廠、模板以及Spring配置命名空間。
首先,需要安裝 RabbitMQ,我們可以在 http://www.rabbitmq.com/download.html 上找到安裝指南,具體怎麼安裝,不是這篇博文的重點,請筆友們自行解決。
接下來,讓我們一起來看看,Spring 和 RabbitMQ 的集成:
1、pom 依賴
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.0.3.RELEASE</version> </dependency>
2、連接工廠 和 admin
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.userName}" password="${rabbitmq.password}"/> <rabbit:admin connection-factory="connectionFactory"/>View Code
admin 元素會自動創建一個RabbitMQ管理組件,它會自動創建隊列、Exchange以及binding
3、聲明隊列、Exchange以及binding
聲明隊列:
<rabbit:queue name="queue1"/> <rabbit:queue name="queue2"/> <rabbit:queue name="queue3"/> <rabbit:queue name="queue4"/> <rabbit:queue name="queue5"/> <rabbit:queue name="queue6"/>View Code
聲明 Exchange 以及 binding:
direct-exchange:
<rabbit:direct-exchange name="directExchange"> <rabbit:bindings> <rabbit:binding key="queue1" queue="queue1"/> <rabbit:binding key="queue2" queue="queue2"/> <rabbit:binding key="queue3" queue="queue3"/> </rabbit:bindings> </rabbit:direct-exchange>View Code
如果消息的routing key 與 routing key 直接匹配的話,消息將會路由到該隊列上。
topic-exchange
<rabbit:topic-exchange name="topicExchange"> <rabbit:bindings> <rabbit:binding pattern="routing.*" queue="queue2"/> <rabbit:binding pattern="routing.*" queue="queue3"/> </rabbit:bindings> </rabbit:topic-exchange>View Code
消息的 routing key 與 binding的routing key 符合通配符匹配的話,消息將會路由到該隊列上。
這個通配符匹配特別坑,賊坑!我本來寫了個 "routing*" ,自以為能匹配 "routingrrr" 這樣的字元,不行!然後我又寫了個"routing?"、"rounting.",預想著能不能匹配單個任意字元,不行!
終於我得出了一個結論,只能使用 "*"(匹配 0 個或任意多個)通配符,並且,並且!"*" 前面一定要有 個 "." ! 太可怕了,不知道我總結的對不對哈!
headers-exchange
<rabbit:headers-exchange name="headersExchange"> <rabbit:bindings> <rabbit:binding queue="queue4" key="betty" value="rubble" /> <rabbit:binding queue="queue5" key="barney" value="rubble" /> </rabbit:bindings> </rabbit:headers-exchange>View Code
消息參數表中的頭信息和值都與bingding參數表中相匹配,消息將會路由到該隊列上。
這個用法比較少用,也比較難用,原因是因為它僅支持 發送 byte[] 的消息類型。
fanout-exchange
<rabbit:fanout-exchange name="fanoutExchange"> <rabbit:bindings> <rabbit:binding queue="queue5"/> <rabbit:binding queue="queue6"/> </rabbit:bindings> </rabbit:fanout-exchange>View Code
這個是最簡單粗暴的匹配規則,不管消息的routing key和參數表的頭信息/值是什麼,消息將會路由到所有隊列上。
4、發送和接收消息
還是Spring的那一套,Spring 為我們提供了一個模板 bean(rabbitTemplate) 來發送和接收消息。其中,像前文提到的 jmsTemplate 那樣,rabbitTemplate 也為我們 提供了 convertAndSend() 方法來自動轉換和發送消息,提供了receiveAndConvret() 方法來接收和自動轉換成對象(消息和對象之間預設的消息轉換器是SimpleMessageConverter,它適用於String、Serializable實例以及位元組數組)。另外,rabbitTemplate 也照常提供了 send() 和 receive() 方法來發送和接收消息,不過貌似僅支持發送位元組數組...
配置 rabbitTemplate:
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="directExchange" routing-key="queue1"/>View Code
下麵僅演示 通配符路由方式 和 header 路由方式 發送和接收消息。其他具體詳細的內容可參考我下麵附上的源碼:
通配符路由方式:
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration("classpath:applicationContext.xml") public class TopicExchange { @Autowired private RabbitTemplate rabbitTemplate; @Test public void convertAndSend(){ List<String> list = new ArrayList<>(); list.add("java"); list.add("python"); list.add("c++"); rabbitTemplate.convertAndSend("topicExchange","routing.123", list); } @Test public void receiveAndConvert(){ List<String> queue2List =(List) rabbitTemplate.receiveAndConvert("queue2"); printList(queue2List); System.out.println("----------------華麗的分隔符-----------------"); List<String> queue3List =(List) rabbitTemplate.receiveAndConvert("queue3"); printList(queue3List); } private <E> void printList(List<E> list){ if (list != null && list.size() > 0){ for (Object o : list){ System.out.println("-----------------"+ o +"---------------"); } } } }View Code
header 路由方式:
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration("classpath:applicationContext.xml") public class HeadersExchangeTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void convertAndSend(){ MessageProperties messageProperties = new MessageProperties(); messageProperties.setHeader("betty", "rubble"); messageProperties.setHeader("fred", "flintstone"); messageProperties.setHeader("barney", "rubble"); String str = new String("Hello RabbitMQ"); Message message = new Message(str.getBytes(), messageProperties); rabbitTemplate.convertAndSend("headersExchange","",message); } @Test public void receiveAndConvert(){ Message queue4 = rabbitTemplate.receive("queue4"); System.out.println("第一個輸出:" + new String(queue4.getBody())); Message queue5 = rabbitTemplate.receive("queue5"); System.out.println("第三個輸出:" + new String(queue5.getBody())); } }View Code
5、定義消息驅動的AMQP POJO
用 receive()和 receiveAndConvert()方法都會立即返回,如果隊列中沒有等待的消息時,將會得到 null。Spring AMQP提供了消息驅動POJO的支持,也就是相當於一個監聽器,監聽某些隊列,當消息到達指定隊列的時候,可以立即調用方法處理該消息。
listener-container 配置:
<rabbit:listener-container connection-factory="connectionFactory" concurrency="2" prefetch="3" type="direct"> <rabbit:listener ref="handlerListener" method="handler" queue-names="queue5,queue6"/> </rabbit:listener-container>
其中,ref 指定Spring bean 的 id,method 指定 該bean中處理隊列中消息的方法,queue-names 指定要監聽哪些隊列,隊列之間用 "," 分隔。
三、結語
祝大家五一節快樂!
演示源碼下載鏈接:https://github.com/JMCuixy/SpringMessageRabbitMQ
參考資料:《Spring 實戰第四版》