SpringBoot(二十二)集成RabbitMQ---MQ實戰演練

来源:https://www.cnblogs.com/toutou/archive/2019/01/01/springboot_rabbitmq.html
-Advertisement-
Play Games

RabbitMQ是一個在AMQP基礎上完成的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。RabbitMQ是流行的開源消息隊列系統,用erlang語言開發。RabbitMQ是AMQP(高級消息隊列協議)的標準實現。消息中間件的工作過程可以用生產者消費者模型來表示... ...


RabbitMQ是一個在AMQP基礎上完成的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。RabbitMQ是流行的開源消息隊列系統,用erlang語言開發。RabbitMQ是AMQP(高級消息隊列協議)的標準實現。

消息中間件的工作過程可以用生產者消費者模型來表示.即,生產者不斷的向消息隊列發送信息,而消費者從消息隊列中消費信息.

如果你還沒有安裝rabbitmq的,可以看看這篇《centos安裝MQ》

不說了不說了,來一張圖直截了當的看看MQ工作的具體過程:

請叫我頭頭哥_RabbitMQ實戰演練

開局一張圖 故事全靠編.從上圖可看出,對於消息隊列來說,生產者,消息隊列,消費者是最重要的三個概念,生產者發消息到消息隊列中去,消費者監聽指定的消息隊列,並且當消息隊列收到消息之後,接收消息隊列傳來的消息,並且給予相應的處理.消息隊列常用於分散式系統之間互相信息的傳遞.

v基礎概念

對於RabbitMQ來說,除了這三個基本模塊以外,還添加了一個模塊,即交換機(Exchange).它使得生產者和消息隊列之間產生了隔離,生產者將消息發送給交換機,而交換機則根據調度策略把相應的消息轉發給對應的消息隊列.那麼RabitMQ的工作流程如下所示:

請叫我頭頭哥_RabbitMQ實戰演練

關於rabbitmq幾個基礎名詞的介紹:

Broker: 簡單來說就是消息隊列伺服器實體。 Exchange: 消息交換機,它指定消息按什麼規則,路由到哪個隊列。 Queue: 消息隊列載體,每個消息都會被投入到一個或多個隊列。 Binding: 綁定,它的作用就是把exchange和queue按照路由規則綁定起來。 Routing Key: 路由關鍵字,exchange根據這個關鍵字進行消息投遞。 vhost: 虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的許可權分離。 producer: 消息生產者,就是投遞消息的程式。 consumer: 消息消費者,就是接受消息的程式。 channel: 消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務。

交換機的主要作用是接收相應的消息並且綁定到指定的隊列.交換機有四種類型,分別為Direct,topic,headers,Fanout:

Direct: 處理路由鍵。需要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵完全匹配。這是一個完整的匹配。如果一個隊列綁定到該交換機上要求路由鍵 “demo”,則只有被標記為“demo”的消息才被轉發,不會轉發demo.ooo,也不會轉發test.123,只會轉發demo。 Topic: 轉發信息主要是依據通配符,將路由鍵和某模式進行匹配。此時隊列需要綁定要一個模式上。符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞。因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*” 只會匹配到“audit.irs”。 Headers: 根據一個規則進行匹配,在消息隊列和交換機綁定的時候會指定一組鍵值對規則,而發送消息的時候也會指定一組鍵值對規則,當兩組鍵值對規則相匹配的時候,消息會被髮送到匹配的消息隊列中. Fanout: 路由廣播的形式,將會把消息發給綁定它的全部隊列,即便設置了key,也會被忽略.

v實戰演練

♛ 2.1 創建MQ

請叫我頭頭哥_RabbitMQ實戰演練

註:若是現有工程引入MQ,則添加Maven引用。

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

這裡我們延續之前springboot系列博文中的例子hellospringboot,在已有項目中添加mq的Maven引用。

♛ 2.2 application.properties

在application.properties文件當中引入RabbitMQ基本的配置信息

# ----- MQ -------- #
spring.rabbitmq.host=192.168.11.108
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
♛ 2.3 添加實體類MyModel
package com.demo.mq.model;

import java.io.Serializable;
import java.util.UUID;

/**
 * Created by toutou on 2019/1/1.
 */
public class MyModel implements Serializable {
    private static final long serialVersionUID = 1L;
    private UUID id;
    private String info;

    public UUID getId() {
        return id;
    }

    public void setId(UUID id) {
        this.id = id;
    }

    public String getInfo() {
        return info;
    }

    public void setInfo(String info) {
        this.info = info;
    }
}
♛ 2.4 添加RabbitConfig
package com.demo.mq.common;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

/**
 * Created by toutou on 2019/1/1.
 */
@Configuration
public class RabbitConfig {

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;


    public static final String EXCHANGE_A = "my-mq-exchange_A";
    public static final String EXCHANGE_B = "my-mq-exchange_B";


    public static final String QUEUE_A = "QUEUE_A";
    public static final String QUEUE_B = "QUEUE_B";

    public static final String ROUTINGKEY_A = "spring-boot-routingKey_A";
    public static final String ROUTINGKEY_B = "spring-boot-routingKey_B";

    @Bean
    public CachingConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }

    /**
     * 針對消費者配置
     * 1. 設置交換機類型
     * 2. 將隊列綁定到交換機
     FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念
     HeadersExchange :通過添加屬性key-value匹配
     DirectExchange:按照routingkey分發到指定隊列
     TopicExchange:多關鍵字匹配
     */
    @Bean
    public DirectExchange defaultExchange() {
        return new DirectExchange(EXCHANGE_A);
    }

    /**
     * 獲取隊列A
     * @return
     */
    @Bean
    public Queue queueA() {
        return new Queue(QUEUE_A, true); //隊列持久
    }

    /**
     * 獲取隊列B
     * @return
     */
    @Bean
    public Queue queueB() {
        return new Queue(QUEUE_B, true); //隊列持久
    }

    /**
     * 把交換機,隊列,通過路由關鍵字進行綁定
     * @return
     */
    @Bean
    public Binding binding() {

        return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
    }

    /**
     * 一個交換機可以綁定多個消息隊列,也就是消息通過一個交換機,可以分發到不同的隊列當中去。
     * @return
     */
    @Bean
    public Binding bindingB(){
        return BindingBuilder.bind(queueB()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_B);
    }

}
♛ 2.5 添加消息的生產者MyProducer
package com.demo.mq.producer;

import com.demo.mq.common.RabbitConfig;
import com.demo.mq.model.MyModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Created by toutou on 2019/1/1.
 */
@Component
public class MyProducer implements RabbitTemplate.ConfirmCallback {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    //由於rabbitTemplate的scope屬性設置為ConfigurableBeanFactory.SCOPE_PROTOTYPE,所以不能自動註入
    private RabbitTemplate rabbitTemplate;

    /**
     * 構造方法註入rabbitTemplate
     */
    @Autowired
    public MyProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果為單例的話,那回調就是最後設置的內容
    }

    public void sendMsg(MyModel model) {
        //把消息放入ROUTINGKEY_A對應的隊列當中去,對應的是隊列A
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, model);
    }

    /**
     * 回調
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        logger.info(" 回調id:" + correlationData);
        if (ack) {
            logger.info("消息成功消費");
        } else {
            logger.info("消息消費失敗:" + cause);
        }
    }
}
♛ 2.6 添加消息的消費者MyReceiver
package com.demo.mq.receiver;

import com.demo.mq.common.RabbitConfig;
import com.demo.mq.model.MyModel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Created by toutou on 2019/1/1.
 */
@Component
@RabbitListener(queues = RabbitConfig.QUEUE_A)
public class MyReceiver {
    @RabbitHandler
    public void process(MyModel model) {
        System.out.println("接收處理隊列A當中的消息: " + model.getInfo());
    }
}
♛ 2.7 添加MyMQController
package com.demo.controller;

import com.demo.mq.model.MyModel;
import com.demo.mq.producer.MyProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

/**
 * Created by toutou on 2019/1/1.
 */
@RestController
@Slf4j
public class MyMQController {
    @Autowired
    MyProducer myProducers;

    @GetMapping("/mq/producer")
    public String myProducer(String content){
        MyModel model = new MyModel();
        model.setId(UUID.randomUUID());
        model.setInfo(content);
        myProducers.sendMsg(model);
        return "已發送:" + content;
    }
}
♛ 2.8 項目整體目錄

請叫我頭頭哥_RabbitMQ實戰演練

 

♛ 2.9 調試

2.9.1 在頁面中請求http://localhost:8081/mq/producer?content=hello rabbitmq

請叫我頭頭哥_RabbitMQ實戰演練

2.9.2 查看http://ip:15672/#/queues的變化

關於RabbitMQ Management有疑問的,可以看上篇博文。《淺談RabbitMQ Management》

 

2.9.3 查看消費者日誌記錄

請叫我頭頭哥_RabbitMQ實戰演練

這樣一個完整的rabbitmq實例就有了。

v源碼地址

https://github.com/toutouge/javademo/tree/master/hellospringboot


作  者:請叫我頭頭哥
出  處:http://www.cnblogs.com/toutou/
關於作者:專註於基礎平臺的項目開發。如有問題或建議,請多多賜教!
版權聲明:本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文鏈接。
特此聲明:所有評論和私信都會在第一時間回覆。也歡迎園子的大大們指正錯誤,共同進步。或者直接私信
聲援博主:如果您覺得文章對您有幫助,可以點擊文章右下角推薦一下。您的鼓勵是作者堅持原創和持續寫作的最大動力!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 個人博客原文: "依賴倒置原則" 設計模式六大原則之三:依賴倒置原則。 簡介 姓名 :依賴倒置原則 英文名 :Dependence Inversion Principle 價值觀 :大男子主義的典型代表,什麼都得通過老大或者老爸同意 伴侶 :一定是個溫柔體貼的女子 個人介紹 : 1. High le ...
  • 備忘錄模式通過保存一個對象的某個狀態,以便在需要的時候恢復該對象。 介紹 備忘錄模式屬於行為型模式,它通過在不破壞封裝性的前提下,捕獲一個對象的內部狀態,併在該對象之外保存這個狀態。 類圖描述 代碼實現 1、創建實體類 2、創建狀態處理類 3、創建儲存集合 4、上層調用 總結 備忘錄模式常用於數據的 ...
  • 我們在開發項目中會經常用到第三方的類庫插件,但是如果每次需要使用的時候都會在代碼的某一處去引入,然後在實例化,這樣做感覺很不方便,那麼怎麼實現自動載入呢,下麵簡單介紹使用composer實現自動載入: 原文地址:小時刻個人博客>http://small.aiweimeng.top/index.ph... ...
  • std::future和std::promise std::future std::future期待一個返回,從一個非同步調用的角度來說, future更像是執行函數的返回值 ,C++標準庫使用std::future為一次性事件建模,如果一個事件需要等待特定的一次性事件,那麼這線程可以獲取一個futu ...
  • 新的一年第一篇技術文章希望開個好頭,所以元旦三天我也沒怎麼閑著,希望給大家帶來一篇比較感興趣的乾貨內容。 老讀者應該還記得我在去年國慶節前分享過一篇《設計一個百萬級的消息推送系統》;雖然我在文中有貼一些偽代碼,依然有些朋友希望能直接分享一些可以運行的源碼;這麼久了是時候把坑填上了。 ...
  • 題意 "題目鏈接" $N$個物品,每次得到第$i$個物品的概率為$p_i$,而且有可能什麼也得不到,問期望多少次能收集到全部$N$個物品 Sol 最直觀的做法是直接狀壓,設$f[sta]$表示已經獲得了$sta$這個集合里的所有元素,距離全拿滿的期望,推一推式子直接轉移就好了 主程式代碼: cpp ...
  • 接上文:SpringBoot整合Mybatis【註解版】 一、項目創建 新建一個工程 ​ 選擇Spring Initializr,配置JDK版本 ​ 輸入項目名 ​ 選擇構建web項目所需的staters(啟動器) ​ 選擇與資料庫相關的組件 ​ 分析:Spring Boot基本上將我們實際項目開發 ...
  • [TOC] 環境 windows10 MySQL 8.0.13 IDEA 問題 分析 查閱資料發現這都是因為安裝mysql的時候時區設置的不正確 mysql預設的是美國的時區,而我們中國大陸要比他們遲8小時,採用+8:00格式; 在mysql中查看時區的設置: 解決方法 找到mysql的安裝目錄下的 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...