一丶簡介 Topic Exchange 將路由鍵和某模式進行匹配。此時隊列需要綁定要一個模式上。符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞。因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*” 只會匹配到“audit.irs”。 業務場景: ...
一丶簡介
Topic Exchange
將路由鍵和某模式進行匹配。此時隊列需要綁定要一個模式上。符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞。因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*” 只會匹配到“audit.irs”。
業務場景:
1.日誌伺服器記錄用戶服務、商品服務、訂單服務三個服務。
2.日誌伺服器有三個日誌服務:INFO日誌處理服務、ERROR日誌處理服務、全日誌處理服務。
3.使用Topic交換器處理日誌,匹配規則依次為:*.log.info、*.log.error和*.log.*。
二丶配置文件
還是創建兩個項目,一個作為生產者一個作為消費者。
生產者配置:
server.port=8883 spring.application.name=hello-world spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.thymeleaf.cache=false 設置交換器名稱 mq.config.exchange=log.topicView Code
消費者配置:
server.port=8884 spring.application.name=lesson1 spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #設置交換器名稱 mq.config.exchange=log.topic #info隊列名稱 mq.config.queue.info=log.info #error隊列名稱 mq.config.queue.error=log.error #log隊列名稱 mq.config.queue.logs=log.allView Code
三丶創建生產者
1.訂單服務
package com.example.amqptopicprovider; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** * Author:aijiaxiang * Date:2020/4/26 * Description:模擬訂單服務發送消息 */ @Component public class OrderSender { @Autowired private AmqpTemplate amqpTemplate; //exChange 交換器 @Value("${mq.config.exchange}") private String exChange; /** * 發送消息的方法 * @param msg */ public void send(String msg){ //向消息隊列發送消息 //參數1:隊列名稱 //參數2:消息 // this.amqpTemplate.convertAndSend("hello-queue",msg); //向消息隊列發送消息 //參數1:交換器名稱 //參數2:路由鍵 //參數3:消息 this.amqpTemplate.convertAndSend(exChange,"order.log.debug","order.log.debug-"+msg); this.amqpTemplate.convertAndSend(exChange,"order.log.info","order.log.info-"+msg); this.amqpTemplate.convertAndSend(exChange,"order.log.warn","order.log.warn-"+msg); this.amqpTemplate.convertAndSend(exChange,"order.log.error","order.log.error-"+msg); } }View Code
2.商品服務
package com.example.amqptopicprovider; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** * Author:aijiaxiang * Date:2020/4/26 * Description:模擬商品服務發送消息 */ @Component public class ProductSender { @Autowired private AmqpTemplate amqpTemplate; //exChange 交換器 @Value("${mq.config.exchange}") private String exChange; /** * 發送消息的方法 * @param msg */ public void send(String msg){ //向消息隊列發送消息 //參數1:隊列名稱 //參數2:消息 // this.amqpTemplate.convertAndSend("hello-queue",msg); //向消息隊列發送消息 //參數1:交換器名稱 //參數2:路由鍵 //參數3:消息 this.amqpTemplate.convertAndSend(exChange,"product.log.debug","product.log.debug-"+msg); this.amqpTemplate.convertAndSend(exChange,"product.log.info","product.log.info-"+msg); this.amqpTemplate.convertAndSend(exChange,"product.log.warn","product.log.warn-"+msg); this.amqpTemplate.convertAndSend(exChange,"product.log.error","product.log.error-"+msg); } }View Code
3.用戶服務
package com.example.amqptopicprovider; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** * Author:aijiaxiang * Date:2020/4/26 * Description:模擬用戶服務發送消息 */ @Component public class UserSender { @Autowired private AmqpTemplate amqpTemplate; //exChange 交換器 @Value("${mq.config.exchange}") private String exChange; /** * 發送消息的方法 * @param msg */ public void send(String msg){ //向消息隊列發送消息 //參數1:隊列名稱 //參數2:消息 // this.amqpTemplate.convertAndSend("hello-queue",msg); //向消息隊列發送消息 //參數1:交換器名稱 //參數2:路由鍵 //參數3:消息 this.amqpTemplate.convertAndSend(exChange,"user.log.debug","user.log.debug-"+msg); this.amqpTemplate.convertAndSend(exChange,"user.log.info","user.log.info-"+msg); this.amqpTemplate.convertAndSend(exChange,"user.log.warn","user.log.warn-"+msg); this.amqpTemplate.convertAndSend(exChange,"user.log.error","user.log.error-"+msg); } }View Code
四丶創建消費者
1.ERROR日誌處理服務
package com.ant.amqptopicconsumer; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; /** * Author:aijiaxiang * Date:2020/4/26 * Description:消息接收者 * @RabbitListener bindings:綁定隊列 * @QueueBinding value:綁定隊列的名稱 * exchange:配置交換器 * @Queue : value:配置隊列名稱 * autoDelete:是否是一個可刪除的臨時隊列 * @Exchange value:為交換器起個名稱 * type:指定具體的交換器類型 */ @Component @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "${mq.config.queue.error}",autoDelete = "true"), exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC), key = "*.log.error" ) ) public class TopicErrorReceiver { /** * 接收消息的方法,採用消息隊列監聽機制 * @param msg */ @RabbitHandler public void process(String msg){ System.out.println("error-receiver:"+msg); } }View Code
2.INFO日誌處理服務
package com.ant.amqptopicconsumer; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; /** * Author:aijiaxiang * Date:2020/4/26 * Description:消息接收者 * @RabbitListener bindings:綁定隊列 * @QueueBinding value:綁定隊列的名稱 * exchange:配置交換器 * @Queue : value:配置隊列名稱 * autoDelete:是否是一個可刪除的臨時隊列 * @Exchange value:為交換器起個名稱 * type:指定具體的交換器類型 */ @Component @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "${mq.config.queue.info}",autoDelete = "true"), exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC), key = "*.log.info" ) ) public class TopicInfoReceiver { /** * 接收消息的方法,採用消息隊列監聽機制 * @param msg */ @RabbitHandler public void process(String msg){ System.out.println("info-receiver:"+msg); } }View Code
3.全日誌處理服務
package com.ant.amqptopicconsumer; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; /** * Author:aijiaxiang * Date:2020/4/26 * Description:消息接收者 * * @RabbitListener bindings:綁定隊列 * @QueueBinding value:綁定隊列的名稱 * exchange:配置交換器 * key:路由鍵 * @Queue : value:配置隊列名稱 * autoDelete:是否是一個可刪除的臨時隊列 * @Exchange value:為交換器起個名稱 * type:指定具體的交換器類型 */ @Component @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "${mq.config.queue.logs}",autoDelete = "true"), exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC), key = "*.log.*" ) ) public class TopicLogReceiver { /** * 接收消息的方法,採用消息隊列監聽機制 * @param msg */ @RabbitHandler public void process(String msg){ System.out.println("all-receiver:"+msg); } }View Code
五丶老規矩測試一發
package com.example.amqp; import com.example.ampq.Sender; import com.example.amqptopicprovider.OrderSender; import com.example.amqptopicprovider.ProductSender; import com.example.amqptopicprovider.UserSender; import com.example.helloworld.HelloworldApplication; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; /** * Author:aijiaxiang * Date:2020/4/26 * Description: */ @RunWith(SpringRunner.class) @SpringBootTest(classes = HelloworldApplication.class) public class QueueTest { @Autowired private Sender sender; @Autowired private UserSender userSender; @Autowired private ProductSender productSender; @Autowired private OrderSender orderSender; /** * 測試消息隊列 */ // @Test // public void test1() throws InterruptedException { // while (true){ // Thread.sleep(1000); // sender.send("hello"); // } // // } @Test public void test2(){ userSender.send("usersend"); productSender.send("productsend"); orderSender.send("ordersend"); } }View Code
註:日誌服務處理類中的路由鍵是直接採用了硬編碼的方式進行配置,主要是為了方便查看一目瞭然,但是還是推薦將路由鍵配置在配置文件中,使用 "${}" 這個方式來進行讀取。