RabbitMQ-消息中間鍵

来源:https://www.cnblogs.com/V-Notes/archive/2023/02/15/17124049.html
-Advertisement-
Play Games

MQ,中文是消息隊列(MessageQueue),字面來看就是存放消息的隊列。也就是事件驅動架構中的Broker。 快速入門 1.publisher實現 public class PublisherTest { @Test public void testSendMessage() throws I ...


MQ,中文是消息隊列(MessageQueue),字面來看就是存放消息的隊列。也就是事件驅動架構中的Broker。

快速入門

1.publisher實現

public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立連接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.設置連接參數,分別是:主機名、埠號、vhost、用戶名、密碼
        factory.setHost("192.168.150.101");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("rabbit");
        factory.setPassword("123321");
        // 1.2.建立連接
        Connection connection = factory.newConnection();

        // 2.創建通道Channel
        Channel channel = connection.createChannel();

        // 3.創建隊列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.發送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("發送消息成功:【" + message + "】");

        // 5.關閉通道和連接
        channel.close();
        connection.close();

    }
}

2.consumer實現

public class ConsumerTest {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立連接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.設置連接參數,分別是:主機名、埠號、vhost、用戶名、密碼
        factory.setHost("192.168.150.101");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");
        // 1.2.建立連接
        Connection connection = factory.newConnection();

        // 2.創建通道Channel
        Channel channel = connection.createChannel();

        // 3.創建隊列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.訂閱消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.處理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}

SpringAMQP-集成SpringBoot

1.Basic Queue 簡單隊列模型

a.導入依賴

<!--AMQP依賴,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

b.消息發送

首先配置MQ地址,在publisher服務的application.yml中添加配置:

spring:
  rabbitmq:
    host: 192.168.150.101 # 主機名
    port: 5672 # 埠
    virtual-host: / # 虛擬主機
    username: rabbit # 用戶名
    password: 123321 # 密碼

然後在publisher服務中編寫測試類SpringAmqpTest,並利用RabbitTemplate實現消息發送:

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue() {
        // 隊列名稱
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 發送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

c.消息接收

首先配置MQ地址,在consumer服務的application.yml中添加配置:

spring:
  rabbitmq:
    host: 192.168.150.101 # 主機名
    port: 5672 # 埠
    virtual-host: / # 虛擬主機
    username: rabbit # 用戶名
    password: 123321 # 密碼

然後在consumer服務的cn.itcast.mq.listener包中新建一個類SpringRabbitListener,代碼如下:

package cn.itcast.mq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("spring 消費者接收到消息:【" + msg + "】");
    }
}

2.WorkQueue-任務隊列

Work queues,也被稱為(Task queues),任務模型。簡單來說就是讓多個消費者綁定到一個隊列,共同消費隊列中的消息

當消息處理比較耗時的時候,可能生產消息的速度會遠遠大於消息的消費速度。長此以往,消息就會堆積越來越多,無法及時處理。

此時就可以使用work 模型,多個消費者共同處理消息處理,速度就能大大提高了。

但在預設情況下,消息是平均分配給每個消費者,並沒有考慮到消費者的處理能力。這樣顯然是有問題的。

在spring中有一個簡單的配置,可以解決這個問題。我們修改consumer服務的application.yml文件,添加配置:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能獲取一條消息,處理完成才能獲取下一個消息

3.Fanout-廣播

在廣播模式下,消息發送流程是這樣的:

  • 1) 可以有多個隊列
  • 2) 每個隊列都要綁定到Exchange(交換機)
  • 3) 生產者發送的消息,只能發送到交換機,交換機來決定要發給哪個隊列,生產者無法決定
  • 4) 交換機把消息發送給綁定過的所有隊列
  • 5) 訂閱隊列的消費者都能拿到消息

a.聲明隊列和交換機

Spring提供了一個介面Exchange,來表示所有不同類型的交換機,

在consumer中創建一個類,聲明隊列和交換機:

@Configuration
public class FanoutConfig {
    /**
     * 聲明交換機
     * @return Fanout類型交換機
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("rabbit.fanout");
    }

    /**
     * 第1個隊列
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    /**
     * 綁定隊列和交換機
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    /**
     * 第2個隊列
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     * 綁定隊列和交換機
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

b.消息發送

在publisher服務的SpringAmqpTest類中添加測試方法:

@Test
public void testFanoutExchange() {
    // 隊列名稱
    String exchangeName = "rabbit.fanout";
    // 消息
    String message = "hello, everyone!";
    rabbitTemplate.convertAndSend(exchangeName, "", message);
}

c.消息接收

在consumer服務的SpringRabbitListener中添加兩個方法,作為消費者:

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
    System.out.println("消費者1接收到Fanout消息:【" + msg + "】");
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
    System.out.println("消費者2接收到Fanout消息:【" + msg + "】");
}

4.Direct-指定

在Fanout模式中,一條消息,會被所有訂閱的隊列都消費。但是,在某些場景下,我們希望不同的消息被不同的隊列消費。這時就要用到Direct類型的Exchange。

在Direct模型下:

  • 隊列與交換機的綁定,不能是任意綁定了,而是要指定一個RoutingKey(路由key)
  • 消息的發送方在 向 Exchange發送消息時,也必須指定消息的 RoutingKey
  • Exchange不再把消息交給每一個綁定的隊列,而是根據消息的Routing Key進行判斷,只有隊列的Routingkey與消息的 Routing key完全一致,才會接收到消息

案例需求如下

  1. 利用@RabbitListener聲明Exchange、Queue、RoutingKey

  2. 在consumer服務中,編寫兩個消費者方法,分別監聽direct.queue1和direct.queue2

  3. 在publisher中編寫測試方法,向itcast. direct發送消息

a.基於註解聲明隊列和交換機

基於@Bean的方式聲明隊列和交換機比較麻煩,Spring還提供了基於註解方式來聲明。

在consumer的SpringRabbitListener中添加兩個消費者,同時基於註解來聲明隊列和交換機:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "rabbit.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消費者接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "rabbit.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消費者接收到direct.queue2的消息:【" + msg + "】");
}

b.消息發送

在publisher服務的SpringAmqpTest類中添加測試方法:

@Test
public void testSendDirectExchange() {
    // 交換機名稱
    String exchangeName = "rabbit.direct";
    // 消息
    String message = "紅色警報!日本亂排核廢水,導致海洋生物變異,驚現哥斯拉!";
    // 發送消息
    rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

5.Topic-主題

Topic類型的ExchangeDirect相比,都是可以根據RoutingKey把消息路由到不同的隊列。只不過Topic類型Exchange可以讓隊列在綁定Routing key 的時候使用通配符!

案例需求:

實現思路如下:

  1. 並利用@RabbitListener聲明Exchange、Queue、RoutingKey

  2. 在consumer服務中,編寫兩個消費者方法,分別監聽topic.queue1和topic.queue2

  3. 在publisher中編寫測試方法,向itcast. topic發送消息

a.消息發送

在publisher服務的SpringAmqpTest類中添加測試方法:

/**
     * topicExchange
     */
@Test
public void testSendTopicExchange() {
    // 交換機名稱
    String exchangeName = "rabbit.topic";
    // 消息
    String message = "喜報!孫悟空大戰哥斯拉,勝!";
    // 發送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

b.消息接收

在consumer服務的SpringRabbitListener中添加方法:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"),
    exchange = @Exchange(name = "rabbit.topic", type = ExchangeTypes.TOPIC),
    key = "china.#"
))
public void listenTopicQueue1(String msg){
    System.out.println("消費者接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name = "rabbit.topic", type = ExchangeTypes.TOPIC),
    key = "#.news"
))
public void listenTopicQueue2(String msg){
    System.out.println("消費者接收到topic.queue2的消息:【" + msg + "】");
}

6.消息轉換器

Spring會把你發送的消息序列化為位元組發送給MQ,接收消息的時候,還會把位元組反序列化為Java對象。

只不過,預設情況下Spring採用的序列化方式是JDK序列化。眾所周知,JDK序列化存在下列問題:

  • 數據體積過大
  • 有安全漏洞
  • 可讀性差

因此可以使用JSON方式來做序列化和反序列化。

在publisher和consumer兩個服務中都引入依賴:

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.5</version>
</dependency>

配置消息轉換器。

在啟動類中添加一個Bean即可:

@Bean
public MessageConverter jsonMessageConverter(){
    return new Jackson2JsonMessageConverter();
}

#安裝(Docker)

1.拉取鏡像

docker pull rabbitmq:3-management

2.部署容器

docker run \
 -e RABBITMQ_DEFAULT_USER=rabbit \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq \
 --restart=always \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3-management

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 第一章 初識SpringMVC 1.1 SpringMVC概述 SpringMVC是Spring子框架 SpringMVC是Spring 為**【展現層|表示層|表述層|控制層】**提供的基於 MVC 設計理念的優秀的 Web 框架,是目前最主流的MVC 框架。 SpringMVC是非侵入式:可以使 ...
  • 中文亂碼處理 1.問題拋出 當表單提交的數據為中文時,會出現亂碼: (1)Monster.java: package com.li.web.datavalid.entity; import org.hibernate.validator.constraints.Email; import org.h ...
  • 這篇文章主要講述一種新的分散式調度策略:共用狀態調度,它包含多個調度器,每個調度器都擁有整個集群的全局資源狀態信息。另外還比較了三種調度策略:單體策略、兩層策略和共用狀態策略。 ...
  • 引言 UICollectionView 是 iOS 平臺上一種強大的視圖佈局工具,能夠很好地實現網格佈局,列表佈局等多種佈局方式。 首先講下今天的目標,我們將要使用 UICollectionView 來創建仿微博的九宮格內容。首先,目標行數為3,每行顯示3張圖片,總共顯示9張圖片。 實現方式 我們往 ...
  • 我是3y,一年CRUD經驗用十年的markdown程式員👨🏻‍💻常年被譽為職業八股文選手 今天繼續更新Austin,給Austin新增一個發送渠道(PUSH通知欄推送) Push通知欄消息是非常常見的,幾乎每個APP都會做這個功能(沒有消息推送的APP不是一個好的APP) 一般我們認為Push ...
  • 工欲善其事必先利其器,在使用Python開發程式之前,在電腦上搭建Python開發環境是必不可少的環節,目前Python最新穩定版本是3.11.1,且支持到2027年,如下圖所示 本文手把手帶你從0 到1搭建Python最新版3.11.1開發環境,堪稱保姆級教程,快快收藏啦~ 一、Python解釋 ...
  • 最近幾年,Java 的技術棧發展的非常快,Java作為一門十分流行的面向對象編程語言,其開發工具也是非常多的,當然因為接觸時間長短以及個人喜好,每個人都有自己的選擇。對此,我對目前市面上常見的Java開發工具做了一些簡單的整理,希望能幫助到一些小伙伴。 一、常見的Java開發工具有哪些? Eclip ...
  • 一、字元串str 大白話的意思其實就是文本類型的數據>>>:引號引起來的部分都是字元串 應用場景:姓名 地址 愛好 name = 'kevin' addr = '浦東新區' hobby = '學習' 定義字元串的四種方式 # 方式1: name = 'kevin' # 方式2: name = "ke ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...