RabbitMQ的六種工作模式總結

来源:https://www.cnblogs.com/xyfer1018/archive/2019/09/25/11581511.html
-Advertisement-
Play Games

最近學習RabbitMQ的使用方式,記錄下來,方便以後使用,也方便和大家共用,相互交流。 RabbitMQ的六種工作模式: 1、Work queues2、Publish/subscribe3、Routing4、Topics5、Header 模式6、RPC 一、Work queues 多個消費端消費同 ...


最近學習RabbitMQ的使用方式,記錄下來,方便以後使用,也方便和大家共用,相互交流。

RabbitMQ的六種工作模式:

1、Work queues
2、Publish/subscribe
3、Routing
4、Topics
5、Header 模式
6、RPC

一、Work queues

多個消費端消費同一個隊列中的消息,隊列採用輪詢的方式將消息是平均發送給消費者;

 

 特點:

1、一條消息只會被一個消費端接收;

2、隊列採用輪詢的方式將消息是平均發送給消費者的;

3、消費者在處理完某條消息後,才會收到下一條消息

生產端:

1、聲明隊列

2、創建連接

3、創建通道

4、通道聲明隊列

5、制定消息

6、發送消息,使用預設交換機

消費端:

1、聲明隊列

2、創建連接

3、創建通道

4、通道聲明隊列

5、重寫消息消費方法

6、執行消息方法

新建兩個maven工程,生產消息的生產端,消費消息的消費端;

pom.xml文件中依賴坐標如下:

<dependencies>
    <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
            <version>2.1.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.7.0</version>
        </dependency>
</dependencies>

 生產端的代碼如下:

package com.xyfer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*
1、聲明隊列
2、創建連接
3、創建通道
4、通道聲明隊列
5、制定消息
6、發送消息,使用預設交換機
*/
public class Producer02 {
    //聲明隊列
    private static final String QUEUE ="queue";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");//mq服務ip地址
            connectionFactory.setPort(5672);//mq client連接埠
            connectionFactory.setUsername("guest");//mq登錄用戶名
            connectionFactory.setPassword("guest");//mq登錄密碼
            connectionFactory.setVirtualHost("/");//rabbitmq預設虛擬機名稱為“/”,虛擬機相當於一個獨立的mq伺服器
            //創建與RabbitMQ服務的TCP連接
            connection = connectionFactory.newConnection();
            //創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務
            channel = connection.createChannel();

            //通道綁定隊列
            /**
             * 聲明隊列,如果Rabbit中沒有此隊列將自動創建
             * param1:隊列名稱
             * param2:是否持久化
             * param3:隊列是否獨占此連接
             * param4:隊列不再使用時是否自動刪除此隊列
             * param5:隊列參數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE,true,false,false,null);//通道綁定郵件隊列

            for(int i = 0;i<10;i++){
                String message = new String("mq 發送消息。。。");
                /**
                  * 消息發佈方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,消息的路由Key,是用於Exchange(交換機)將消息轉發到指定的消息隊列
                  * param3:消息包含的屬性
                  * param4:消息體
                  * 這裡沒有指定交換機,消息將發送給預設交換機,每個隊列也會綁定那個預設的交換機,但是不能顯示綁定或解除綁定
                  * 預設的交換機,routingKey等於隊列名稱
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                channel.basicPublish("",QUEUE,null,message.getBytes("utf-8"));
                System.out.println("mq消息發送成功!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

消費端的代碼如下:

package com.xyfer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*
1、聲明隊列
2、創建連接
3、創建通道
4、通道聲明隊列
5、重寫消息消費方法
6、執行消息方法
*/
public class Consumer02 {
    private static final String QUEUE ="queue";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //通道綁定隊列
            /**
             * 聲明隊列,如果Rabbit中沒有此隊列將自動創建
             * param1:隊列名稱
             * param2:是否持久化
             * param3:隊列是否獨占此連接
             * param4:隊列不再使用時是否自動刪除此隊列
             * param5:隊列參數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE,true,false,false,null);//通道綁定郵件隊列

            //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                  * 消費者接收消息調用此方法
                  * @param consumerTag 消費者的標簽,在channel.basicConsume()去指定
                  * @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標誌
                    (收到消息失敗後是否需要重新發送)
                  * @param properties
                  * @param body
                  * @throws IOException
                 * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //交換機
                    String exchange = envelope.getExchange();
                    //路由key
                    String routingKey = envelope.getRoutingKey();
                    envelope.getDeliveryTag();
                    String msg = new String(body,"utf-8");
                    System.out.println("mq收到的消息是:"+msg );
                }

            };
            System.out.println("消費者啟動成功!");
            channel.basicConsume(QUEUE,true,consumer);

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

生產端啟動後,控制台列印信息如下:

 RabbitMQ中的已有消息:

 queue中的消息正是生產端發送的消息:

 二、Publish/subscribe 模式

這種模式又稱為發佈訂閱模式,相對於Work queues模式,該模式多了一個交換機,生產端先把消息發送到交換機,再由交換機把消息發送到綁定的隊列中,每個綁定的隊列都能收到由生產端發送的消息。

發佈訂閱模式:

1、每個消費者監聽自己的隊列;

2、生產者將消息發給broker,由交換機將消息轉發到綁定此交換機的每個隊列,每個綁定交換機的隊列都將接收
到消息

應用場景:用戶通知,當用戶充值成功或轉賬完成系統通知用戶,通知方式有簡訊、郵件多種方法;

生產端:

1、聲明隊列,聲明交換機

2、創建連接

3、創建通道

4、通道聲明交換機

5、通道聲明隊列

6、通過通道使隊列綁定到交換機

7、制定消息

8、發送消息

消費端:

1、聲明隊列,聲明交換機

2、創建連接

3、創建通道

4、通道聲明交換機

5、通道聲明隊列

6、通過通道使隊列綁定到交換機

7、重寫消息消費方法

8、執行消息方法

Publish/subscribe 模式綁定兩個消費端,因此需要有兩個消費端,一個郵件消費端,一個簡訊消費端;

生產端的代碼如下:

package com.xyfer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer01 {
    //聲明兩個隊列和一個交換機
    //Publish/subscribe發佈訂閱模式
    private static final String QUEUE_EMAIL ="queueEmail";
    private static final String QUEUE_SMS ="queueSms";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");//mq服務ip地址
            connectionFactory.setPort(5672);//mq client連接埠
            connectionFactory.setUsername("guest");//mq登錄用戶名
            connectionFactory.setPassword("guest");//mq登錄密碼
            connectionFactory.setVirtualHost("/");//rabbitmq預設虛擬機名稱為“/”,虛擬機相當於一個獨立的mq伺服器
            //創建與RabbitMQ服務的TCP連接
            connection = connectionFactory.newConnection();
            //創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務
            channel = connection.createChannel();
            //通道綁定交換機
            /**
              * 參數明細
              * 1、交換機名稱
              * 2、交換機類型,fanout、topic、direct、headers
              */
            //Publish/subscribe發佈訂閱模式
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);
            //通道綁定隊列
            /**
             * 聲明隊列,如果Rabbit中沒有此隊列將自動創建
             * param1:隊列名稱
             * param2:是否持久化
             * param3:隊列是否獨占此連接
             * param4:隊列不再使用時是否自動刪除此隊列
             * param5:隊列參數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊列
            channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道綁定簡訊隊列
            //交換機和隊列綁定
            /**
             * 參數明細
             * 1、隊列名稱
             * 2、交換機名稱
             * 3、路由key
             */
            //Publish/subscribe發佈訂閱模式
            channel.queueBind(QUEUE_EMAIL,EXCHANGE,"");
            channel.queueBind(QUEUE_SMS,EXCHANGE,"");
            for(int i = 0;i<10;i++){
                String message = new String("mq 發送消息。。。");
                /**
                  * 消息發佈方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,消息的路由Key,是用於Exchange(交換機)將消息轉發到指定的消息隊列
                  * param3:消息包含的屬性
                  * param4:消息體
                  * 這裡沒有指定交換機,消息將發送給預設交換機,每個隊列也會綁定那個預設的交換機,但是不能顯示綁定或解除綁定
                  * 預設的交換機,routingKey等於隊列名稱
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                //Publish/subscribe發佈訂閱模式
                channel.basicPublish(EXCHANGE,"",null,message.getBytes());
                System.out.println("mq消息發送成功!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

郵件消費端的代碼如下:

package com.xyfer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer01 {
    //Publish/subscribe發佈訂閱模式
    private static final String QUEUE_EMAIL ="queueEmail";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //通道綁定交換機
            /**
              * 參數明細
              * 1、交換機名稱
              * 2、交換機類型,fanout、topic、direct、headers
              */
            //Publish/subscribe發佈訂閱模式
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);
            //通道綁定隊列
            /**
             * 聲明隊列,如果Rabbit中沒有此隊列將自動創建
             * param1:隊列名稱
             * param2:是否持久化
             * param3:隊列是否獨占此連接
             * param4:隊列不再使用時是否自動刪除此隊列
             * param5:隊列參數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊列
            //交換機和隊列綁定
            /**
             * 參數明細
             * 1、隊列名稱
             * 2、交換機名稱
             * 3、路由key
             */
            //Publish/subscribe發佈訂閱模式
            channel.queueBind(QUEUE_EMAIL,EXCHANGE,"");
            //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
            DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
              * 消費者接收消息調用此方法
              * @param consumerTag 消費者的標簽,在channel.basicConsume()去指定
              * @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標誌
                (收到消息失敗後是否需要重新發送)
              * @param properties
              * @param body
              * @throws IOException
              * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
              */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交換機
                String exchange = envelope.getExchange();
                //路由key
                String routingKey = envelope.getRoutingKey();
                envelope.getDeliveryTag();
                String msg = new String(body,"utf-8");
                System.out.println("mq收到的消息是:"+msg );
            }
            };
            System.out.println("消費者啟動成功!");
            channel.basicConsume(QUEUE_EMAIL,true,consumer);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

簡訊消費端的代碼如下:

package xyfer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer01 {
    //Publish/subscribe發佈訂閱模式
    private static final String QUEUE_SMS ="queueSms";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //通道綁定交換機
            /**
              * 參數明細
              * 1、交換機名稱
              * 2、交換機類型,fanout、topic、direct、headers
              */
            //Publish/subscribe發佈訂閱模式
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);
            //通道綁定隊列
            /**
             * 聲明隊列,如果Rabbit中沒有此隊列將自動創建
             * param1:隊列名稱
             * param2:是否持久化
             * param3:隊列是否獨占此連接
             * param4:隊列不再使用時是否自動刪除此隊列
             * param5:隊列參數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道綁定簡訊隊列
            //交換機和隊列綁定
            /**
             * 參數明細
             * 1、隊列名稱
             * 2、交換機名稱
             * 3、路由key
             */
            //Publish/subscribe發佈訂閱模式
            channel.queueBind(QUEUE_SMS,EXCHANGE,"");
            DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
              * 消費者接收消息調用此方法
              * @param consumerTag 消費者的標簽,在channel.basicConsume()去指定
              * @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標誌
                (收到消息失敗後是否需要重新發送)
              * @param properties
              * @param body
              * @throws IOException
              * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
              */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交換機
                String exchange = envelope.getExchange();
                //路由key
                String routingKey = envelope.getRoutingKey();
                envelope.getDeliveryTag();
                String msg = new String(body,"utf-8");
                System.out.println("mq收到的消息是:"+msg );
            }

            };
            System.out.println("消費者啟動成功!");
            channel.basicConsume(QUEUE_SMS,true,consumer);

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

三、Routing 路由模式

Routing 模式又稱路由模式,該種模式除了要綁定交換機外,發消息的時候還要制定routing key,即路由key,隊列通過通道綁定交換機的時候,需要指定自己的routing key,這樣,生產端發送消息的時候也會指定routing key,通過routing key就可以把相應的消息發送到綁定相應routing key的隊列中去。

路由模式:

1、每個消費者監聽自己的隊列,並且設置routingkey;
2、生產者將消息發給交換機,由交換機根據routingkey來轉發消息到指定的隊列;

應用場景:用戶通知,當用戶充值成功或轉賬完成系統通知用戶,通知方式有簡訊、郵件多種方法;

生產端:

1、聲明隊列,聲明交換機

2、創建連接

3、創建通道

4、通道聲明交換機

5、通道聲明隊列

6、通過通道使隊列綁定到交換機並指定該隊列的routingkey

7、制定消息

8、發送消息並指定routingkey

消費端:

1、聲明隊列,聲明交換機

2、創建連接

3、創建通道

4、通道聲明交換機

5、通道聲明隊列

6、通過通道使隊列綁定到交換機並指定routingkey

7、重寫消息消費方法

8、執行消息方法

按照假設的應用場景,同樣,Routing 路由模式也是一個生產端,兩個消費端,所不同的是,聲明交換機的類型不同,隊列綁定交換機的時候需要指定Routing key,發送消息的時候也需要指定Routing key,這樣根據Routing key就能把相應的消息發送到相應的隊列中去。

生產端的代碼如下:

package com.xyfer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer03 {
    //聲明兩個隊列和一個交換機
    //Routing 路由模式
    private static final String QUEUE_EMAIL ="queueEmail";
    private static final String QUEUE_SMS ="queueSms";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");//mq服務ip地址
            connectionFactory.setPort(5672);//mq client連接埠
            connectionFactory.setUsername("guest");//mq登錄用戶名
            connectionFactory.setPassword("guest");//mq登錄密碼
            connectionFactory.setVirtualHost("/");//rabbitmq預設虛擬機名稱為“/”,虛擬機相當於一個獨立的mq伺服器
            //創建與RabbitMQ服務的TCP連接
            connection = connectionFactory.newConnection();
            //創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務
            channel = connection.createChannel();
            //通道綁定交換機
            /**
             * 參數明細
             * 1、交換機名稱
             * 2、交換機類型,fanout、topic、direct、headers
             */
            //Routing 路由模式
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
            //通道綁定隊列
            /**
             * 聲明隊列,如果Rabbit中沒有此隊列將自動創建
             * param1:隊列名稱
             * param2:是否持久化
             * param3:隊列是否獨占此連接
             * param4:隊列不再使用時是否自動刪除此隊列
             * param5:隊列參數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊列
            channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道綁定簡訊隊列
            //交換機和隊列綁定
            /**
             * 參數明細
             * 1、隊列名稱
             * 2、交換機名稱
             * 3、路由key
             */
            //Routing 路由模式
            channel.queueBind(QUEUE_EMAIL,EXCHANGE,QUEUE_EMAIL);
            channel.queueBind(QUEUE_SMS,EXCHANGE,QUEUE_SMS);
            //給email隊列發消息
            for(int i = 0;i<10;i++){
                String message = new String("mq 發送email消息。。。");
                /**
                  * 消息發佈方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,消息的路由Key,是用於Exchange(交換機)將消息轉發到指定的消息隊列
                  * param3:消息包含的屬性
                  * param4:消息體
                  * 這裡沒有指定交換機,消息將發送給預設交換機,每個隊列也會綁定那個預設的交換機,但是不能顯示綁定或解除綁定
                  * 預設的交換機,routingKey等於隊列名稱
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                //Routing 路由模式
                channel.basicPublish(EXCHANGE,QUEUE_EMAIL,null,message.getBytes());
                System.out.println("mq消息發送成功!");
            }
            //給sms隊列發消息
            for(int i = 0;i<10;i++){
                String message = new String("mq 發送sms消息。。。");
                /**
                  * 消息發佈方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,消息的路由Key,是用於Exchange(交換機)將消息轉發到指定的消息隊列
                  * param3:消息包含的屬性
                  * param4:消息體
                  * 這裡沒有指定交換機,消息將發送給預設交換機,每個隊列也會綁定那個預設的交換機,但是不能顯示綁定或解除綁定
                  * 預設的交換機,routingKey等於隊列名稱
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                //Routing 路由模式
                channel.basicPublish(EXCHANGE,QUEUE_SMS,null,message.getBytes());
                System.out.println("mq消息發送成功!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

郵件消費端的代碼如下:

package com.xyfer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer03 {
    //Routing 路由模式
    private static final String QUEUE_EMAIL ="queueEmail";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //通道綁定交換機
            /**
              * 參數明細
              * 1、交換機名稱
              * 2、交換機類型,fanout、topic、direct、headers
              */
            //Routing 路由模式
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
            //通道綁定隊列
            /**
             * 聲明隊列,如果Rabbit中沒有此隊列將自動創建
             * param1:隊列名稱
             * param2:是否持久化
             * param3:隊列是否獨占此連接
             * param4:隊列不再使用時是否自動刪除此隊列
             * param5:隊列參數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊列
            //交換機和隊列綁定
            /**
             * 參數明細
             * 1、隊列名稱
             * 2、交換機名稱
             * 3、路由key
             */
            //Routing 路由模式
            channel.queueBind(QUEUE_EMAIL,EXCHANGE,QUEUE_EMAIL);
            //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                  * 消費者接收消息調用此方法
                  * @param&nb

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 前言 我們學習瞭解完了創建型設計模式和結構型設計模式,今天我們開始學習並瞭解行為型設計模式。今天我們首先來看這麼一個設計模式——模板方法模式。這個模式我們在平常開發中總會不自覺的使用到。就像我們平時一樣的各種網站模板、建立模板、PPT模板等等。啥意思呢?簡單,也就是把共同的東西拿出來,你需要具體去實 ...
  • 一 創建模型 表和表之間的關係 一對一、多對一、多對多 ,用book表和publish表自己來想想關係,想想裡面的操作,加外鍵約束和不加外鍵約束的區別,一對一的外鍵約束是在一對多的約束上加上唯一約束。 實例:我們來假定下麵這些概念,欄位和關係 作者模型:一個作者有姓名和年齡。 作者詳細模型:把作者的 ...
  • 2019-09-25-23:28:09 內置函數的使用 一.數據類型(4種) 1.int() 2.float() 這個是浮點數 3.bool() 4.complex() 創建一個複數. 第一個參數為實部, 第二個參數為虛部(感覺會很少用到這個) 二.進位轉換 1.bin() 將給的參數轉換成二進位 ...
  • CountVectorizer方法進行特征提取 from sklearn.feature.extraction.text import CountVectorizer 這個方法根據分詞進行數量統計繼續文本分類 文本特征提取 作用:對文本進行特征值化 sklearn.feature_extractio ...
  • 操作流程:1、打開文件 文件柄 = open(文件名,模式,編碼) 打開文件的模式有: 一、基本的打開方式: 1)r:只讀模式【預設】 f = open("file.txt", "r") data = f.read() f.close() print(data ) 2)w:只寫模式【不可讀;文件不存 ...
  • 在application.yml定義配置後,可以使用Environment來讀取配置,也可以使用@Value註解讓業務代碼去讀取配置。 如果屬性較多,可以定義屬性映射對象。 ...
  • /** * 計算兩個時間點的天數差 * @param dt1 第一個時間點 * @param dt2 第二個時間點 * @return int,即要計算的天數差 */ public static int dateDiff(LocalDateTime dt1,LocalDateTime dt2){ / ...
  • 1.創建一個邊界值為1而內部都是0的數組,圖例如下:[提示:]解此題可以先把所有值都設置為1,這是大正方形;其次,把邊界除外小正方形全部設置為0。本題用到numpy的切片原理。多維數組同樣遵循x[start:stop:step]的原理。[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.][ ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...