RabbitMQ 核心概念

来源:https://www.cnblogs.com/haixiang/archive/2019/05/12/10853467.html
-Advertisement-
Play Games

[TOC] RabbitMQ 特點 RabbitMQ 相較於其他消息隊列,有一系列防止消息丟失的措施,擁有強悍的高可用性能,它的吞吐量可能沒有其他消息隊列大,但是其消息的保障性出類拔萃,被廣泛用於金融類業務。與其他消息隊列的比較以及強大的防止消息丟失的能力我們將在後續文章再做介紹。 AMQP 協議 ...


目錄


RabbitMQ 特點

RabbitMQ 相較於其他消息隊列,有一系列防止消息丟失的措施,擁有強悍的高可用性能,它的吞吐量可能沒有其他消息隊列大,但是其消息的保障性出類拔萃,被廣泛用於金融類業務。與其他消息隊列的比較以及強大的防止消息丟失的能力我們將在後續文章再做介紹。

AMQP 協議

AMQP: Advanced Message Queuing Protocol 高級消息隊列協議

AMQP定義:是具有現代特征的二進位協議。是一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計。

Erlang語言最初在於交換機領域的架構模式,這樣使得RabbitMQ在Broker之間進行數據交互的性能是非常優秀的
Erlang的優點: Erlang有著和原生Socket一樣的延遲。

RabbitMQ是一個開源的消息代理和隊列伺服器,用來通過普通協議在完全不同的應用之間共用數據, RabbitMQ是使用Erlang語言來編寫的,並且RabbitMQ是基於AMQP協議的。

RabbitMQ 消息傳遞機制

生產者發送消息到指定的 Exchange,Exchange 依據自身的類型(direct、topic等),根據 routing key 將消息發送給 0 - n 個 隊列,隊列再將消息轉發給了消費者。

Server: 又稱Broker, 接受客戶端的連接,實現AMQP實體服務,這裡指RabbitMQ 伺服器

Connection: 連接,應用程式與Broker的網路連接。

Channel: 網路通道,幾乎所有的操作都在 Channel 中進行,Channel是進行消息讀寫的通道。客戶端可建立多個Channel:,每個Channel代表一個會話任務。

Virtual host: 虛似地址,用於迸行邏輯隔離,是最上層的消息路由。一個 Virtual Host 裡面可以有若幹個 Exchange和 Queue ,同一個 VirtualHost 裡面不能有相同名稱的 Exchange 或 Queue。許可權控制的最小粒度是Virtual Host。

Binding: Exchange 和 Queue 之間的虛擬連接,binding 中可以包含 routing key。

Routing key: 一 個路由規則,虛擬機可用它來確定如何路由一個特定消息,即交換機綁定到 Queue 的鍵。

Queue: 也稱為Message Queue,消息隊列,保存消息並將它們轉發給消費者。

Message

消息,伺服器和應用程式之間傳送的數據,由 Properties 和 Body 組成。Properties 可以對消息進行修飾,比如消息的優先順序、延遲等高級特性;,Body 則就 是消息體內容。

properties 中我們可以設置消息過期時間以及是否持久化等,也可以傳入自定義的map屬性,這些在消費端也都可以獲取到。

生產者

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;

public class MessageProducer {
    public static void main(String[] args) throws Exception {
        //1. 創建一個 ConnectionFactory 併進行設置
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");

        //2. 通過連接工廠來創建連接
        Connection connection = factory.newConnection();

        //3. 通過 Connection 來創建 Channel
        Channel channel = connection.createChannel();

        //4. 聲明 使用預設交換機 以隊列名作為 routing key
        String queueName = "msg_queue";

        /**
         * deliverMode 設置為 2 的時候代表持久化消息
         * expiration 意思是設置消息的有效期,超過10秒沒有被消費者接收後會被自動刪除
         * headers 自定義的一些屬性
         * */
        //5. 發送
        Map<String, Object> headers = new HashMap<String, Object>();
        headers.put("myhead1", "111");
        headers.put("myhead2", "222");

        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                .deliveryMode(2)
                .contentEncoding("UTF-8")
                .expiration("100000")
                .headers(headers)
                .build();
        String msg = "test message";
        channel.basicPublish("", queueName, properties, msg.getBytes());
        System.out.println("Send message : " + msg);

        //6. 關閉連接
        channel.close();
        connection.close();

    }
}

消費者

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;

public class MessageConsumer {
    public static void main(String[] args) throws Exception{
        //1. 創建一個 ConnectionFactory 併進行設置
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);

        //2. 通過連接工廠來創建連接
        Connection connection = factory.newConnection();

        //3. 通過 Connection 來創建 Channel
        Channel channel = connection.createChannel();

        //4. 聲明
        String queueName = "msg_queue";
        channel.queueDeclare(queueName, false, false, false, null);

        //5. 創建消費者並接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                Map<String, Object> headers = properties.getHeaders();
                System.out.println("head: " + headers.get("myhead1"));
                System.out.println(" [x] Received '" + message + "'");
                System.out.println("expiration : "+ properties.getExpiration());
            }
        };

        //6. 設置 Channel 消費者綁定隊列
        channel.basicConsume(queueName, true, consumer);
    }
}
Send message : test message

head: 111
 [x] Received 'test message'
100000

Exchange

1. 簡介

Exchange 就是交換機,接收消息,根據路由鍵轉發消息到綁定的隊列。有很多的 Message 進入到 Exchange 中,Exchange 根據 Routing key 將 Message 分發到不同的 Queue 中。

2. 類型

RabbitMQ 中的 Exchange 有多種類型,類型不同,Message 的分發機制不同,如下:

  • fanout:廣播模式。這種類型的 Exchange 會將 Message 分發到綁定到該 Exchange 的所有 Queue。
  • direct:這種類型的 Exchange 會根據 Routing key(精確匹配,將Message分發到指定的Queue。

  • Topic:這種類型的 Exchange 會根據 Routing key(模糊匹配,將Message分發到指定的Queue。
  • headers: 主題交換機有點相似,但是不同於主題交換機的路由是基於路由鍵,頭交換機的路由值基於消息的header數據。 主題交換機路由鍵只有是字元串,而頭交換機可以是整型和哈希值 .

3. 屬性

   /**
     * Declare an exchange, via an interface that allows the complete set of
     * arguments.
     * @see com.rabbitmq.client.AMQP.Exchange.Declare
     * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
     * @param exchange the name of the exchange
     * @param type the exchange type
     * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
     * @param autoDelete true if the server should delete the exchange when it is no longer in use
     * @param internal true if the exchange is internal, i.e. can't be directly
     * published to by a client.
     * @param arguments other properties (construction arguments) for the exchange
     * @return a declaration-confirm method to indicate the exchange was successfully declared
     * @throws java.io.IOException if an error is encountered
     */
    Exchange.DeclareOk exchangeDeclare(String exchange,
                                       String type,boolean durable,
                                       boolean autoDelete,boolean internal,
                                       Map<String, Object> arguments) throws IOException;
  • Name: 交換機名稱
  • Type: 交換機類型direct、topic、 fanout、 headers
  • Durability: 是否需要持久化,true為持久化
  • Auto Delete: 當最後一個綁定到Exchange. 上的隊列刪除後,自動刪除該Exchange
  • Internal: 當前Exchange是否用於RabbitMQ內部使用,預設為False
  • Arguments: 擴展參數,用於擴展AMQP協議自製定化使用

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 背景 由於項目的新版本有大改動,需求是將一些舊表的數據轉移到新表來,於是使用PHP寫了數據腳本,對MySQL的數據進行讀取,計算,轉移,插入等處理,實現千萬級別數據的計算和轉移。這裡面也遇到一些問題,這裡做下總結: 需求 將幾個舊表的數據拷到新表來,有些欄位發生了變化,有些欄位的值需要計算; 單表數 ...
  • #while print('111')while true: print('我們不一樣') print('在人間') print('光輝歲月')print('222'),,,,,,count = 1flag = true while flag: print(count) count = count ...
  • leepcode 作業詳解 1、給定一個整數數組,判斷是否存在重覆元素。如果任何值在數組中出現至少兩次,函數返回 true。如果數組中每個元素都不相同,則返回 false。 正確解答 第一次用字典去解題,出現bug,雖然在pycharm裡面代碼運行正常,但是leepcode顯示錯誤。 2、給定一個整 ...
  • 引入依賴: 啟動類上添加@EnableFeignClients註解: 寫調用介面: 直接@Autowired註入服務調用介面: 底層使用了動態代理,對介面進行了實現。 並且封裝了RestTemplate遠程調用的代碼。 測試: 搞定~ 補充知識點: ...
  • 力扣題目彙總 1.兩數之和 1.題目描述 給定一個整數數組 和一個目標值 ,請你在該數組中找出和為目標值的那 兩個 整數,並返回他們的數組下標。 你可以假設每種輸入只會對應一個答案。但是,你不能重覆利用這個數組中同樣的元素。 示例: 2.解答 2.迴文數 1題目描述 示例 1: 示例 2: 示例 3 ...
  • 文/開源智造聯合創始人老楊 本文來自《OdooERP系統部署架構指南》的試讀章節。書籍尚未出版,請勿轉載。歡迎您反饋閱讀意見。 從web瀏覽器到PostgreSQL,多層與其他層交互以處理數據 單伺服器架構 易於理解和部署,這是最常見的情況。一個實例或多個實例 多伺服器架構 更難部署和維護,需要更高 ...
  • @Configuration : 配置類 == 配置文件,告訴Spring這是一個配置類@ComponentScan(value="com.atguigu",excludeFilters = { @Filter(type=FilterType.ANNOTATION,classes={Controll ...
  • [TOC] Outline clip_by_value relu clip_by_norm gradient clipping clip_by_value relu clip_by_norm 縮放時不改變梯度方向 gradient clipping Gradient Exploding or van ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...