RabbitMQ基本配置

来源:https://www.cnblogs.com/chenbuting/archive/2023/07/08/17537371.html
-Advertisement-
Play Games

# 1.用戶角色配置 ![image.png](https://cdn.nlark.com/yuque/0/2023/png/38371876/1688636206975-acd927ca-1559-4236-85f2-07283999d50b.png#averageHue=%23f3f2f2&cl ...


1.用戶角色配置

image.png
自帶的guest/guest 超級管理員
五中不同角色配置:

  1. 普通管理者(management):僅可登陸管理控制台,無法看到節點信息,也無法對策略進行管理。
  2. 策略制定者(policymaker):可登陸管理控制台, 同時可以對policy進行管理。但無法查看節點的相關信息。
  3. 監控者 (monitoring):登錄管理控制台 查看所有的信息(Rabbit的相關節點的信息,記憶體使用信息,磁碟的情況)
  4. 超級管理員 administrator:登錄管理控制台 查看所有的信息 對所有用戶的策略進行操作
  5. 其他:無法登陸管理控制台,通常就是普通的生產者和消費者(僅僅接收信息或者發送消息)。

2.Virtual Hosts配置

每一個 Virtual Hosts相互隔離, 就相當於一個相對獨立的RabbitMQ,===> exchange queue message不是互通
可以理解為:一個Mysql又很多資料庫,每一個資料庫就相當於一個Virtual Hosts

2.1創建Virtual Hosts

image.png

2.2許可權的分配

點擊對應Virtual Hostsv 名字,進入配置頁面
image.png
image.png
該許可權配置參數說明:

  • user:用戶名
  • configure :一個正則表達式,用戶對符合該正則表達式的所有資源擁有 configure 操作的許可權
  • write:一個正則表達式,用戶對符合該正則表達式的所有資源擁有 write 操作的許可權
  • read:一個正則表達式,用戶對符合該正則表達式的所有資源擁有 read 操作的許可權

3.入門案例

使用思路

生產者:  --->按照JDBC思路
1. 創建連接工廠
2. 設置的rabbitMq的服務的主機 預設localhost
3. 設置的rabbitMq的服務的埠 預設5672
4. 設置的rabbitMq的服務的虛擬主機
5. 設置用戶和密碼
6. 創建連接
7. 創建頻道
8. 聲明隊列
9. 創建消息
10. 發送消息
11.關閉資源

(觀察者模式)
RabbitMQ 使用的是發佈訂閱模式
image.png

所需依賴

<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.6.0</version>
</dependency>

對Rabbit的連接進行封裝

public class RabbitMQConnectionUtils {
    /**
     * 獲得Rabbit的連接
     * 類比JDBC
     */
    public static Connection getConection() throws IOException, TimeoutException {
        //1.創建連接工廠
        ConnectionFactory connectionFactory=new ConnectionFactory();
        //2.設置的rabbitMq的服務的主機,預設localhost
        connectionFactory.setHost("localhost");
        //3.設置的rabbitMq的服務的埠,預設為5672
        connectionFactory.setPort(5672);
        //4.設置的rabbitMq的服務的虛擬主機
        connectionFactory.setVirtualHost("testmq");
        //5.設置用戶和密碼
        connectionFactory.setUsername("chenbuting");
        connectionFactory.setPassword("123");
        //6.創建連接
        Connection connection=connectionFactory.newConnection();
        return connection;
    }
}

基本消息隊列

生產者Product相關代碼

public class Product {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection=RabbitMQConnectionUtils.getConection();
        //7.創建頻道
        Channel channel= connection.createChannel();
        //8.聲明隊列(創建隊列)
        /*
         * 參數1:指定隊列的名稱
         * 參數2:指定消息是否持久化,一般設置為true,是否保存到磁碟當中去
         * 參數3:指定是否獨占通道
         * 參數4:是否自動刪除
         * 參數5:指定額外參數
         */
        channel.queueDeclare("simple_01",true,false,false,null);
        //9.創建消息
        String message="hello ,my name is chenbuting";
        //10.發送消息
        /*
         * 參數1:指定交換機  簡單模式中使用預設交換機  指定空字元串
         * 參數2: 指定routingkey  簡單模式  只需要指定隊列名稱即可
         * 參數3: 指定攜帶的額外的參數  null
         * 參數4:要發送的消息本身(位元組數組)
         */
        channel.basicPublish("","simple_01",null,message.getBytes());
        //11.關閉資源
        channel.close();
        connection.close();
    }
}

消費者Consumer相關代碼

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //獲取連接
        Connection connection= RabbitMQConnectionUtils.getConection();
        //7.創建頻道
        Channel channel=connection.createChannel();
        //8.聲明隊列
        channel.queueDeclare("simple_01",true,false,false,null);
        //9.接受消息
        //處理消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            /*
             *consumerTag: 消費者標簽,用於標識特定的消費者。每個消費者都有一個唯一的標簽,它可以用於取消訂閱或標識消息是由哪個消費者處理的。
             * envelope: 信封對象,包含與消息傳遞相關的元數據,如交換機、路由鍵、傳遞標簽等。
             * properties: AMQP 的基本屬性,包含附加的消息屬性,如消息持久性、優先順序、時間戳等。
             * body: 消息的正文,以位元組數組的形式提供。
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消息本身"+new String(body,"utf-8"));
                System.out.println("exchange"+envelope.getExchange());
                System.out.println("RoutingKey"+envelope.getRoutingKey());
                System.out.println("消息的序號"+envelope.getDeliveryTag());
            }
        };
        //監聽器----->觀察者設計模式
        channel.basicConsume("simple_01",true,consumer);
        //10.建議不要關閉資源     建議一直監聽消息
    }
}

image.png

idea允許開啟多個實例的方式

image.png

工作消息隊列

生產者相關代碼

public class WorkProduct {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection=RabbitMQConnectionUtils.getConection();
        //7.創建頻道
        Channel channel= connection.createChannel();
        //8.聲明隊列(創建隊列)
        /*
         * 參數1:指定隊列的名稱
         * 參數2:指定消息是否持久化,一般設置為true,是否保存到磁碟當中去
         * 參數3:指定是否獨占通道
         * 參數4:是否自動刪除
         * 參數5:指定額外參數
         */
        channel.queueDeclare("working_02",true,false,false,null);
        for (int i=0;i<30;i++) {
            //9.創建消息
            String message = "hello ,my name is CHENBUTING"+i;
            //10.發送消息
            /*
             * 參數1:指定交換機  簡單模式中使用預設交換機  指定空字元串
             * 參數2: 指定routingkey  簡單模式  只需要指定隊列名稱即可
             * 參數3: 指定攜帶的額外的參數  null
             * 參數4:要發送的消息本身(位元組數組)
             */
            channel.basicPublish("", "working_02", null, message.getBytes());
        }
        //11.關閉資源
        channel.close();
        connection.close();
    }
}

消費者相關代碼

WorkConsumer_1和WorkConsumer_2代碼內容

public class WorkConsumer_1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //獲取連接
        Connection connection= RabbitMQConnectionUtils.getConection();
        //7.創建頻道
        Channel channel=connection.createChannel();
        //8.聲明隊列
        channel.queueDeclare("working_02",true,false,false,null);
        //9.接受消息
        //處理消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            /*
             *consumerTag: 消費者標簽,用於標識特定的消費者。每個消費者都有一個唯一的標簽,它可以用於取消訂閱或標識消息是由哪個消費者處理的。
             * envelope: 信封對象,包含與消息傳遞相關的元數據,如交換機、路由鍵、傳遞標簽等。
             * properties: AMQP 的基本屬性,包含附加的消息屬性,如消息持久性、優先順序、時間戳等。
             * body: 消息的正文,以位元組數組的形式提供。
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消息本身"+new String(body,"utf-8"));
                System.out.println("exchange"+envelope.getExchange());
                System.out.println("RoutingKey"+envelope.getRoutingKey());
                System.out.println("消息的序號"+envelope.getDeliveryTag());
            }
        };
        //監聽器----->觀察者設計模式
        channel.basicConsume("working_02",true,consumer);
        //10.建議不要關閉資源     建議一直監聽消息
    }
}

結果
image.pngimage.png

訂閱模式

Fanout模式

Fanout生產者的相關代碼

public class FanoutProduct {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection=RabbitMQConnectionUtils.getConection();
        //7.創建頻道
        Channel channel= connection.createChannel();
        //創建交換機
        channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT);
        //8.聲明隊列(創建隊列)
        /*
         * 參數1:指定隊列的名稱
         * 參數2:指定消息是否持久化,一般設置為true,是否保存到磁碟當中去
         * 參數3:指定是否獨占通道
         * 參數4:是否自動刪除
         * 參數5:指定額外參數
         */
        channel.queueDeclare("fanout_queue1",true,false,false,null);
        channel.queueDeclare("fanout_queue2",true,false,false,null);
        //9.創建消息
        String message = "my name is Fanout";
        //10.發送消息
        /*
         *這段代碼使用 RabbitMQ 的 Java 客戶端庫中的 `queueBind` 方法來將一個隊列綁定到一個 fanout 類型的交換機上。下麵是各個參數的介紹:
                1. `fanout_queue1`: 隊列名稱,表示要綁定的隊列的名稱。
                2. `exchange_fanout`: 交換機名稱,表示要進行綁定的交換機的名稱。
                3. `""`: 路由鍵,表示要使用的路由鍵。
         */
        channel.queueBind("fanout_queue1","exchange_fanout","");
        channel.queueBind("fanout_queue2","exchange_fanout","");
        /*
         * 參數1:指定交換機  簡單模式中使用預設交換機  指定空字元串
         * 參數2: 指定routingkey  簡單模式  只需要指定隊列名稱即可
         * 參數3: 指定攜帶的額外的參數  null
         * 參數4:要發送的消息本身(位元組數組)
         */
        channel.basicPublish("exchange_fanout", "", null, message.getBytes());
        //11.關閉資源
        channel.close();
        connection.close();
    }
}

Fanout消費者的相關代碼

public class FanoutConsumer_1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //獲取連接
        Connection connection= RabbitMQConnectionUtils.getConection();
        //7.創建頻道
        Channel channel=connection.createChannel();
        //8.聲明隊列
        channel.queueDeclare("fanout_queue1",true,false,false,null);
        //9.接受消息
        //處理消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            /*
             *consumerTag: 消費者標簽,用於標識特定的消費者。每個消費者都有一個唯一的標簽,它可以用於取消訂閱或標識消息是由哪個消費者處理的。
             * envelope: 信封對象,包含與消息傳遞相關的元數據,如交換機、路由鍵、傳遞標簽等。
             * properties: AMQP 的基本屬性,包含附加的消息屬性,如消息持久性、優先順序、時間戳等。
             * body: 消息的正文,以位元組數組的形式提供。
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消息本身"+new String(body,"utf-8"));
                System.out.println("exchange"+envelope.getExchange());
                System.out.println("RoutingKey"+envelope.getRoutingKey());
                System.out.println("消息的序號"+envelope.getDeliveryTag());
            }
        };
        //監聽器----->觀察者設計模式
        channel.basicConsume("fanout_queue1",true,consumer);
        //10.建議不要關閉資源     建議一直監聽消息
    }
}

direct模式

direct生產者模式

public class DirectProduct {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection=RabbitMQConnectionUtils.getConection();
        //7.創建頻道
        Channel channel= connection.createChannel();
        //創建交換機
        channel.exchangeDeclare("exchange_direct", BuiltinExchangeType.DIRECT);
        //8.聲明隊列(創建隊列)
        /*
         * 參數1:指定隊列的名稱
         * 參數2:指定消息是否持久化,一般設置為true,是否保存到磁碟當中去
         * 參數3:指定是否獨占通道
         * 參數4:是否自動刪除
         * 參數5:指定額外參數
         */
        channel.queueDeclare("direct_queue1",true,false,false,null);
        channel.queueDeclare("direct_queue2",true,false,false,null);
        //9.創建消息
        String message1 = "this is add";
        String message2 = "this is select";
        //10.發送消息
        /*
         *這段代碼使用 RabbitMQ 的 Java 客戶端庫中的 `queueBind` 方法來將一個隊列綁定到一個 fanout 類型的交換機上。下麵是各個參數的介紹:
                1. `fanout_queue1`: 隊列名稱,表示要綁定的隊列的名稱。
                2. `exchange_fanout`: 交換機名稱,表示要進行綁定的交換機的名稱。
                3. `""`: 路由鍵,表示要使用的路由鍵。
         */
        channel.queueBind("direct_queue1","exchange_direct","user.add");
        channel.queueBind("direct_queue2","exchange_direct","user.select");
        /*
         * 參數1:指定交換機  簡單模式中使用預設交換機  指定空字元串
         * 參數2: 指定routingkey  簡單模式  只需要指定隊列名稱即可
         * 參數3: 指定攜帶的額外的參數  null
         * 參數4:要發送的消息本身(位元組數組)
         */
        channel.basicPublish("exchange_direct", "user.add", null, message1.getBytes());
        channel.basicPublish("exchange_direct", "user.select", null, message2.getBytes());
        //11.關閉資源
        channel.close();
        connection.close();
    }
}

direct消費者模式

public class DirectConsumer_1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //獲取連接
        Connection connection= RabbitMQConnectionUtils.getConection();
        //7.創建頻道
        Channel channel=connection.createChannel();
        //8.聲明隊列
        channel.queueDeclare("direct_queue1",true,false,false,null);
        //9.接受消息
        //處理消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            /*
             *consumerTag: 消費者標簽,用於標識特定的消費者。每個消費者都有一個唯一的標簽,它可以用於取消訂閱或標識消息是由哪個消費者處理的。
             * envelope: 信封對象,包含與消息傳遞相關的元數據,如交換機、路由鍵、傳遞標簽等。
             * properties: AMQP 的基本屬性,包含附加的消息屬性,如消息持久性、優先順序、時間戳等。
             * body: 消息的正文,以位元組數組的形式提供。
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消息本身"+new String(body,"utf-8"));
                System.out.println("exchange"+envelope.getExchange());
                System.out.println("RoutingKey"+envelope.getRoutingKey());
                System.out.println("消息的序號"+envelope.getDeliveryTag());
            }
        };
        //監聽器----->觀察者設計模式
        channel.basicConsume("direct_queue1",true,consumer);
        //10.建議不要關閉資源     建議一直監聽消息
    }
}

Topic模式

生產模式相關代碼

public class TopicProduct {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection=RabbitMQConnectionUtils.getConection();
        //7.創建頻道
        Channel channel= connection.createChannel();
        //創建交換機
        channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC);
        //8.聲明隊列(創建隊列)
        /*
         * 參數1:指定隊列的名稱
         * 參數2:指定消息是否持久化,一般設置為true,是否保存到磁碟當中去
         * 參數3:指定是否獨占通道
         * 參數4:是否自動刪除
         * 參數5:指定額外參數
         */
        channel.queueDeclare("topic_queue1",true,false,false,null);
        channel.queueDeclare("topic_queue2",true,false,false,null);
        //9.創建消息
        String message1 = "this is add";
        String message2 = "this is select";
        String message3 = "this is update";
        String message4 = "this is delete";

        //10.發送消息
        /*
         *這段代碼使用 RabbitMQ 的 Java 客戶端庫中的 `queueBind` 方法來將一個隊列綁定到一個 fanout 類型的交換機上。下麵是各個參數的介紹:
                1. `fanout_queue1`: 隊列名稱,表示要綁定的隊列的名稱。
                2. `exchange_fanout`: 交換機名稱,表示要進行綁定的交換機的名稱。
                3. `""`: 路由鍵,表示要使用的路由鍵。
         */
        channel.queueBind("topic_queue1","exchange_topic","user.*");
        channel.queueBind("topic_queue2","exchange_topic","item.*");
        /*
         * 參數1:指定交換機  簡單模式中使用預設交換機  指定空字元串
         * 參數2: 指定routingkey  簡單模式  只需要指定隊列名稱即可
         * 參數3: 指定攜帶的額外的參數  null
         * 參數4:要發送的消息本身(位元組數組)
         */
        channel.basicPublish("exchange_topic", "user.add", null, message1.getBytes());
        channel.basicPublish("exchange_topic", "user.select", null, message2.getBytes());
        channel.basicPublish("exchange_topic", "item.update", null, message3.getBytes());
        channel.basicPublish("exchange_topic", "item.delete", null, message4.getBytes());
        //11.關閉資源
        channel.close();
        connection.close();
    }
}

消費者相關代碼

public class TopicConsumer_1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //獲取連接
        Connection connection= RabbitMQConnectionUtils.getConection();
        //7.創建頻道
        Channel channel=connection.createChannel();
        //8.聲明隊列
        channel.queueDeclare("topic_queue1",true,false,false,null);
        //9.接受消息
        //處理消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            /*
             *consumerTag: 消費者標簽,用於標識特定的消費者。每個消費者都有一個唯一的標簽,它可以用於取消訂閱或標識消息是由哪個消費者處理的。
             * envelope: 信封對象,包含與消息傳遞相關的元數據,如交換機、路由鍵、傳遞標簽等。
             * properties: AMQP 的基本屬性,包含附加的消息屬性,如消息持久性、優先順序、時間戳等。
             * body: 消息的正文,以位元組數組的形式提供。
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消息本身"+new String(body,"utf-8"));
                System.out.println("exchange"+envelope.getExchange());
                System.out.println("RoutingKey"+envelope.getRoutingKey());
                System.out.println("消息的序號"+envelope.getDeliveryTag());
            }
        };
        //監聽器----->觀察者設計模式
        channel.basicConsume("topic_queue1",true,consumer);
        //10.建議不要關閉資源     建議一直監聽消息
    }
}

主意:

image.png
不要忘記在使用訂閱模式時修改該處的交換機分發策略

4. springboot整合RabbitMQ

本次整合需要創建2個springboot項目,一個為生產者,一個為消費者。

direct exchange(直連型交換機)

生產者相關的整合

相關依賴

<!--rabbitmq-->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>

application相關配置

spring:
  #給項目來個名字
  application:
    name: rabbitmq-provider
  #配置rabbitMq 伺服器
  rabbitmq:
    host: localhost
    port: 5672
    username: chenbuting
    password: root
    #虛擬host 可以不設置,使用server預設host
    #virtual-host: JCcccHost

配置類

@Configuration
public class DirectRabbitConfig {

    //隊列 起名:TestDirectQueue
    @Bean
    public Queue TestDirectQueue() {
        // durable:是否持久化,預設是false,持久化隊列:會被存儲在磁碟上,當消息代理重啟時仍然存在,暫存隊列:當前連接有效
        // exclusive:預設也是false,只能被當前創建的連接使用,而且當連接關閉後隊列即被刪除。此參考優先順序高於durable
        // autoDelete:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。
        //   return new Queue("TestDirectQueue",true,true,false);

        //一般設置一下隊列的持久化就好,其餘兩個就是預設false
        return new Queue("TestDirectQueue",true);
    }

    //Direct交換機 起名:TestDirectExchange
    @Bean
    DirectExchange TestDirectExchange() {
        //  return new DirectExchange("TestDirectExchange",true,true);
        return new DirectExchange("TestDirectExchange",true,false);
    }

    //綁定  將隊列和交換機綁定, 並設置用於匹配鍵:TestDirectRouting
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
    }

    //單獨的 Direct 類型的交換機
    @Bean
    DirectExchange lonelyDirectExchange() {
        return new DirectExchange("lonelyDirectExchange");
    }



}

controller層

@RestController
public class SendMessageController {
 
    @Autowired
    RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,這提供了接收/發送等等方法
 
    @GetMapping("/sendDirectMessage")
    public String sendDirectMessage() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "test message, hello!";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String,Object> map=new HashMap<>();
        map.put("messageId",messageId);
        map.put("messageData",messageData);
        map.put("createTime",createTime);
        //將消息攜帶綁定鍵值:TestDirectRouting 發送到交換機TestDirectExchange
        rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
        return "ok";
    }
}

image.png

消費者相關的整合

所需pom依賴

<!--rabbitmq-->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>

application相關配置

spring:
  #給項目來個名字
  application:
    name: rabbitmq-provider
  #配置rabbitMq 伺服器
  rabbitmq:
    host: localhost
    port: 5672
    username: chenbuting
    password: root
    #虛擬host 可以不設置,使用server預設host
    #virtual-host: JCcccHost

相關配置類(消費者單獨使用,可以不用添加配置,直接監聽就行,直接使用註釋監聽器監聽對應的隊列即可,配置的話,則消費者也可當做生產者的身份,這樣也就可以推送消息了

@Configuration
public class DirectRabbitConfig {

    //隊列 起名:TestDirectQueue
    @Bean
    public Queue TestDirectQueue() {
        return new Queue("TestDirectQueue",true);
    }

    //Direct交換機 起名:TestDirectExchange
    @Bean
    DirectExchange TestDirectExchange() {
        return new DirectExchange("TestDirectExchange");
    }

    //綁定  將隊列和交換機綁定, 並設置用於匹配鍵:TestDirectRouting
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
    }
}

創建消息監聽

@Component
@RabbitListener(queues = "TestDirectQueue")//監聽的隊列名稱 TestDirectQueue
public class DirectReceiver {
 
    @RabbitHandler
    public void process(Map testMessage) {
        System.out.println("DirectReceiver消費者收到消息  : " + testMessage.toString());
    }
 
}

Topic Exchange主題交換機

生產者相關整合

相關配置類

 @Configuration
public class TopicRabbitConfig {
    //綁定鍵
    public final static String man = "topic.man";
    public final static String woman = "topic.woman";
 
    @Bean
    public Queue firstQueue() {
        return new Queue(TopicRabbitConfig.man);
    }
 
    @Bean
    public Queue secondQueue() {
        return new Queue(TopicRabbitConfig.woman);
    }
 
    @Bean
    TopicExchange exchange() {
        return new TopicExchange("topicExchange");
    }
 
 
    //將firstQueue和topicExchange綁定,而且綁定的鍵值為topic.man
    //這樣只要是消息攜帶的路由鍵是topic.man,才會分發到該隊列
    @Bean
    Binding bindingExchangeMessage() {
        return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
    }
 
    //將secondQueue和topicExchange綁定,而且綁定的鍵值為用上通配路由鍵規則topic.#
    // 這樣只要是消息攜帶的路由鍵是以topic.開頭,都會分發到該隊列
    @Bean
    Binding bindingExchangeMessage2() {
        return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
    }
 
}

添加兩個介面,用於將消息推送到交換機上

    @GetMapping("/sendTopicMessage1")
    public String sendTopicMessage1() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "message: M A N ";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String, Object> manMap = new HashMap<>();
        manMap.put("messageId", messageId);
        manMap.put("messageData", messageData);
        manMap.put("createTime", createTime);
        rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap);
        return "ok";
    }
 
    @GetMapping("/sendTopicMessage2")
    public String sendTopicMessage2() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "message: woman is all ";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String, Object> womanMap = new HashMap<>();
        womanMap.put("messageId", messageId);
        womanMap.put("messageData", messageData);
        womanMap.put("createTime", createTime);
        rabbitTemplate.convertAndSend("topicExchange", "topic.woman", womanMap);
        return "ok";
    }
}

消費者相關整合

創建TopicManReceiver

@Component
@RabbitListener(queues = "topic.man")
public class TopicManReceiver {
 
    @RabbitHandler
    public void process(Map testMessage) {
        System.out.println("TopicManReceiver消費者收到消息  : " + testMessage.toString());
    }
}

創建TopicTotalReceiver

@Component
@RabbitListener(queues = "topic.woman")
public class TopicTotalReceiver {
 
    @RabbitHandler
    public void process(Map testMessage) {
        System.out.println("TopicTotalReceiver消費者收到消息  : " + testMessage.toString());
    }
}

其他類似

本文來自博客園,作者:陳步汀,轉載請註明原文鏈接:https://www.cnblogs.com/chenbuting/p/17537371.html


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 這裡給大家分享我在網上總結出來的一些知識,希望對大家有所幫助 介紹 今天介紹一個非常簡單的入門級小案例,就是地圖的捲簾效果實現,各大地圖引擎供應商都有相關示例,很奇怪高德居然沒有,我看了下文檔發現其實也是可以簡單實現的,演示代碼放到文末。本文用到了圖層掩模,即圖層遮罩,讓圖層只在指定範圍內顯示。 實 ...
  • 這裡給大家分享我在網上總結出來的一些知識,希望對大家有所幫助 背景 因為我們日常開發項目的時候,需要和同事對接api和文檔還有UI圖,所以有時候要同時打開多個視窗,併在多個視窗中切換,來選擇自己要的信息,如果api和文檔不多的情況還好,但是有時候就是要做大量的頁面,為了提升效率我決定自己做一個截圖工 ...
  • 一. 源碼展示: 1. Object.equals: ①引用類型地址值比較,直接返回結果:true || false public class Object { public boolean equals(Object obj) { return (this == obj); } } 2. Stri ...
  • ## 1.1 概述 > 說白了就是鍵值對的映射關係 > > 不會丟失數據本身關聯的結構,但不關註數據的順序 > > 是一種可變類型 ```py 格式:dic = {鍵:值, 鍵:值} ``` * 鍵的類型:字典的鍵可以是任何不可變的類型,如浮點數,字元串,元組 ## 1.2 函數dict 可以從其他 ...
  • ### 目錄 *1:什麼是AQS?* *2:AQS都有那些用途?* *3:我們如何使用AQS* *4:AQS的實現原理* *5:對AQS的設計與實現的一些思考* ### 1:什麼是AQS ​ 隨著電腦的算力越來越強大,各種各樣的並行編程模型也隨即踴躍而來,但當我們要在並行計算中使用共用資源的時候, ...
  • 電腦COM口數據測試一、基本使用流程 程式需要以管理員身份運行,COM口迴路測試需短接2,3pin,測試時候使用控制台,配置測試相關路徑,併在測試完成後 1.測試配置路徑D:\bigdata\INI\FWCOM.ini 2.測試完成後需要在路徑D:\bigdata\LOG\生成測試FWCOM.lo ...
  • # C++ 慣用法之 Copy-Swap 拷貝交換 > 這是“C++ 慣用法”合集的第 3 篇,前面 2 篇分別介紹了 RAII 和 PIMPL 兩種慣用法: > > - [RAII: Resouce Acquistion Is Initialization](https://www.cnblogs ...
  • **本文首先介紹了Django模板系統的基礎知識,接著探討瞭如何安裝和配置Django模板系統,然後深入解析了Django模板的基本結構、標簽和過濾器的用法,闡述瞭如何在模板中展示模型數據,最後使用一個實際項目的例子來演示如何在實際開發中使用Django模板系統。** ## Django模板系統的簡 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...