深入解讀RabbitMQ工作原理及簡單使用

来源:https://www.cnblogs.com/vipstone/archive/2018/07/06/9275256.html
-Advertisement-
Play Games

RabbitMQ系列目錄 1. "RabbitMQ在Ubuntu上的環境搭建" 2. "深入解讀RabbitMQ工作原理及簡單使用" 3. Rabbit的幾種工作模式介紹與實踐 4. Rabbit事務與消息確認 5. Rabbit集群搭建 6. 使用HAProxy為RabbitMQ搭建負載均衡 7. ...


RabbitMQ系列目錄

  1. RabbitMQ在Ubuntu上的環境搭建
  2. 深入解讀RabbitMQ工作原理及簡單使用
  3. Rabbit的幾種工作模式介紹與實踐
  4. Rabbit事務與消息確認
  5. Rabbit集群搭建
  6. 使用HAProxy為RabbitMQ搭建負載均衡
  7. REST API控制Rabbit

RabbitMQ簡介

在介紹RabbitMQ之前實現要介紹一下MQ,MQ是什麼?

MQ全稱是Message Queue,可以理解為消息隊列的意思,簡單來說就是消息以管道的方式進行傳遞。

RabbitMQ是一個實現了AMQP(Advanced Message Queuing Protocol)高級消息隊列協議的消息隊列服務,用Erlang語言的。

使用場景

在我們秒殺搶購商品的時候,系統會提醒我們稍等排隊中,而不是像幾年前一樣頁面卡死或報錯給用戶。

像這種排隊結算就用到了消息隊列機制,放入通道裡面一個一個結算處理,而不是某個時間斷突然涌入大批量的查詢新增把資料庫給搞宕機,所以RabbitMQ本質上起到的作用就是削峰填谷,為業務保駕護航。

為什麼選擇RabbitMQ

現在的市面上有很多MQ可以選擇,比如ActiveMQ、ZeroMQ、Appche Qpid,那問題來了為什麼要選擇RabbitMQ?

  1. 除了Qpid,RabbitMQ是唯一一個實現了AMQP標準的消息伺服器;
  2. 可靠性,RabbitMQ的持久化支持,保證了消息的穩定性;
  3. 高併發,RabbitMQ使用了Erlang開發語言,Erlang是為電話交換機開發的語言,天生自帶高併發光環,和高可用特性;
  4. 集群部署簡單,正是應為Erlang使得RabbitMQ集群部署變的超級簡單;
  5. 社區活躍度高,根據網上資料來看,RabbitMQ也是首選;

工作機制

生產者、消費者和代理

在瞭解消息通訊之前首先要瞭解3個概念:生產者、消費者和代理。

生產者:消息的創建者,負責創建和推送數據到消息伺服器;

消費者:消息的接收方,用於處理數據和確認消息;

代理:就是RabbitMQ本身,用於扮演“快遞”的角色,本身不生產消息,只是扮演“快遞”的角色。

消息發送原理

首先你必須連接到Rabbit才能發佈和消費消息,那怎麼連接和發送消息的呢?

你的應用程式和Rabbit Server之間會創建一個TCP連接,一旦TCP打開,並通過了認證,認證就是你試圖連接Rabbit之前發送的Rabbit伺服器連接信息和用戶名和密碼,有點像程式連接資料庫,使用Java有兩種連接認證的方式,後面代碼會詳細介紹,一旦認證通過你的應用程式和Rabbit就創建了一條AMQP通道(Channel)。

通道是創建在“真實”TCP上的虛擬連接,AMQP命令都是通過通道發送出去的,每個通道都會有一個唯一的ID,不論是發佈消息,訂閱隊列或者介紹消息都是通過通道完成的。

為什麼不通過TCP直接發送命令?

對於操作系統來說創建和銷毀TCP會話是非常昂貴的開銷,假設高峰期每秒有成千上萬條連接,每個連接都要創建一條TCP會話,這就造成了TCP連接的巨大浪費,而且操作系統每秒能創建的TCP也是有限的,因此很快就會遇到系統瓶頸。

如果我們每個請求都使用一條TCP連接,既滿足了性能的需要,又能確保每個連接的私密性,這就是引入通道概念的原因。

你必須知道的Rabbit

想要真正的瞭解Rabbit有些名詞是你必須知道的。

包括:ConnectionFactory(連接管理器)、Channel(通道)、Exchange(交換器)、Queue(隊列)、RoutingKey(路由鍵)、BindingKey(綁定鍵)。

ConnectionFactory(連接管理器):應用程式與Rabbit之間建立連接的管理器,程式代碼中使用;

Channel(通道):消息推送使用的通道;

Exchange(交換器):用於接受、分配消息;

Queue(隊列):用於存儲生產者的消息;

RoutingKey(路由鍵):用於把生成者的數據分配到交換器上;

BindingKey(綁定鍵):用於把交換器的消息綁定到隊列上;

看到上面的解釋,最難理解的路由鍵和綁定鍵了,那麼他們具體怎麼發揮作用的,請看下圖:

關於更多交換器的信息,我們在後面再講。

消息持久化

Rabbit隊列和交換器有一個不可告人的秘密,就是預設情況下重啟伺服器會導致消息丟失,那麼怎麼保證Rabbit在重啟的時候不丟失呢?答案就是消息持久化。

當你把消息發送到Rabbit伺服器的時候,你需要選擇你是否要進行持久化,但這並不能保證Rabbit能從崩潰中恢復,想要Rabbit消息能恢復必須滿足3個條件:

  1. 投遞消息的時候durable設置為true,消息持久化;

  2. 消息已經到達持久化交換器上;

  3. 消息已經到達持久化的隊列;

持久化工作原理

Rabbit會將你的持久化消息寫入磁碟上的持久化日誌文件,等消息被消費之後,Rabbit會把這條消息標識為等待垃圾回收。

持久化的缺點

消息持久化的優點顯而易見,但缺點也很明顯,那就是性能,因為要寫入硬碟要比寫入記憶體性能較低很多,從而降低了伺服器的吞吐量,儘管使用SSD硬碟可以使事情得到緩解,但他仍然吸幹了Rabbit的性能,當消息成千上萬條要寫入磁碟的時候,性能是很低的。

所以使用者要根據自己的情況,選擇適合自己的方式。

虛擬主機

每個Rabbit都能創建很多vhost,我們稱之為虛擬主機,每個虛擬主機其實都是mini版的RabbitMQ,擁有自己的隊列,交換器和綁定,擁有自己的許可權機制。

環境搭建

前文我們已經介紹了Ubuntu搭建RabbitMQ的步驟:RabbitMQ在Ubuntu上的環境搭建

如果你是在Windows10上去安裝那就更簡單了,先放下載地址:

Erlang/Rabbit Server百度網盤鏈接:https://pan.baidu.com/s/1TnKDV-ZuXLiIgyK8c8f9dg 密碼:wct9

當然也可去Erlang和Rabbit官網去下,就是速度比較慢。我的百度雲Rabbit最新版本:3.7.6,Erlang版本:20.2,註意:不要下載最新的Erlang,在Windows10上打開擴展插件有問題,打不開。

  1. 安裝Erlang;

  2. 安裝Rabbit Server;

  3. 進入安裝目錄\sbin下,使用命令“rabbitmq-plugins enable rabbitmq_management”啟動網頁管理插件;

  4. 重啟Rabbit服務;

使用:http://localhost:15672進行測試,預設的登陸賬號為:guest,密碼為:guest

重覆安裝Rabbit Server的坑

如果不是第一次在Windows上安裝Rabbit Server一定要把Rabbit和Erlang卸載乾凈之後,找到註冊表:HKEY_LOCAL_MACHINE\SOFTWARE\Ericsson\Erlang\ErlSrv 刪除其下的所有項。

不然會出現Rabbit安裝之後啟動不了的情況,理論上卸載的順序也是先Rabbit在Erlang。

代碼實現

java版實現,使用maven項目,創建可以查看:MyEclipse2017破解設置與maven項目搭建

項目創建成功之後,添加Rabbit Client jar包,只需要在pom.xml裡面配置,如下信息:

 <dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>4.7.0</version>
</dependency>

java實現代碼分為兩個類,第一個是創建Rabbit連接,第二是應用類使用最簡單的方式發佈和消費消息。

Rabbit的連接,兩種方式:

方式一:

public static Connection GetRabbitConnection() {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername(Config.UserName);
    factory.setPassword(Config.Password);
    factory.setVirtualHost(Config.VHost);
    factory.setHost(Config.Host);
    factory.setPort(Config.Port);
    Connection conn = null;
    try {
        conn = factory.newConnection();
    } catch (Exception e) {
        e.printStackTrace();
    }
    return conn;
}

方式二:

public static Connection GetRabbitConnection2() {
    ConnectionFactory factory = new ConnectionFactory();
    // 連接格式:amqp://userName:password@hostName:portNumber/virtualHost
    String uri = String.format("amqp://%s:%s@%s:%d%s", Config.UserName, Config.Password, Config.Host, Config.Port,
            Config.VHost);
    Connection conn = null;
    try {
        factory.setUri(uri);
        factory.setVirtualHost(Config.VHost);
        conn = factory.newConnection();
    } catch (Exception e) {
        e.printStackTrace();
    }
    return conn;
}

第二部分:應用類,使用最簡單的方式發佈和消費消息

public static void main(String[] args) {
    Publisher(); // 推送消息

    Consumer(); // 消費消息
}

/**
 * 推送消息
 */
public static void Publisher() {
    // 創建一個連接
    Connection conn = ConnectionFactoryUtil.GetRabbitConnection();
    if (conn != null) {
        try {
            // 創建通道
            Channel channel = conn.createChannel();
            // 聲明隊列【參數說明:參數一:隊列名稱,參數二:是否持久化;參數三:是否獨占模式;參數四:消費者斷開連接時是否刪除隊列;參數五:消息其他參數】
            channel.queueDeclare(Config.QueueName, false, false, false, null);
            String content = String.format("當前時間:%s", new Date().getTime());
            // 發送內容【參數說明:參數一:交換機名稱;參數二:隊列名稱,參數三:消息的其他屬性;參數四:消息主體】
            channel.basicPublish("", Config.QueueName, null, content.getBytes("UTF-8"));
            System.out.println("已發送消息:" + content);
            // 關閉連接
            channel.close();
            conn.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

/**
 * 消費消息
 */
public static void Consumer() {
    // 創建一個連接
    Connection conn = ConnectionFactoryUtil.GetRabbitConnection();
    if (conn != null) {
        try {
            // 創建通道
            Channel channel = conn.createChannel();
            // 聲明隊列【參數說明:參數一:隊列名稱,參數二:是否持久化;參數三:是否獨占模式;參數四:消費者斷開連接時是否刪除隊列;參數五:消息其他參數】
            channel.queueDeclare(Config.QueueName, false, false, false, null);

            // 創建訂閱器,並接受消息
            channel.basicConsume(Config.QueueName, false, "", new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    String routingKey = envelope.getRoutingKey(); // 隊列名稱
                    String contentType = properties.getContentType(); // 內容類型
                    String content = new String(body, "utf-8"); // 消息正文
                    System.out.println("消息正文:" + content);
                    channel.basicAck(envelope.getDeliveryTag(), false); // 手動確認消息【參數說明:參數一:該消息的index;參數二:是否批量應答,true批量確認小於index的消息】
                }
            });

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

代碼裡面已經寫了很詳細的註釋,在這裡也不過多的介紹了。

執行效果,如圖:


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • pillow介紹 一、Image類的屬性:1、Format 2、Mode 3、Size 4、Palette 5、Info 二、類的函數:1、New 2、Open 3、Blend 4、Composite 5、Eval 6、Frombuffer 7、Fromstring 8、Merge 三、Image類 ...
  • 效果: 我沒有弄文件夾保存,因為皮膚與英雄都是一一對應,這樣子更加方便操作。 點擊下載皮膚後,會自動從官網下載一個json文件,所以出了新英雄、新皮膚軟體會自動更新。高清圖: 但是有個別新皮膚官網也沒有提供數據,找不到新皮膚下載的選擇項時,點擊影藏皮膚獲取按鈕輸入英雄名字,再點擊隱藏皮膚下載即可。 ...
  • 五、異常 異常概念總結: 練習一:異常的體系 問題: 1. 請描述異常的繼承體系 2. 請描述你對錯誤(Error)的理解 3. 請描述你對異常(Expection的理解) 4. 請描述你對運行時異常(RuntimeException)的理解 答: 1. 異常繼承體係為:異常的根類是 java.la ...
  • 對於CPU密集型的程式,可以使用multiprocessing的Process,Pool等封裝好的類,通過多進程的方式實現並行計算。但是因為進程中的通信成本比較大,對於進程之間需要大量數據交互的程式效率未必有大的提高。 4、 針對迴圈的優化 每種編程語言都會強調需要優化迴圈。當使用Python的時候 ...
  • 2018-07-06 21:06:16 ...
  • 我們之前學習創建線程有Thread和Runnable兩種方式,但是兩種方式都無法獲得執行的結果。 而Callable和Future在任務完成後得到結果。 Future是一個介面,表示一個任務的周期,並提供了相應的方法來判斷是否已經完成或者取消任務,以及獲取任務的結果和取消任務。 FutureTask ...
  • 數組是一個變數,存儲相同數據類型的一組數據 聲明一個變數就是在記憶體空間划出一塊合適的空間 聲明一個數組就是在記憶體空間划出一串連續的空間 數組長度固定不變,避免數組越界 數組是靜態分配記憶體空間的,所有元素存放在一組連續的存儲單元中,一旦分配,不可更改,不便於擴展, 數組便於查找和修改,不利於插入和刪除 ...
  • 先看一下項目圖,基本就理解了整合的內容 這次主角不再是Mybats的配置文件SqlMapConfig.xml了,而是Spring的applicationContext.xml applicationContext.xml Spring同樣開始整合數據源datasource IoC控制反轉生成sqlS ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...