rabbitMq 初步

来源:https://www.cnblogs.com/zhenghongxin/archive/2019/04/29/10792818.html
-Advertisement-
Play Games

RabbitMQ的工作原理 它的基本結構 組成部分說明如下: Broker:消息隊列服務進程,此進程包括兩個部分:Exchange和Queue。 Exchange:消息隊列交換機,按一定的規則將消息路由轉發到某個隊列,對消息進行過慮。 Queue:消息隊列,存儲消息的隊列,消息到達隊列並轉發給指定的 ...


RabbitMQ的工作原理

它的基本結構

組成部分說明如下:

Broker:消息隊列服務進程,此進程包括兩個部分:Exchange和Queue。

Exchange:消息隊列交換機,按一定的規則將消息路由轉發到某個隊列,對消息進行過慮。

Queue:消息隊列,存儲消息的隊列,消息到達隊列並轉發給指定的消費方。

Producer:消息生產者,即生產方客戶端,生產方客戶端將消息發送到MQ。

Consumer:消息消費者,即消費方客戶端,接收MQ轉發的消息。

 

Maven舉例配置

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp‐client</artifactId>
<version>4.0.3</version><!‐‐此版本與spring boot 1.5.9版本匹配‐‐>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring‐boot‐starter‐logging</artifactId>
</dependency>

生產者舉例Demo

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitmqConfig {
    public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
    public static final String ROUTINGKEY_EMAIL="inform.#.email.#";
    public static final String ROUTINGKEY_SMS="inform.#.sms.#";

    //聲明交換機
    @Bean(EXCHANGE_TOPICS_INFORM)
    public Exchange EXCHANGE_TOPICS_INFORM(){
        //durable(true) 持久化,mq重啟之後交換機還在
        return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
    }

    //聲明QUEUE_INFORM_EMAIL隊列
    @Bean(QUEUE_INFORM_EMAIL)
    public Queue QUEUE_INFORM_EMAIL(){
        return new Queue(QUEUE_INFORM_EMAIL);
    }
    //聲明QUEUE_INFORM_SMS隊列
    @Bean(QUEUE_INFORM_SMS)
    public Queue QUEUE_INFORM_SMS(){
        return new Queue(QUEUE_INFORM_SMS);
    }

    //ROUTINGKEY_EMAIL隊列綁定交換機,指定routingKey
    @Bean
    public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
                                              @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
    }
    //ROUTINGKEY_SMS隊列綁定交換機,指定routingKey
    @Bean
    public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
                                              @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
    }
}

 

消費者舉例Demo

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitmqConfig {
    public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
    public static final String ROUTINGKEY_EMAIL="inform.#.email.#";
    public static final String ROUTINGKEY_SMS="inform.#.sms.#";

    //聲明交換機
    @Bean(EXCHANGE_TOPICS_INFORM)
    public Exchange EXCHANGE_TOPICS_INFORM(){
        //durable(true) 持久化,mq重啟之後交換機還在
        return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
    }

    //聲明QUEUE_INFORM_EMAIL隊列
    @Bean(QUEUE_INFORM_EMAIL)
    public Queue QUEUE_INFORM_EMAIL(){
        return new Queue(QUEUE_INFORM_EMAIL);
    }
    //聲明QUEUE_INFORM_SMS隊列
    @Bean(QUEUE_INFORM_SMS)
    public Queue QUEUE_INFORM_SMS(){
        return new Queue(QUEUE_INFORM_SMS);
    }

    //ROUTINGKEY_EMAIL隊列綁定交換機,指定routingKey
    @Bean
    public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
                                              @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
    }
    //ROUTINGKEY_SMS隊列綁定交換機,指定routingKey
    @Bean
    public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
                                              @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
    }
}

工作模式 

RabbitMQ有以下幾種工作模式 :

1、Work queues

2、Publish/Subscribe

3、Routing

4、Topics

5、Header

6、RPC

 

Work queues

work queues與入門程式相比,多了一個消費端,兩個消費端共同消費同一個隊列中的消息。

應用場景:對於 任務過重或任務較多情況使用工作隊列可以提高任務處理的速度。

測試:

1、使用入門程式,啟動多個消費者。

2、生產者發送多個消息。

結果:

1、一條消息只會被一個消費者接收;

2、rabbit採用輪詢的方式將消息是平均發送給消費者的;

3、消費者在處理完某條消息後,才會收到下一條消息。

 

Publish/subscribe 發佈訂閱模式

發佈訂閱模式:

1、每個消費者監聽自己的隊列。

2、生產者將消息發給broker,由交換機將消息轉發到綁定此交換機的每個隊列,每個綁定交換機的隊列都將接收

到消息

 

Routin

路由模式:

1、每個消費者監聽自己的隊列,並且設置routingkey。

2、生產者將消息發給交換機,由交換機根據routingkey來轉發消息到指定的隊列。

這是一種非常靈活的模式,經常被用到

 

Topics

 

 

路由模式:

1、每個消費者監聽自己的隊列,並且設置帶統配符的routingkey。

2、生產者將消息發給broker,由交換機根據routingkey來轉發消息到指定的隊列。

 

Header模式

header模式與routing不同的地方在於,header模式取消routingkey,使用header中的 key/value(鍵值對)匹配

隊列。

案例:

根據用戶的通知設置去通知用戶,設置接收Email的用戶只接收Email,設置接收sms的用戶只接收sms,設置兩種

通知類型都接收的則兩種通知都有效。

 

生產者Demo:

 

Map<String, Object> headers_email = new Hashtable<String, Object>();
headers_email.put("inform_type", "email");
Map<String, Object> headers_sms = new Hashtable<String, Object>();
headers_sms.put("inform_type", "sms");
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email);
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_HEADERS_INFORM,"",headers_sms);

通知Demo :

String message = "email inform to user"+i;
Map<String,Object> headers = new Hashtable<String, Object>();
headers.put("inform_type", "email");//匹配email通知消費者綁定的header
//headers.put("inform_type", "sms");//匹配sms通知消費者綁定的header
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
properties.headers(headers);
//Email通知
channel.basicPublish(EXCHANGE_HEADERS_INFORM, "", properties.build(), message.getBytes());

發送郵件消費者 : 

channel.exchangeDeclare(EXCHANGE_HEADERS_INFORM, BuiltinExchangeType.HEADERS);
Map<String, Object> headers_email = new Hashtable<String, Object>();
headers_email.put("inform_email", "email");
//交換機和隊列綁定
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email);
//指定消費隊列
channel.basicConsume(QUEUE_INFORM_EMAIL, true, consumer);

 

RPC

 

 

RPC即客戶端遠程調用服務端的方法 ,使用MQ可以實現RPC的非同步調用,基於Direct交換機實現,流程如下:

1、客戶端即是生產者就是消費者,向RPC請求隊列發送RPC調用消息,同時監聽RPC響應隊列。

2、服務端監聽RPC請求隊列的消息,收到消息後執行服務端的方法,得到方法返回的結果

3、服務端將RPC方法 的結果發送到RPC響應隊列

4、客戶端(RPC調用方)監聽RPC響應隊列,接收到RPC調用結果。

 

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1.包裝類 把八大基本數據類型封裝到一個類中(包裝類),並提供屬性和方法。讓我們更加方便的操作基本數據類型。但包裝類的出現並不是為了取代基本數據類型,也取代不了。 包裝類位於java.lang包中。 Number 類 Number數值類型是byte、double、float、int、long 和 s ...
  • 什麼是雙端隊列? ArrayDeque是怎麼實現雙端隊列的? ArrayDeque是線程安全的嗎? ArrayDeque是有界的嗎? ...
  • 新聞 ".NET版本的Apache Spark" "Apache Spark預覽版介紹" "F Apache Spark示例" "微軟Build 2019大會(5月6日至8日)" "Rider用於F 的解決方案內的重命名" "Spark+AI峰會——開發智能雲與智能邊緣" "CNTK最新的重大發佈— ...
  • 1.常量是什麼?有什麼存在的意思? 答:舉個例子,公司開發,資料庫的地址用戶名密碼等信息一般固定不變,不需要後面程式改動。 如果用變數,$db = 'xx';其他人寫程式,後面好巧不巧,修改了這個變數,是不是就出問題了。 再比如,程式中要用到圓周率等,用變數存,就怕哪天被修改了,程式上是不會報錯的, ...
  • 1. swagger知識點補充 1.1. 概述 1. 在swagger的使用過程中,除了網上常見的例子,還會有很多細節上的東西需要註意和改寫,這裡我列幾點我使用過程中遇到的問題和改進方式 1.2. 知識點 1.2.1. 模型例子 1. 我們在進行POST的請求的時候,尤其是增加一條數據,我們往往會有 ...
  • 一、引言 在學習集合的時候我們會發現一個問題,將一個對象丟到集合中後,集合併不記住對象的類型,統統都當做Object處理,這樣我們取出來再使用時就得強制轉換類型,導致代碼臃腫,而且加入集合時都是以Object,沒做類型檢查,那麼強制轉換就容易出錯,泛型的誕生就是為解決這些問題。 二、使用泛型 泛型是 ...
  • 前面介紹的文件I/O,不管是寫入文本還是寫入對象,文件中的數據基本是原來的模樣,用記事本之類的文本編輯軟體都能瀏覽個大概。這麼存儲數據,要說方便確實方便,只是不夠經濟划算,原因有二:其一,寫入的數據可能存在大量重覆的信息,但依原樣寫到文件的話,無疑保留了不少冗餘數據,造成空間浪費;其二,寫入的數據多 ...
  • 本例使用的時python2.7環境,python3的操作應該也是差不多的。 需要用到smtplib和email兩個包。 發送文本類型的郵件 下麵看個發送文本郵件的例子(使用網易163的SMTP): 好像網易的SMTP有坑,message['From']和message['To']都要和sender和 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...