一丶簡介 Fanout Exchange 不處理路由鍵。你只需要簡單的將隊列綁定到交換機上。一個發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上。很像子網廣播,每檯子網內的主機都獲得了一份複製的消息。Fanout交換機轉發消息是最快的。 業務場景: 1.訂單服務需要同時向簡訊服務和push服 ...
一丶簡介
Fanout Exchange
不處理路由鍵。你只需要簡單的將隊列綁定到交換機上。一個發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上。很像子網廣播,每檯子網內的主機都獲得了一份複製的消息。Fanout交換機轉發消息是最快的。
業務場景:
1.訂單服務需要同時向簡訊服務和push服務發送,兩個服務都有各自的消息隊列。
2.使用Fanout交換器。
二丶配置文件
同樣的創建了兩個項目,一個作為生產者,一個作為消費者。
生產者配置:
server.port=8883 spring.application.name=hello-world spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #設置交換器名稱 mq.config.exchange=order.fanoutView Code
消費者配置:
server.port=8884 spring.application.name=lesson1 spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #設置交換器名稱 mq.config.exchange=order.fanout #簡訊消息服務隊列名稱 mq.config.queue.sms=order.sms #push消息服務隊列名稱 mq.config.queue.push=order.push #log消息服務隊列名稱 mq.config.queue.log=order.logView Code
註:本是要配置兩個消息隊列,但是為了測試fanout交換器是否能夠將消息發送到所有消息隊列(準確的說是配置了路由鍵的隊列和沒有配置路由鍵的隊列)多創建的一個。
三丶編寫生產者
package com.example.amqpfanoutprovider; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** * Author:aijiaxiang * Date:2020/4/26 * Description:發送消息 */ @Component public class FanoutSender { @Autowired private AmqpTemplate amqpTemplate; //exChange 交換器 @Value("${mq.config.exchange}") private String exChange; /** * 發送消息的方法 * @param msg */ public void send(String msg){ //向消息隊列發送消息 //參數1:交換器名稱 //參數2:路由鍵,廣播模式時(fanout交換器)沒有路由鍵使用""空字元串代替 //參數3:消息 this.amqpTemplate.convertAndSend(exChange,"",msg); } }View Code
四丶編寫消費者
簡訊服務類:
package com.ant.amqpfanoutconsumer; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; /** * Author:aijiaxiang * Date:2020/4/26 * Description:消息接收者 * @RabbitListener bindings:綁定隊列 * @QueueBinding value:綁定隊列的名稱 * exchange:配置交換器 * key:路由鍵(廣播模式時不需要路由鍵,所以不寫) * @Queue : value:配置隊列名稱 * autoDelete:是否是一個可刪除的臨時隊列 * @Exchange value:為交換器起個名稱 * type:指定具體的交換器類型 */ @Component @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "${mq.config.queue.sms}",autoDelete = "true"), exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.FANOUT) ) ) public class SmsReceiver { /** * 接收消息的方法,採用消息隊列監聽機制 * @param msg */ @RabbitHandler public void process(String msg){ System.out.println("sms-receiver:"+msg); } }View Code
push服務類:
package com.ant.amqpfanoutconsumer; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; /** * Author:aijiaxiang * Date:2020/4/26 * Description:消息接收者 * @RabbitListener bindings:綁定隊列 * @QueueBinding value:綁定隊列的名稱 * exchange:配置交換器 * key:路由鍵(廣播模式時不需要路由鍵,所以不寫) * @Queue : value:配置隊列名稱 * autoDelete:是否是一個可刪除的臨時隊列 * @Exchange value:為交換器起個名稱 * type:指定具體的交換器類型 */ @Component @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "${mq.config.queue.push}",autoDelete = "true"), exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.FANOUT) ) ) public class PushReceiver { /** * 接收消息的方法,採用消息隊列監聽機制 * @param msg */ @RabbitHandler public void process(String msg){ System.out.println("push-receiver:"+msg); } }View Code
log服務類:該類是為了測試配置了路由鍵的消息隊列和沒配置路由鍵的消息隊列是否都能接收到fanout交換器發送的消息。
package com.ant.amqpfanoutconsumer; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; /** * Author:aijiaxiang * Date:2020/4/26 * Description:消息接收者 * @RabbitListener bindings:綁定隊列 * @QueueBinding value:綁定隊列的名稱 * exchange:配置交換器 * key:路由鍵(廣播模式時不需要路由鍵,所以不寫)註:消息隊列配置了路由鍵同樣能接收到fanout交換器傳過來的消息。 * @Queue : value:配置隊列名稱 * autoDelete:是否是一個可刪除的臨時隊列 * @Exchange value:為交換器起個名稱 * type:指定具體的交換器類型 */ @Component @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "${mq.config.queue.log}",autoDelete = "true"), exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.FANOUT), key = "user.log.info" ) ) public class LogReceiver { /** * 接收消息的方法,採用消息隊列監聽機制 * @param msg */ @RabbitHandler public void process(String msg){ System.out.println("log-receiver:"+msg); } }View Code
五丶測試一發
測試類:
package com.example.amqp; import com.example.amqpfanoutprovider.FanoutSender; import com.example.helloworld.HelloworldApplication; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; /** * Author:aijiaxiang * Date:2020/4/26 * Description: */ @RunWith(SpringRunner.class) @SpringBootTest(classes = HelloworldApplication.class) public class QueueTest { @Autowired private FanoutSender fanoutSender; /** * 測試消息隊列 */ @Test public void test1() throws InterruptedException { fanoutSender.send("hello"); } }View Code
OK,看控制台輸出得出,配置了路由鍵的消息隊列和沒配置路由鍵的消息隊列都能接收到fanout交換器發送的消息!
如有不足之處歡迎指正!