前言 Rabbitmq是一個開源的消息代理軟體,是AMQP協議的實現。核心作用就是創建消息隊列,非同步發送和接收消息。通常用來在高併發中處理削峰填谷、延遲處理、解耦系統之間的強耦合、處理秒殺訂單。 入門rabbitmq之前主要是想瞭解下秒殺排隊訂單入庫後,非同步通知客戶端秒殺結果。 基礎知識 1、基本概 ...
前言
Rabbitmq是一個開源的消息代理軟體,是AMQP協議的實現。核心作用就是創建消息隊列,非同步發送和接收消息。通常用來在高併發中處理削峰填谷、延遲處理、解耦系統之間的強耦合、處理秒殺訂單。 入門rabbitmq之前主要是想瞭解下秒殺排隊訂單入庫後,非同步通知客戶端秒殺結果。
基礎知識
1、基本概念(角色)
瞭解rabbitmq之前先要瞭解3個基本概念:生產者、消費者、代理(隊列)。 rabbitmq在生產者和代理中間做了一層抽象。這樣消息生產者和隊列就沒有直接聯繫,在中間加入了一層交換器(Exchange)。這樣消息生產者把消息交給交換器,交換器根據路由策略再把消息轉發給對應隊列。
2、消息發送原理
首先要發送消息必須先連接到rabbitmq-server。那怎麼連接和發送消息呢?首先你的應用程式和rabbitmq會創建一個TCP鏈接。一旦TCP鏈接並通過認證。認證就是你試圖連接rabbitmq時伺服器的用戶名和密碼。認證通過,你的應用程式和rabbitmq之間就創建了一條AMQP通道(Channel),後續所有的消息都是基於這個通道完成。
3、為什麼不直接通過TCP直接發送消息
對於操作系統來說創建和銷毀TCP連接是非常昂貴的開銷,而在併發高峰期時再去處理TCP創建與銷毀顯然是不合適的。這就造成了TCP的巨大浪費,而且操作系統每秒創建TCP的能力也是有限的,因此直接通過TCP發送消息會很快遇到瓶頸。
交換器(Exchange)
前面提到rabbitmq在你的應用程式和隊列中間增加了一層代理,代理根據路由策略把消息路由到指定隊列。 交換器分為4類:
- direct(預設)
- headers
- fanout
- topic
1、direct。是交換器的預設實現,根據路由規則匹配上就會把消息投遞到對應隊列。
2、headers。 是一個自定義匹配規則類型,在隊列和交換器綁定時,會設置一組鍵值對,消息中也包含一組鍵值對,當這些鍵值對匹配上,則會投遞消息到對應隊列。
3、fanout。是一種發佈訂閱模式,當你發送一條消息時,交換器會把消息廣播到所有附加到這個交換器的隊列上。 對於fanout來說routingkey是無效的。
4、topic。可以更靈活的匹配自己想訂閱的消息。也是routingkey用處最大的一種。類似我們配置request mapping中的通配符。
安裝rabbitmq
我是本機ubuntu16中安裝,沒有配置軟體源,安裝速度倒還能接收。rabbitmq是erlang開發,所以先安裝erlang,再安裝rabbitmq-server
sudo apt-get install erlang
sudo apt-get install rabbitmq-server
安裝完成後查看運行狀態 systemctl status rabbitmq-server
啟動 service rabbitmq-server start
停止 serivce rabbitmq-server stop
重啟 service rabbitmq-server restart
安裝好rabbitmq後,啟用web客戶端 rabbitmq-plugines enable rabbitmq_management。
啟動後預設使用guest/guest訪問,僅支持localhost訪問: http://localhost:15672。 web站點預設埠15672。 rabbitmq預設埠5672
在SpringBoot中集成rabbitmq
1、配置信息
sprimg.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
2、預設交換器(direct)實現
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DirectConfig { @Bean public Queue directQueue(){ return new Queue("direct",false); //隊列名字,是否持久化 } @Bean public DirectExchange directExchange(){ return new DirectExchange("direct",false,false);//交換器名稱、是否持久化、是否自動刪除 } @Bean Binding binding(Queue queue, DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("direct"); } }
消息生產者(發送者)
import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 消息發送--生產消息 */ @Component public class Sender { @Autowired AmqpTemplate rabbitmqTemplate; public void send(String message){ System.out.println("發送消息:"+message); rabbitmqTemplate.convertAndSend("direct",message); } }
消息消費者(接收者)
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "direct") public class Receiver { @RabbitHandler public void handler(String message){ System.out.println("接收消息:"+message); } }
OK,來測試下,預設情況下,只能本機訪問,我本地是在ubuntu虛擬機中,我在虛擬機中運行demo
import com.zhangfei.mq.Sender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; @Controller @RequestMapping("/rabbitmq") public class MyRabbitmqController { @Autowired Sender sender; @RequestMapping("/sender") @ResponseBody public String sender(){ System.out.println("send string:hello world"); sender.send("hello world"); return "sending..."; } }
運行結果
參考資料
https://www.cnblogs.com/vipstone/p/9950434.html 【推薦。作者:王磊】這裡基礎概念里有示意圖,可以對rabbit涉及到的基礎概念和流程有一個直觀的認識
https://blog.csdn.net/ztx114/article/details/78410727