RabbitMQ基礎教程之使用進階篇

来源:https://www.cnblogs.com/yihuihui/archive/2018/06/02/9127317.html
-Advertisement-
Play Games

RabbitMQ基礎教程之使用進階篇 相關博文,推薦查看: I. 背景 前一篇基本使用篇的博文中,介紹了rabbitmq的三種使用姿勢,可以知道如何向RabbitMQ發送消息以及如何消費,但遺留下幾個疑問,本篇則主要希望弄清楚這幾點 Exchange聲明的問題(是否必須聲明,如果不聲明會怎樣) Ex ...


RabbitMQ基礎教程之使用進階篇

相關博文,推薦查看:

  1. RabbitMq基礎教程之安裝與測試
  2. RabbitMq基礎教程之基本概念
  3. RabbitMQ基礎教程之基本使用篇

I. 背景

前一篇基本使用篇的博文中,介紹了rabbitmq的三種使用姿勢,可以知道如何向RabbitMQ發送消息以及如何消費,但遺留下幾個疑問,本篇則主要希望弄清楚這幾點

  • Exchange聲明的問題(是否必須聲明,如果不聲明會怎樣)
  • Exchange聲明的幾個參數(durable, autoDelete)有啥區別
  • 當沒有隊列和Exchange綁定時,直接往隊列中塞數據,好像不會有數據增加(即先塞數據,然後創建queue,建立綁定,從控制臺上看這個queue裡面也不會有數據)
  • 消息消費的兩種姿勢(一個主動去拿數據,一個是rabbit推數據)對比

II. 基本進階篇

1. Exchange預設場景

將前面的消息發送代碼撈出來,幹掉Exchange的聲明,如下

public class DefaultProducer {
    public static void publishMsg(String queue, String message) throws IOException, TimeoutException {
        ConnectionFactory factory = RabbitUtil.getConnectionFactory();

        //創建連接
        Connection connection = factory.newConnection();

        //創建消息通道
        Channel channel = connection.createChannel();
        channel.queueDeclare(queue, true, false, true, null);

        // 發佈消息
        channel.basicPublish("", queue, null, message.getBytes());

        channel.close();
        connection.close();
    }

    public static void main(String[] args) throws IOException, TimeoutException {
        for (int i = 0; i < 20; i++) {
            publishMsg("hello", "msg" + i);
        }
    }
}

在發佈消息時,傳入的Exchange名為“”,再到控制台查看,發現數據被投遞到了(AMQP default)這個交換器,對應的截圖如下

  image

看一下上面的綁定描述內容,重點如下

  • 預設交換器選擇Direct策略
  • 將rountingKey綁定到同名的queue上
  • 不支持顯示的綁定和解綁

上面的代碼為了演示數據的流向,在發佈消息的同時也定義了一個同名的Queue,因此可以在控制臺上看到同名的 "hello" queue,且內部有20條數據

當我們去掉queue的聲明時,會發現另一個問題,投入的數據好像並沒有存下來(因為沒有queue來接收這些數據,而之後再聲明queue時,之前的數據也不會分配過來)

2. 綁定之後才有數據

首先是將控制臺中的hello這個queue刪掉,然後再次執行下麵的代碼(相對於前面的就是註釋了queue的聲明)

public class DefaultProducer {
    public static void publishMsg(String queue, String message) throws IOException, TimeoutException {
        ConnectionFactory factory = RabbitUtil.getConnectionFactory();

        //創建連接
        Connection connection = factory.newConnection();

        //創建消息通道
        Channel channel = connection.createChannel();
        //        channel.queueDeclare(queue, true, false, true, null);

        // 發佈消息
        channel.basicPublish("", queue, null, message.getBytes());

        channel.close();
        connection.close();
    }

    public static void main(String[] args) throws IOException, TimeoutException {
        for (int i = 0; i < 20; i++) {
            publishMsg("hello", "msg" + i);
        }
    }
}

然後從控制臺上看,可以看到有數據寫入Exchange,但是沒有queue來接收這些數據

  IMAGE

然後開啟消費進程,然後再次執行上面的塞入數據,新後面重新塞入的數據可以被消費;但是之前塞入的數據則沒有,消費消息的代碼如下:

public class MyDefaultConsumer {
    public void consumerMsg(String queue) throws IOException, TimeoutException {
        ConnectionFactory factory = RabbitUtil.getConnectionFactory();

        //創建連接
        Connection connection = factory.newConnection();

        //創建消息通道
        Channel channel = connection.createChannel();
        channel.queueDeclare(queue, true, false, true, null);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                try {
                    System.out.println(" [ " + queue + " ] Received '" + message);
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        // 取消自動ack
        channel.basicConsume(queue, false, consumer);
    }

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        MyDefaultConsumer consumer = new MyDefaultConsumer();
        consumer.consumerMsg("hello");

        Thread.sleep(1000 * 60 * 10);
    }
}

小結:

  • 通過上面的演示得知一點
  • 當沒有Queue綁定到Exchange時,往Exchange中寫入的消息也不會重新分發到之後綁定的queue上

3. Durable, autoDeleted參數

在定義Queue時,可以指定這兩個參數,這兩個參數的區別是什麼呢?

a. durable

持久化,保證RabbitMQ在退出或者crash等異常情況下數據沒有丟失,需要將queue,exchange和Message都持久化。

若是將queue的持久化標識durable設置為true,則代表是一個持久的隊列,那麼在服務重啟之後,也會存在,因為服務會把持久化的queue存放在硬碟上,當服務重啟的時候,會重新什麼之前被持久化的queue。隊列是可以被持久化,但是裡面的消息是否為持久化那還要看消息的持久化設置。也就是說,重啟之前那個queue裡面還沒有發出去的消息的話,重啟之後那隊列裡面是不是還存在原來的消息,這個就要取決於發生著在發送消息時對消息的設置

b. autoDeleted

自動刪除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列

這個比較容易演示了,當一個Queue被設置為自動刪除時,當消費者斷掉之後,queue會被刪除,這個主要針對的是一些不是特別重要的數據,不希望出現消息積累的情況

// 倒數第二個參數,true表示開啟自動刪除
// 正數第二個參數,true表示持久化
channel.queueDeclare(queue, true, false, true, null);

c. 小結

  • 當一個Queue已經聲明好了之後,不能更新durable或者autoDelted值;當需要修改時,需要先刪除再重新聲明
  • 消費的Queue聲明應該和投遞的Queue聲明的 durable,autoDelted屬性一致,否則會報錯
  • 對於重要的數據,一般設置 durable=true, autoDeleted=false
  • 對於設置 autoDeleted=true 的隊列,當沒有消費者之後,隊列會自動被刪除

4. ACK

執行一個任務可能需要花費幾秒鐘,你可能會擔心如果一個消費者在執行任務過程中掛掉了。一旦RabbitMQ將消息分發給了消費者,就會從記憶體中刪除。在這種情況下,如果正在執行任務的消費者宕機,會丟失正在處理的消息和分發給這個消費者但尚未處理的消息。
但是,我們不想丟失任何任務,如果有一個消費者掛掉了,那麼我們應該將分發給它的任務交付給另一個消費者去處理。

為了確保消息不會丟失,RabbitMQ支持消息應答。消費者發送一個消息應答,告訴RabbitMQ這個消息已經接收並且處理完畢了。RabbitMQ就可以刪除它了。

因此手動ACK的常見手段

// 接收消息之後,主動ack/nak
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
            byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        try {
            System.out.println(" [ " + queue + " ] Received '" + message);
            channel.basicAck(envelope.getDeliveryTag(), false);
        } catch (Exception e) {
            channel.basicNack(envelope.getDeliveryTag(), false, true);
        }
    }
};

// 取消自動ack
channel.basicConsume(queue, false, consumer);

手動ack時,有個multiple,其含義表示:

可以理解為每個Channel維護一個unconfirm的消息序號集合,每publish一條數據,集合中元素加1,每回調一次handleAck方法,unconfirm集合刪掉相應的一條(multiple=false)或多條(multiple=true)記錄

III. 其他

1. 參考

Java Client API Guide

2. 一灰灰Blog: https://liuyueyi.github.io/hexblog

一灰灰的個人博客,記錄所有學習和工作中的博文,歡迎大家前去逛逛

3. 聲明

盡信書則不如,已上內容,純屬一家之言,因個人能力有限,難免有疏漏和錯誤之處,如發現bug或者有更好的建議,歡迎批評指正,不吝感激

4. 掃描關註

  QrCode
您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 這是我自己複習總結的,分享給大家,有不足和需要修改的地方,望大家指正,謝謝。 https://pan.baidu.com/s/14GrfC34CmjpnlND8MiT0YA ...
  • python中字元串對象提供了很多方法來操作字元串,功能相當豐富。 這些方法的使用說明見 "官方文檔:string methods" ,本文對它們進行詳細解釋,各位以後可將本文當作手冊。 這裡沒有模式匹配(正則)相關的功能。python中要使用模式匹配相關的方法操作字元串,需要 導入re模塊。關於正 ...
  • 1. python在讀取文件時,read(),readline()和readlines()有什麼區別? 舉例說明: 2、使用一行代碼輸出[1, 4, 9, 16, 25, 36, 49, 64, 81, 100] 3、編寫一個遞歸函數 ...
  • 假設現在已經打包了一個文件(1233444333),要將這個文件傳輸給另一方: 其中的上傳數據模塊和下載模塊可以單獨進行分裝後使用。 結果: ...
  • 時間序列深度學習:狀態 LSTM 模型預測太陽黑子 本文翻譯自《Time Series Deep Learning: Forecasting Sunspots With Keras Stateful Lstm In R》 "原文鏈接" 由於數據科學機器學習和深度學習的發展,時間序列預測在預測準確性方 ...
  • 前言 本人在通過《C語言程式設計:現代方法(第2版)》自學C語言時,發現國內並沒有該書完整的課後習題答案,所以就想把自己在學習過程中所做出的答案分享出來,以供大家參考。這些答案是本人自己解答,並參考GitHub上相關的分享和Chegg.com相關資料。因為並沒有權威的答案來源,所以可能會存在錯誤的地 ...
  • Java的數組以及操作數組一:什麼是數組;二:如何使用 Java 中的數組;三:使用迴圈操作 Java 中的數組;四:使用 Arrays 類操作 Java 中的數組;五:使用 foreach 操作數組;六:Java 中的二維數組; ...
  • 在Spring生態中,JavaConfig 如何優雅的替換 XML ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...