RabbitMQ的工作原理 它的基本結構 組成部分說明如下: Broker:消息隊列服務進程,此進程包括兩個部分:Exchange和Queue。 Exchange:消息隊列交換機,按一定的規則將消息路由轉發到某個隊列,對消息進行過慮。 Queue:消息隊列,存儲消息的隊列,消息到達隊列並轉發給指定的 ...
RabbitMQ的工作原理
它的基本結構
組成部分說明如下:
Broker:消息隊列服務進程,此進程包括兩個部分:Exchange和Queue。
Exchange:消息隊列交換機,按一定的規則將消息路由轉發到某個隊列,對消息進行過慮。
Queue:消息隊列,存儲消息的隊列,消息到達隊列並轉發給指定的消費方。
Producer:消息生產者,即生產方客戶端,生產方客戶端將消息發送到MQ。
Consumer:消息消費者,即消費方客戶端,接收MQ轉發的消息。
Maven舉例配置
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp‐client</artifactId> <version>4.0.3</version><!‐‐此版本與spring boot 1.5.9版本匹配‐‐> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring‐boot‐starter‐logging</artifactId> </dependency>
生產者舉例Demo
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitmqConfig { public static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; public static final String QUEUE_INFORM_SMS = "queue_inform_sms"; public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform"; public static final String ROUTINGKEY_EMAIL="inform.#.email.#"; public static final String ROUTINGKEY_SMS="inform.#.sms.#"; //聲明交換機 @Bean(EXCHANGE_TOPICS_INFORM) public Exchange EXCHANGE_TOPICS_INFORM(){ //durable(true) 持久化,mq重啟之後交換機還在 return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build(); } //聲明QUEUE_INFORM_EMAIL隊列 @Bean(QUEUE_INFORM_EMAIL) public Queue QUEUE_INFORM_EMAIL(){ return new Queue(QUEUE_INFORM_EMAIL); } //聲明QUEUE_INFORM_SMS隊列 @Bean(QUEUE_INFORM_SMS) public Queue QUEUE_INFORM_SMS(){ return new Queue(QUEUE_INFORM_SMS); } //ROUTINGKEY_EMAIL隊列綁定交換機,指定routingKey @Bean public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs(); } //ROUTINGKEY_SMS隊列綁定交換機,指定routingKey @Bean public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs(); } }
消費者舉例Demo
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitmqConfig { public static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; public static final String QUEUE_INFORM_SMS = "queue_inform_sms"; public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform"; public static final String ROUTINGKEY_EMAIL="inform.#.email.#"; public static final String ROUTINGKEY_SMS="inform.#.sms.#"; //聲明交換機 @Bean(EXCHANGE_TOPICS_INFORM) public Exchange EXCHANGE_TOPICS_INFORM(){ //durable(true) 持久化,mq重啟之後交換機還在 return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build(); } //聲明QUEUE_INFORM_EMAIL隊列 @Bean(QUEUE_INFORM_EMAIL) public Queue QUEUE_INFORM_EMAIL(){ return new Queue(QUEUE_INFORM_EMAIL); } //聲明QUEUE_INFORM_SMS隊列 @Bean(QUEUE_INFORM_SMS) public Queue QUEUE_INFORM_SMS(){ return new Queue(QUEUE_INFORM_SMS); } //ROUTINGKEY_EMAIL隊列綁定交換機,指定routingKey @Bean public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs(); } //ROUTINGKEY_SMS隊列綁定交換機,指定routingKey @Bean public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs(); } }
工作模式
RabbitMQ有以下幾種工作模式 :
1、Work queues
2、Publish/Subscribe
3、Routing
4、Topics
5、Header
6、RPC
Work queues
work queues與入門程式相比,多了一個消費端,兩個消費端共同消費同一個隊列中的消息。
應用場景:對於 任務過重或任務較多情況使用工作隊列可以提高任務處理的速度。
測試:
1、使用入門程式,啟動多個消費者。
2、生產者發送多個消息。
結果:
1、一條消息只會被一個消費者接收;
2、rabbit採用輪詢的方式將消息是平均發送給消費者的;
3、消費者在處理完某條消息後,才會收到下一條消息。
Publish/subscribe 發佈訂閱模式
發佈訂閱模式:
1、每個消費者監聽自己的隊列。
2、生產者將消息發給broker,由交換機將消息轉發到綁定此交換機的每個隊列,每個綁定交換機的隊列都將接收
到消息
Routin
路由模式:
1、每個消費者監聽自己的隊列,並且設置routingkey。
2、生產者將消息發給交換機,由交換機根據routingkey來轉發消息到指定的隊列。
這是一種非常靈活的模式,經常被用到
Topics
路由模式:
1、每個消費者監聽自己的隊列,並且設置帶統配符的routingkey。
2、生產者將消息發給broker,由交換機根據routingkey來轉發消息到指定的隊列。
Header模式
header模式與routing不同的地方在於,header模式取消routingkey,使用header中的 key/value(鍵值對)匹配
隊列。
案例:
根據用戶的通知設置去通知用戶,設置接收Email的用戶只接收Email,設置接收sms的用戶只接收sms,設置兩種
通知類型都接收的則兩種通知都有效。
生產者Demo:
Map<String, Object> headers_email = new Hashtable<String, Object>(); headers_email.put("inform_type", "email"); Map<String, Object> headers_sms = new Hashtable<String, Object>(); headers_sms.put("inform_type", "sms"); channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_HEADERS_INFORM,"",headers_sms);
通知Demo :
String message = "email inform to user"+i; Map<String,Object> headers = new Hashtable<String, Object>(); headers.put("inform_type", "email");//匹配email通知消費者綁定的header //headers.put("inform_type", "sms");//匹配sms通知消費者綁定的header AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder(); properties.headers(headers); //Email通知 channel.basicPublish(EXCHANGE_HEADERS_INFORM, "", properties.build(), message.getBytes());
發送郵件消費者 :
channel.exchangeDeclare(EXCHANGE_HEADERS_INFORM, BuiltinExchangeType.HEADERS); Map<String, Object> headers_email = new Hashtable<String, Object>(); headers_email.put("inform_email", "email"); //交換機和隊列綁定 channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email); //指定消費隊列 channel.basicConsume(QUEUE_INFORM_EMAIL, true, consumer);
RPC
RPC即客戶端遠程調用服務端的方法 ,使用MQ可以實現RPC的非同步調用,基於Direct交換機實現,流程如下:
1、客戶端即是生產者就是消費者,向RPC請求隊列發送RPC調用消息,同時監聽RPC響應隊列。
2、服務端監聽RPC請求隊列的消息,收到消息後執行服務端的方法,得到方法返回的結果
3、服務端將RPC方法 的結果發送到RPC響應隊列
4、客戶端(RPC調用方)監聽RPC響應隊列,接收到RPC調用結果。