代碼地址: https://gitee.com/Aes_yt/middleware-demo/tree/master/rabbitmq 安裝RabbitMq 1. docker拉取鏡像 docker pull rabbitmq:3.9.29-management 2. 創建rabbitmq容器 do ...
代碼地址: https://gitee.com/Aes_yt/middleware-demo/tree/master/rabbitmq
安裝RabbitMq
1. docker拉取鏡像
docker pull rabbitmq:3.9.29-management
2. 創建rabbitmq容器
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9.29-management
3. 訪問地址
http://{ip地址}:15672/,可以看到RabbitMq的管理後臺界面。賬號密碼預設 guest
消息生產和消費
rabbitmq-producer
-
新建module,引入依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
yaml配置地址
spring: rabbitmq: host: 192.168.67.131 port: 5672 username: guest password: guest virtual-host: /
-
配置交換機
@Configuration public class RabbitMqConfig { /*定義交換機名稱*/ public static final String USER_INFO_EXCHANGE_NAME = "user_info_exchange"; @Bean public Exchange userInfoExchange() { return ExchangeBuilder.topicExchange(USER_INFO_EXCHANGE_NAME).durable(true).build(); } }
-
發送消息
@Test public void sendMessage() { String registerMsg = "user register..." + new Date(); // 1. 發送一條註冊消息 rabbitTemplate.convertAndSend(RabbitMqConfig.USER_INFO_EXCHANGE_NAME, "user.register.user1", registerMsg, msg -> { msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return msg; }); log.info("消息發送完成:{}", registerMsg); String loginMsg = "user login..." + new Date(); // 2. 發送一條登錄消息 rabbitTemplate.convertAndSend(RabbitMqConfig.USER_INFO_EXCHANGE_NAME, "user.login.user1", loginMsg, msg -> { msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return msg; }); log.info("消息發送完成:{}", loginMsg); }
rabbitmq-consumer
-
新建module,引入依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
yaml配置地址
spring: rabbitmq: host: 192.168.67.131 port: 5672 username: guest password: guest virtual-host: /
-
配置隊列綁定關係
@Configuration public class RabbitMqConfig { @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}") private Integer port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; /*定義交換機名稱*/ public static final String USER_INFO_EXCHANGE_NAME = "user_info_exchange"; /*定義隊列名稱*/ public static final String USER_REGISTER_QUEUE_NAME = "user_register_queue"; public static final String USER_LOGIN_QUEUE_NAME = "user_login_queue"; public static final String USER_REGISTER_ROUTING_KEY = "user.register.#"; public static final String USER_LOGIN_ROUTING_KEY = "user.login.#"; @Bean public Exchange userInfoExchange() { return ExchangeBuilder.topicExchange(USER_INFO_EXCHANGE_NAME).durable(true).build(); } @Bean public Queue userRegisterQueue() { return QueueBuilder.durable(USER_REGISTER_QUEUE_NAME).build(); } @Bean public Queue userLoginQueue() { return QueueBuilder.durable(USER_LOGIN_QUEUE_NAME).build(); } @Bean public Binding userRegisterBinding() { return BindingBuilder.bind(userRegisterQueue()).to(userInfoExchange()).with(USER_REGISTER_ROUTING_KEY).noargs(); } @Bean public Binding userLoginBinding() { return BindingBuilder.bind(userLoginQueue()).to(userInfoExchange()).with(USER_LOGIN_ROUTING_KEY).noargs(); } }
-
監聽器
@Component @Slf4j public class UserInfoListener { @RabbitListener(queues = RabbitMqConfig.USER_REGISTER_QUEUE_NAME) public void userRegister(String msg){ log.info(msg); } @RabbitListener(queues = RabbitMqConfig.USER_LOGIN_QUEUE_NAME) public void userLogin(String msg){ log.info(msg); } }
-
結果
啟動producer項目和consumer項目,producer發送消息,consumer接收到消息:
producer:
2023-07-08 10:26:10.660 INFO 7432 --- [main] com.yt.rabbit.RabbitProducerTest: 消息發送完成:user register...Sat Jul 08 10:26:10 CST 2023 2023-07-08 10:26:10.663 INFO 7432 --- [main] com.yt.rabbit.RabbitProducerTest: 消息發送完成:user login...Sat Jul 08 10:26:10 CST 2023
consumer:
2023-07-08 10:26:10.661 INFO 25108 --- [ntContainer#1-1] c.yt.rabbitmq.listener.UserInfoListener : user register...Sat Jul 08 10:26:10 CST 2023 2023-07-08 10:26:10.665 INFO 25108 --- [ntContainer#0-1] c.yt.rabbitmq.listener.UserInfoListener : user login...Sat Jul 08 10:26:10 CST 2023
交換器類型
交換器類型有四種,fanout,topic,direct,headers。
接下來在代碼中創建三種交換器類型,對應的routingKey和queue綁定如表格所示。發送對應消息,看看是否能接收到 [Y/N]。headers類型不演示。
Exchange | ExchangeType | RoutingKey | MessageKey | Queue | Receive |
---|---|---|---|---|---|
fanout_exchange | fanout | fanout.test.key1 fanout.# |
xxx.yyy.zzz | fanout_test_queue1 fanout_test_queue2 |
Y Y |
topic_exchange | topic | topic.test.# topic.# topic.* |
topic.test.key1 |
topic_test_queue1 topic_test_queue2 topic_test_queue3 |
Y Y N |
direct_exchange | direct | direct.test.key1 direct.test.# direct.test.key3 |
direct.test.key1 direct.test.key2 direct.test.key3 |
direct_test_queue1 direct_test_queue2 direct_test_queue3 && direct_test_queue4 |
Y N Y && Y |
direct 的Routingkey是全匹配,通配符不起作用,所以direct_test_queue2沒有接收到消息。
topic 的通配符,*正好匹配一個詞,#可以匹配一個或多個詞,所以topic_test_queue3沒有接收到消息。