# 1.用戶角色配置 ![image.png](https://cdn.nlark.com/yuque/0/2023/png/38371876/1688636206975-acd927ca-1559-4236-85f2-07283999d50b.png#averageHue=%23f3f2f2&cl ...
1.用戶角色配置
自帶的guest/guest 超級管理員
五中不同角色配置:
- 普通管理者(management):僅可登陸管理控制台,無法看到節點信息,也無法對策略進行管理。
- 策略制定者(policymaker):可登陸管理控制台, 同時可以對policy進行管理。但無法查看節點的相關信息。
- 監控者 (monitoring):登錄管理控制台 查看所有的信息(Rabbit的相關節點的信息,記憶體使用信息,磁碟的情況)
- 超級管理員 administrator:登錄管理控制台 查看所有的信息 對所有用戶的策略進行操作
- 其他:無法登陸管理控制台,通常就是普通的生產者和消費者(僅僅接收信息或者發送消息)。
2.Virtual Hosts配置
每一個 Virtual Hosts相互隔離, 就相當於一個相對獨立的RabbitMQ,===> exchange queue message不是互通
可以理解為:一個Mysql又很多資料庫,每一個資料庫就相當於一個Virtual Hosts
2.1創建Virtual Hosts
2.2許可權的分配
點擊對應Virtual Hostsv 名字,進入配置頁面
該許可權配置參數說明:
- user:用戶名
- configure :一個正則表達式,用戶對符合該正則表達式的所有資源擁有 configure 操作的許可權
- write:一個正則表達式,用戶對符合該正則表達式的所有資源擁有 write 操作的許可權
- read:一個正則表達式,用戶對符合該正則表達式的所有資源擁有 read 操作的許可權
3.入門案例
使用思路
生產者: --->按照JDBC思路
1. 創建連接工廠
2. 設置的rabbitMq的服務的主機 預設localhost
3. 設置的rabbitMq的服務的埠 預設5672
4. 設置的rabbitMq的服務的虛擬主機
5. 設置用戶和密碼
6. 創建連接
7. 創建頻道
8. 聲明隊列
9. 創建消息
10. 發送消息
11.關閉資源
所需依賴
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
對Rabbit的連接進行封裝
public class RabbitMQConnectionUtils {
/**
* 獲得Rabbit的連接
* 類比JDBC
*/
public static Connection getConection() throws IOException, TimeoutException {
//1.創建連接工廠
ConnectionFactory connectionFactory=new ConnectionFactory();
//2.設置的rabbitMq的服務的主機,預設localhost
connectionFactory.setHost("localhost");
//3.設置的rabbitMq的服務的埠,預設為5672
connectionFactory.setPort(5672);
//4.設置的rabbitMq的服務的虛擬主機
connectionFactory.setVirtualHost("testmq");
//5.設置用戶和密碼
connectionFactory.setUsername("chenbuting");
connectionFactory.setPassword("123");
//6.創建連接
Connection connection=connectionFactory.newConnection();
return connection;
}
}
基本消息隊列
生產者Product相關代碼
public class Product {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection=RabbitMQConnectionUtils.getConection();
//7.創建頻道
Channel channel= connection.createChannel();
//8.聲明隊列(創建隊列)
/*
* 參數1:指定隊列的名稱
* 參數2:指定消息是否持久化,一般設置為true,是否保存到磁碟當中去
* 參數3:指定是否獨占通道
* 參數4:是否自動刪除
* 參數5:指定額外參數
*/
channel.queueDeclare("simple_01",true,false,false,null);
//9.創建消息
String message="hello ,my name is chenbuting";
//10.發送消息
/*
* 參數1:指定交換機 簡單模式中使用預設交換機 指定空字元串
* 參數2: 指定routingkey 簡單模式 只需要指定隊列名稱即可
* 參數3: 指定攜帶的額外的參數 null
* 參數4:要發送的消息本身(位元組數組)
*/
channel.basicPublish("","simple_01",null,message.getBytes());
//11.關閉資源
channel.close();
connection.close();
}
}
消費者Consumer相關代碼
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//獲取連接
Connection connection= RabbitMQConnectionUtils.getConection();
//7.創建頻道
Channel channel=connection.createChannel();
//8.聲明隊列
channel.queueDeclare("simple_01",true,false,false,null);
//9.接受消息
//處理消息
DefaultConsumer consumer = new DefaultConsumer(channel){
/*
*consumerTag: 消費者標簽,用於標識特定的消費者。每個消費者都有一個唯一的標簽,它可以用於取消訂閱或標識消息是由哪個消費者處理的。
* envelope: 信封對象,包含與消息傳遞相關的元數據,如交換機、路由鍵、傳遞標簽等。
* properties: AMQP 的基本屬性,包含附加的消息屬性,如消息持久性、優先順序、時間戳等。
* body: 消息的正文,以位元組數組的形式提供。
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息本身"+new String(body,"utf-8"));
System.out.println("exchange"+envelope.getExchange());
System.out.println("RoutingKey"+envelope.getRoutingKey());
System.out.println("消息的序號"+envelope.getDeliveryTag());
}
};
//監聽器----->觀察者設計模式
channel.basicConsume("simple_01",true,consumer);
//10.建議不要關閉資源 建議一直監聽消息
}
}
idea允許開啟多個實例的方式
工作消息隊列
生產者相關代碼
public class WorkProduct {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection=RabbitMQConnectionUtils.getConection();
//7.創建頻道
Channel channel= connection.createChannel();
//8.聲明隊列(創建隊列)
/*
* 參數1:指定隊列的名稱
* 參數2:指定消息是否持久化,一般設置為true,是否保存到磁碟當中去
* 參數3:指定是否獨占通道
* 參數4:是否自動刪除
* 參數5:指定額外參數
*/
channel.queueDeclare("working_02",true,false,false,null);
for (int i=0;i<30;i++) {
//9.創建消息
String message = "hello ,my name is CHENBUTING"+i;
//10.發送消息
/*
* 參數1:指定交換機 簡單模式中使用預設交換機 指定空字元串
* 參數2: 指定routingkey 簡單模式 只需要指定隊列名稱即可
* 參數3: 指定攜帶的額外的參數 null
* 參數4:要發送的消息本身(位元組數組)
*/
channel.basicPublish("", "working_02", null, message.getBytes());
}
//11.關閉資源
channel.close();
connection.close();
}
}
消費者相關代碼
WorkConsumer_1和WorkConsumer_2代碼內容
public class WorkConsumer_1 {
public static void main(String[] args) throws IOException, TimeoutException {
//獲取連接
Connection connection= RabbitMQConnectionUtils.getConection();
//7.創建頻道
Channel channel=connection.createChannel();
//8.聲明隊列
channel.queueDeclare("working_02",true,false,false,null);
//9.接受消息
//處理消息
DefaultConsumer consumer = new DefaultConsumer(channel){
/*
*consumerTag: 消費者標簽,用於標識特定的消費者。每個消費者都有一個唯一的標簽,它可以用於取消訂閱或標識消息是由哪個消費者處理的。
* envelope: 信封對象,包含與消息傳遞相關的元數據,如交換機、路由鍵、傳遞標簽等。
* properties: AMQP 的基本屬性,包含附加的消息屬性,如消息持久性、優先順序、時間戳等。
* body: 消息的正文,以位元組數組的形式提供。
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息本身"+new String(body,"utf-8"));
System.out.println("exchange"+envelope.getExchange());
System.out.println("RoutingKey"+envelope.getRoutingKey());
System.out.println("消息的序號"+envelope.getDeliveryTag());
}
};
//監聽器----->觀察者設計模式
channel.basicConsume("working_02",true,consumer);
//10.建議不要關閉資源 建議一直監聽消息
}
}
訂閱模式
Fanout模式
Fanout生產者的相關代碼
public class FanoutProduct {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection=RabbitMQConnectionUtils.getConection();
//7.創建頻道
Channel channel= connection.createChannel();
//創建交換機
channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT);
//8.聲明隊列(創建隊列)
/*
* 參數1:指定隊列的名稱
* 參數2:指定消息是否持久化,一般設置為true,是否保存到磁碟當中去
* 參數3:指定是否獨占通道
* 參數4:是否自動刪除
* 參數5:指定額外參數
*/
channel.queueDeclare("fanout_queue1",true,false,false,null);
channel.queueDeclare("fanout_queue2",true,false,false,null);
//9.創建消息
String message = "my name is Fanout";
//10.發送消息
/*
*這段代碼使用 RabbitMQ 的 Java 客戶端庫中的 `queueBind` 方法來將一個隊列綁定到一個 fanout 類型的交換機上。下麵是各個參數的介紹:
1. `fanout_queue1`: 隊列名稱,表示要綁定的隊列的名稱。
2. `exchange_fanout`: 交換機名稱,表示要進行綁定的交換機的名稱。
3. `""`: 路由鍵,表示要使用的路由鍵。
*/
channel.queueBind("fanout_queue1","exchange_fanout","");
channel.queueBind("fanout_queue2","exchange_fanout","");
/*
* 參數1:指定交換機 簡單模式中使用預設交換機 指定空字元串
* 參數2: 指定routingkey 簡單模式 只需要指定隊列名稱即可
* 參數3: 指定攜帶的額外的參數 null
* 參數4:要發送的消息本身(位元組數組)
*/
channel.basicPublish("exchange_fanout", "", null, message.getBytes());
//11.關閉資源
channel.close();
connection.close();
}
}
Fanout消費者的相關代碼
public class FanoutConsumer_1 {
public static void main(String[] args) throws IOException, TimeoutException {
//獲取連接
Connection connection= RabbitMQConnectionUtils.getConection();
//7.創建頻道
Channel channel=connection.createChannel();
//8.聲明隊列
channel.queueDeclare("fanout_queue1",true,false,false,null);
//9.接受消息
//處理消息
DefaultConsumer consumer = new DefaultConsumer(channel){
/*
*consumerTag: 消費者標簽,用於標識特定的消費者。每個消費者都有一個唯一的標簽,它可以用於取消訂閱或標識消息是由哪個消費者處理的。
* envelope: 信封對象,包含與消息傳遞相關的元數據,如交換機、路由鍵、傳遞標簽等。
* properties: AMQP 的基本屬性,包含附加的消息屬性,如消息持久性、優先順序、時間戳等。
* body: 消息的正文,以位元組數組的形式提供。
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息本身"+new String(body,"utf-8"));
System.out.println("exchange"+envelope.getExchange());
System.out.println("RoutingKey"+envelope.getRoutingKey());
System.out.println("消息的序號"+envelope.getDeliveryTag());
}
};
//監聽器----->觀察者設計模式
channel.basicConsume("fanout_queue1",true,consumer);
//10.建議不要關閉資源 建議一直監聽消息
}
}
direct模式
direct生產者模式
public class DirectProduct {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection=RabbitMQConnectionUtils.getConection();
//7.創建頻道
Channel channel= connection.createChannel();
//創建交換機
channel.exchangeDeclare("exchange_direct", BuiltinExchangeType.DIRECT);
//8.聲明隊列(創建隊列)
/*
* 參數1:指定隊列的名稱
* 參數2:指定消息是否持久化,一般設置為true,是否保存到磁碟當中去
* 參數3:指定是否獨占通道
* 參數4:是否自動刪除
* 參數5:指定額外參數
*/
channel.queueDeclare("direct_queue1",true,false,false,null);
channel.queueDeclare("direct_queue2",true,false,false,null);
//9.創建消息
String message1 = "this is add";
String message2 = "this is select";
//10.發送消息
/*
*這段代碼使用 RabbitMQ 的 Java 客戶端庫中的 `queueBind` 方法來將一個隊列綁定到一個 fanout 類型的交換機上。下麵是各個參數的介紹:
1. `fanout_queue1`: 隊列名稱,表示要綁定的隊列的名稱。
2. `exchange_fanout`: 交換機名稱,表示要進行綁定的交換機的名稱。
3. `""`: 路由鍵,表示要使用的路由鍵。
*/
channel.queueBind("direct_queue1","exchange_direct","user.add");
channel.queueBind("direct_queue2","exchange_direct","user.select");
/*
* 參數1:指定交換機 簡單模式中使用預設交換機 指定空字元串
* 參數2: 指定routingkey 簡單模式 只需要指定隊列名稱即可
* 參數3: 指定攜帶的額外的參數 null
* 參數4:要發送的消息本身(位元組數組)
*/
channel.basicPublish("exchange_direct", "user.add", null, message1.getBytes());
channel.basicPublish("exchange_direct", "user.select", null, message2.getBytes());
//11.關閉資源
channel.close();
connection.close();
}
}
direct消費者模式
public class DirectConsumer_1 {
public static void main(String[] args) throws IOException, TimeoutException {
//獲取連接
Connection connection= RabbitMQConnectionUtils.getConection();
//7.創建頻道
Channel channel=connection.createChannel();
//8.聲明隊列
channel.queueDeclare("direct_queue1",true,false,false,null);
//9.接受消息
//處理消息
DefaultConsumer consumer = new DefaultConsumer(channel){
/*
*consumerTag: 消費者標簽,用於標識特定的消費者。每個消費者都有一個唯一的標簽,它可以用於取消訂閱或標識消息是由哪個消費者處理的。
* envelope: 信封對象,包含與消息傳遞相關的元數據,如交換機、路由鍵、傳遞標簽等。
* properties: AMQP 的基本屬性,包含附加的消息屬性,如消息持久性、優先順序、時間戳等。
* body: 消息的正文,以位元組數組的形式提供。
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息本身"+new String(body,"utf-8"));
System.out.println("exchange"+envelope.getExchange());
System.out.println("RoutingKey"+envelope.getRoutingKey());
System.out.println("消息的序號"+envelope.getDeliveryTag());
}
};
//監聽器----->觀察者設計模式
channel.basicConsume("direct_queue1",true,consumer);
//10.建議不要關閉資源 建議一直監聽消息
}
}
Topic模式
生產模式相關代碼
public class TopicProduct {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection=RabbitMQConnectionUtils.getConection();
//7.創建頻道
Channel channel= connection.createChannel();
//創建交換機
channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC);
//8.聲明隊列(創建隊列)
/*
* 參數1:指定隊列的名稱
* 參數2:指定消息是否持久化,一般設置為true,是否保存到磁碟當中去
* 參數3:指定是否獨占通道
* 參數4:是否自動刪除
* 參數5:指定額外參數
*/
channel.queueDeclare("topic_queue1",true,false,false,null);
channel.queueDeclare("topic_queue2",true,false,false,null);
//9.創建消息
String message1 = "this is add";
String message2 = "this is select";
String message3 = "this is update";
String message4 = "this is delete";
//10.發送消息
/*
*這段代碼使用 RabbitMQ 的 Java 客戶端庫中的 `queueBind` 方法來將一個隊列綁定到一個 fanout 類型的交換機上。下麵是各個參數的介紹:
1. `fanout_queue1`: 隊列名稱,表示要綁定的隊列的名稱。
2. `exchange_fanout`: 交換機名稱,表示要進行綁定的交換機的名稱。
3. `""`: 路由鍵,表示要使用的路由鍵。
*/
channel.queueBind("topic_queue1","exchange_topic","user.*");
channel.queueBind("topic_queue2","exchange_topic","item.*");
/*
* 參數1:指定交換機 簡單模式中使用預設交換機 指定空字元串
* 參數2: 指定routingkey 簡單模式 只需要指定隊列名稱即可
* 參數3: 指定攜帶的額外的參數 null
* 參數4:要發送的消息本身(位元組數組)
*/
channel.basicPublish("exchange_topic", "user.add", null, message1.getBytes());
channel.basicPublish("exchange_topic", "user.select", null, message2.getBytes());
channel.basicPublish("exchange_topic", "item.update", null, message3.getBytes());
channel.basicPublish("exchange_topic", "item.delete", null, message4.getBytes());
//11.關閉資源
channel.close();
connection.close();
}
}
消費者相關代碼
public class TopicConsumer_1 {
public static void main(String[] args) throws IOException, TimeoutException {
//獲取連接
Connection connection= RabbitMQConnectionUtils.getConection();
//7.創建頻道
Channel channel=connection.createChannel();
//8.聲明隊列
channel.queueDeclare("topic_queue1",true,false,false,null);
//9.接受消息
//處理消息
DefaultConsumer consumer = new DefaultConsumer(channel){
/*
*consumerTag: 消費者標簽,用於標識特定的消費者。每個消費者都有一個唯一的標簽,它可以用於取消訂閱或標識消息是由哪個消費者處理的。
* envelope: 信封對象,包含與消息傳遞相關的元數據,如交換機、路由鍵、傳遞標簽等。
* properties: AMQP 的基本屬性,包含附加的消息屬性,如消息持久性、優先順序、時間戳等。
* body: 消息的正文,以位元組數組的形式提供。
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息本身"+new String(body,"utf-8"));
System.out.println("exchange"+envelope.getExchange());
System.out.println("RoutingKey"+envelope.getRoutingKey());
System.out.println("消息的序號"+envelope.getDeliveryTag());
}
};
//監聽器----->觀察者設計模式
channel.basicConsume("topic_queue1",true,consumer);
//10.建議不要關閉資源 建議一直監聽消息
}
}
主意:
4. springboot整合RabbitMQ
本次整合需要創建2個springboot項目,一個為生產者,一個為消費者。
direct exchange(直連型交換機)
生產者相關的整合
相關依賴
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
application相關配置
spring:
#給項目來個名字
application:
name: rabbitmq-provider
#配置rabbitMq 伺服器
rabbitmq:
host: localhost
port: 5672
username: chenbuting
password: root
#虛擬host 可以不設置,使用server預設host
#virtual-host: JCcccHost
配置類
@Configuration
public class DirectRabbitConfig {
//隊列 起名:TestDirectQueue
@Bean
public Queue TestDirectQueue() {
// durable:是否持久化,預設是false,持久化隊列:會被存儲在磁碟上,當消息代理重啟時仍然存在,暫存隊列:當前連接有效
// exclusive:預設也是false,只能被當前創建的連接使用,而且當連接關閉後隊列即被刪除。此參考優先順序高於durable
// autoDelete:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。
// return new Queue("TestDirectQueue",true,true,false);
//一般設置一下隊列的持久化就好,其餘兩個就是預設false
return new Queue("TestDirectQueue",true);
}
//Direct交換機 起名:TestDirectExchange
@Bean
DirectExchange TestDirectExchange() {
// return new DirectExchange("TestDirectExchange",true,true);
return new DirectExchange("TestDirectExchange",true,false);
}
//綁定 將隊列和交換機綁定, 並設置用於匹配鍵:TestDirectRouting
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
}
//單獨的 Direct 類型的交換機
@Bean
DirectExchange lonelyDirectExchange() {
return new DirectExchange("lonelyDirectExchange");
}
}
controller層
@RestController
public class SendMessageController {
@Autowired
RabbitTemplate rabbitTemplate; //使用RabbitTemplate,這提供了接收/發送等等方法
@GetMapping("/sendDirectMessage")
public String sendDirectMessage() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "test message, hello!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String,Object> map=new HashMap<>();
map.put("messageId",messageId);
map.put("messageData",messageData);
map.put("createTime",createTime);
//將消息攜帶綁定鍵值:TestDirectRouting 發送到交換機TestDirectExchange
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
return "ok";
}
}
消費者相關的整合
所需pom依賴
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
application相關配置
spring:
#給項目來個名字
application:
name: rabbitmq-provider
#配置rabbitMq 伺服器
rabbitmq:
host: localhost
port: 5672
username: chenbuting
password: root
#虛擬host 可以不設置,使用server預設host
#virtual-host: JCcccHost
相關配置類(消費者單獨使用,可以不用添加配置,直接監聽就行,直接使用註釋監聽器監聽對應的隊列即可,配置的話,則消費者也可當做生產者的身份,這樣也就可以推送消息了)
@Configuration
public class DirectRabbitConfig {
//隊列 起名:TestDirectQueue
@Bean
public Queue TestDirectQueue() {
return new Queue("TestDirectQueue",true);
}
//Direct交換機 起名:TestDirectExchange
@Bean
DirectExchange TestDirectExchange() {
return new DirectExchange("TestDirectExchange");
}
//綁定 將隊列和交換機綁定, 並設置用於匹配鍵:TestDirectRouting
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
}
}
創建消息監聽
@Component
@RabbitListener(queues = "TestDirectQueue")//監聽的隊列名稱 TestDirectQueue
public class DirectReceiver {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("DirectReceiver消費者收到消息 : " + testMessage.toString());
}
}
Topic Exchange主題交換機
生產者相關整合
相關配置類
@Configuration
public class TopicRabbitConfig {
//綁定鍵
public final static String man = "topic.man";
public final static String woman = "topic.woman";
@Bean
public Queue firstQueue() {
return new Queue(TopicRabbitConfig.man);
}
@Bean
public Queue secondQueue() {
return new Queue(TopicRabbitConfig.woman);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("topicExchange");
}
//將firstQueue和topicExchange綁定,而且綁定的鍵值為topic.man
//這樣只要是消息攜帶的路由鍵是topic.man,才會分發到該隊列
@Bean
Binding bindingExchangeMessage() {
return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
}
//將secondQueue和topicExchange綁定,而且綁定的鍵值為用上通配路由鍵規則topic.#
// 這樣只要是消息攜帶的路由鍵是以topic.開頭,都會分發到該隊列
@Bean
Binding bindingExchangeMessage2() {
return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
}
}
添加兩個介面,用於將消息推送到交換機上
@GetMapping("/sendTopicMessage1")
public String sendTopicMessage1() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: M A N ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> manMap = new HashMap<>();
manMap.put("messageId", messageId);
manMap.put("messageData", messageData);
manMap.put("createTime", createTime);
rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap);
return "ok";
}
@GetMapping("/sendTopicMessage2")
public String sendTopicMessage2() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: woman is all ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> womanMap = new HashMap<>();
womanMap.put("messageId", messageId);
womanMap.put("messageData", messageData);
womanMap.put("createTime", createTime);
rabbitTemplate.convertAndSend("topicExchange", "topic.woman", womanMap);
return "ok";
}
}
消費者相關整合
創建TopicManReceiver
@Component
@RabbitListener(queues = "topic.man")
public class TopicManReceiver {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("TopicManReceiver消費者收到消息 : " + testMessage.toString());
}
}
創建TopicTotalReceiver
@Component
@RabbitListener(queues = "topic.woman")
public class TopicTotalReceiver {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("TopicTotalReceiver消費者收到消息 : " + testMessage.toString());
}
}
其他類似
本文來自博客園,作者:陳步汀,轉載請註明原文鏈接:https://www.cnblogs.com/chenbuting/p/17537371.html