RabbitMQ是一個在AMQP基礎上完成的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。RabbitMQ是流行的開源消息隊列系統,用erlang語言開發。RabbitMQ是AMQP(高級消息隊列協議)的標準實現。消息中間件的工作過程可以用生產者消費者模型來表示... ...
RabbitMQ是一個在AMQP基礎上完成的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。RabbitMQ是流行的開源消息隊列系統,用erlang語言開發。RabbitMQ是AMQP(高級消息隊列協議)的標準實現。
消息中間件的工作過程可以用生產者消費者模型來表示.即,生產者不斷的向消息隊列發送信息,而消費者從消息隊列中消費信息.
如果你還沒有安裝rabbitmq的,可以看看這篇《centos安裝MQ》
不說了不說了,來一張圖直截了當的看看MQ工作的具體過程:
開局一張圖 故事全靠編.從上圖可看出,對於消息隊列來說,生產者,消息隊列,消費者是最重要的三個概念,生產者發消息到消息隊列中去,消費者監聽指定的消息隊列,並且當消息隊列收到消息之後,接收消息隊列傳來的消息,並且給予相應的處理.消息隊列常用於分散式系統之間互相信息的傳遞.
v基礎概念
對於RabbitMQ來說,除了這三個基本模塊以外,還添加了一個模塊,即交換機(Exchange).它使得生產者和消息隊列之間產生了隔離,生產者將消息發送給交換機,而交換機則根據調度策略把相應的消息轉發給對應的消息隊列.那麼RabitMQ的工作流程如下所示:
關於rabbitmq幾個基礎名詞的介紹:
Broker: 簡單來說就是消息隊列伺服器實體。 Exchange: 消息交換機,它指定消息按什麼規則,路由到哪個隊列。 Queue: 消息隊列載體,每個消息都會被投入到一個或多個隊列。 Binding: 綁定,它的作用就是把exchange和queue按照路由規則綁定起來。 Routing Key: 路由關鍵字,exchange根據這個關鍵字進行消息投遞。 vhost: 虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的許可權分離。 producer: 消息生產者,就是投遞消息的程式。 consumer: 消息消費者,就是接受消息的程式。 channel: 消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務。交換機的主要作用是接收相應的消息並且綁定到指定的隊列.交換機有四種類型,分別為Direct,topic,headers,Fanout:
Direct: 處理路由鍵。需要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵完全匹配。這是一個完整的匹配。如果一個隊列綁定到該交換機上要求路由鍵 “demo”,則只有被標記為“demo”的消息才被轉發,不會轉發demo.ooo,也不會轉發test.123,只會轉發demo。 Topic: 轉發信息主要是依據通配符,將路由鍵和某模式進行匹配。此時隊列需要綁定要一個模式上。符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞。因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*” 只會匹配到“audit.irs”。 Headers: 根據一個規則進行匹配,在消息隊列和交換機綁定的時候會指定一組鍵值對規則,而發送消息的時候也會指定一組鍵值對規則,當兩組鍵值對規則相匹配的時候,消息會被髮送到匹配的消息隊列中. Fanout: 路由廣播的形式,將會把消息發給綁定它的全部隊列,即便設置了key,也會被忽略.v實戰演練
♛ 2.1 創建MQ註:若是現有工程引入MQ,則添加Maven引用。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
這裡我們延續之前springboot系列博文中的例子hellospringboot,在已有項目中添加mq的Maven引用。
♛ 2.2 application.properties在application.properties文件當中引入RabbitMQ基本的配置信息
# ----- MQ -------- #
spring.rabbitmq.host=192.168.11.108
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
♛ 2.3 添加實體類MyModel
package com.demo.mq.model; import java.io.Serializable; import java.util.UUID; /** * Created by toutou on 2019/1/1. */ public class MyModel implements Serializable { private static final long serialVersionUID = 1L; private UUID id; private String info; public UUID getId() { return id; } public void setId(UUID id) { this.id = id; } public String getInfo() { return info; } public void setInfo(String info) { this.info = info; } }♛ 2.4 添加RabbitConfig
package com.demo.mq.common; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; /** * Created by toutou on 2019/1/1. */ @Configuration public class RabbitConfig { @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}") private int port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; public static final String EXCHANGE_A = "my-mq-exchange_A"; public static final String EXCHANGE_B = "my-mq-exchange_B"; public static final String QUEUE_A = "QUEUE_A"; public static final String QUEUE_B = "QUEUE_B"; public static final String ROUTINGKEY_A = "spring-boot-routingKey_A"; public static final String ROUTINGKEY_B = "spring-boot-routingKey_B"; @Bean public CachingConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); return connectionFactory; } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; } /** * 針對消費者配置 * 1. 設置交換機類型 * 2. 將隊列綁定到交換機 FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念 HeadersExchange :通過添加屬性key-value匹配 DirectExchange:按照routingkey分發到指定隊列 TopicExchange:多關鍵字匹配 */ @Bean public DirectExchange defaultExchange() { return new DirectExchange(EXCHANGE_A); } /** * 獲取隊列A * @return */ @Bean public Queue queueA() { return new Queue(QUEUE_A, true); //隊列持久 } /** * 獲取隊列B * @return */ @Bean public Queue queueB() { return new Queue(QUEUE_B, true); //隊列持久 } /** * 把交換機,隊列,通過路由關鍵字進行綁定 * @return */ @Bean public Binding binding() { return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A); } /** * 一個交換機可以綁定多個消息隊列,也就是消息通過一個交換機,可以分發到不同的隊列當中去。 * @return */ @Bean public Binding bindingB(){ return BindingBuilder.bind(queueB()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_B); } }♛ 2.5 添加消息的生產者MyProducer
package com.demo.mq.producer; import com.demo.mq.common.RabbitConfig; import com.demo.mq.model.MyModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * Created by toutou on 2019/1/1. */ @Component public class MyProducer implements RabbitTemplate.ConfirmCallback { private final Logger logger = LoggerFactory.getLogger(this.getClass()); //由於rabbitTemplate的scope屬性設置為ConfigurableBeanFactory.SCOPE_PROTOTYPE,所以不能自動註入 private RabbitTemplate rabbitTemplate; /** * 構造方法註入rabbitTemplate */ @Autowired public MyProducer(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果為單例的話,那回調就是最後設置的內容 } public void sendMsg(MyModel model) { //把消息放入ROUTINGKEY_A對應的隊列當中去,對應的是隊列A rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, model); } /** * 回調 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { logger.info(" 回調id:" + correlationData); if (ack) { logger.info("消息成功消費"); } else { logger.info("消息消費失敗:" + cause); } } }♛ 2.6 添加消息的消費者MyReceiver
package com.demo.mq.receiver; import com.demo.mq.common.RabbitConfig; import com.demo.mq.model.MyModel; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * Created by toutou on 2019/1/1. */ @Component @RabbitListener(queues = RabbitConfig.QUEUE_A) public class MyReceiver { @RabbitHandler public void process(MyModel model) { System.out.println("接收處理隊列A當中的消息: " + model.getInfo()); } }♛ 2.7 添加MyMQController
package com.demo.controller; import com.demo.mq.model.MyModel; import com.demo.mq.producer.MyProducer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.util.UUID; /** * Created by toutou on 2019/1/1. */ @RestController @Slf4j public class MyMQController { @Autowired MyProducer myProducers; @GetMapping("/mq/producer") public String myProducer(String content){ MyModel model = new MyModel(); model.setId(UUID.randomUUID()); model.setInfo(content); myProducers.sendMsg(model); return "已發送:" + content; } }♛ 2.8 項目整體目錄
♛ 2.9 調試
2.9.1 在頁面中請求http://localhost:8081/mq/producer?content=hello rabbitmq
2.9.2 查看http://ip:15672/#/queues的變化
關於RabbitMQ Management有疑問的,可以看上篇博文。《淺談RabbitMQ Management》。
2.9.3 查看消費者日誌記錄
這樣一個完整的rabbitmq實例就有了。
v源碼地址
https://github.com/toutouge/javademo/tree/master/hellospringboot
作 者:請叫我頭頭哥
出 處:http://www.cnblogs.com/toutou/
關於作者:專註於基礎平臺的項目開發。如有問題或建議,請多多賜教!
版權聲明:本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文鏈接。
特此聲明:所有評論和私信都會在第一時間回覆。也歡迎園子的大大們指正錯誤,共同進步。或者直接私信我
聲援博主:如果您覺得文章對您有幫助,可以點擊文章右下角【推薦】一下。您的鼓勵是作者堅持原創和持續寫作的最大動力!