RabbitMQ-交換機模式

来源:https://www.cnblogs.com/0000cjw/archive/2019/11/10/11831898.html

在說正題之前先解釋一下交換機模式是個籠統的稱呼,它不是一個單獨的模式(包括了訂閱模式,路由模式和主題模式),交換機模式是一個比較常用的模式,主要是為了實現數據的同步。 首先,說一下訂閱模式,就和字面上的意思差不多主要就是一個生產者,多個消費者,同一個消息被多個消費者獲取,先看一下官網的圖示 整體執行 ...


      在說正題之前先解釋一下交換機模式是個籠統的稱呼,它不是一個單獨的模式(包括了訂閱模式,路由模式和主題模式),交換機模式是一個比較常用的模式,主要是為了實現數據的同步。

      首先,說一下訂閱模式,就和字面上的意思差不多主要就是一個生產者,多個消費者,同一個消息被多個消費者獲取,先看一下官網的圖示

 

       整體執行過程就和圖裡一樣,生產者把消息發送到交換機,然後隊列綁定到交換機,消息由交換機發送到隊列,每一個隊列都有一個各自的消費者。這樣

就實現了一個消息被多個消費者所獲取,而且如果有新的消費者加入直接綁定隊列到交換機就可以了,大大的降低了系統間的耦合度。還有一點要註意的就是

當我們把消息發送到一個沒有隊列綁定的交換機時,消息就會丟失,因為消息只能存儲在隊列,而交換機只做交換,不做存儲!

 生產者代碼:  

public class Send {

    private final static String EXCHANGE_NAME = "exchange_name"; //交換機名稱

    public static void main(String[] argv) throws Exception {
        // 獲取MQ連接和通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明交換機
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // 消息內容
        String message = "生產者消息";
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" 發送 '" + message + "'");

        channel.close();
        connection.close();
    }
}

 

 

 消費者一號代碼:

public class Recv {

    private final static String QUEUE_NAME = "test_queue_ex";

    private final static String EXCHANGE_NAME = "exchange_name";

    public static void main(String[] argv) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 綁定隊列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        channel.basicQos(1);
        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽隊列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);
        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" 消費者一號 '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

消費者二號代碼:

public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_ex2";

    private final static String EXCHANGE_NAME = "exchange_name";

    public static void main(String[] argv) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        channel.basicQos(1);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, false, consumer);
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("消費者二號 '" + message + "'");
            Thread.sleep(10);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

運行代碼之後可以看到

 

 生產者發送的消息已經存儲在了交換機之中。查看綁定關係如下圖所示:

 

 所以,可以得出結論一個消息被多個消費者所消費。訂閱模式也存在著缺陷有時並不是所有數據都需要同步,所以用訂閱模式來做數據同步並不合理。於是就用到了路由模式。

 

 路由模式

 官網圖示如下:

 

 和訂閱模式比較類似,只是type變成了direct類型,路由模式也是先由生產者發送消息到交換機,然後在根據綁定鍵來判斷消息發送到哪一個交換機。如下圖:

 

 和訂閱模式的區別就是生產者發送消息時要先聲明消息的類型,也就是說消息會被哪類消費者所獲取

 

 

    消費者和生產者保持一個類型的時候,就可以接收到對應生產者所發送的消息了。從而可以過濾掉不需要的消息類型。

主題模式

   主題模式個人感覺就和sql語句里的like關鍵字一樣,不用保證消息類型一樣,只要保證其相似就可以接收消息了,相比於路由模式,

主題模式匹配率比較低,但是功能確提高了很多,減少了路由key的創建,如圖所示:

 

 type變成了topic類型,至於其他方面和路由模式一樣就不多說了。

 


您的分享是我們最大的動力!

更多相關文章
  • 之前我們已經講解了 Nginx 的基礎內容,接下來我們開始介紹 Nginx 的架構基礎。 為什麼我們要討論 Nginx 的架構基礎? 因為 Nginx 運行在企業內網的最外層也就是邊緣節點,那麼他處理的的流量是其他應用伺服器處理流量的數倍,甚至幾個數量級,我們知道任何一種問題在不同的數量級下,他的解 ...
  • 過早的優化是萬惡之源。而在真正遇到瓶頸的時候,pprof 可以快速定位到需要優化的地方。 ...
  • 垃圾收集GC(Garbage Collection)是Java語言的核心技術之一, 在Java中,程式員不需要去關心記憶體動態分配和垃圾回收的問題,這一切都交給了JVM來處理。 一. jvm的記憶體結構 垃圾回收都是基於記憶體去回收的,因此,先要對記憶體結構有一個大概的瞭解 Java記憶體運行時區域大概分了三 ...
  • 背景 之前做的海量數據數據展示,在預處理速度和線上渲染上還有有所欠缺,本文中進行一些優化工作,使得九分鐘處理完一千多萬面數據的3 12級矢量切片,線上瀏覽數據請求時間控制在10s左右。 準備 軟體環境:PostGIS(3.0.0rc2 r17909)和 PostgreSQL( 12.0, compi ...
  • Eclipse部署多模塊項目到tomcat,啟動時找不到jar的解決方法。 ...
  • 在使用redis時,一般會設置一個過期時間,當然也有不設置過期時間的,也就是永久不過期。當設置了過期時間,redis是如何判斷是否過期,以及根據什麼策略來進行刪除的。 設置過期時間 expire key time(以秒為單位) 這是最常用的方式setex(String key, int second ...
  • scp是secure copy的簡寫,用於在Linux下進行遠程拷貝文件的命令,和它類似的命令有cp,不過cp只是在本機進行拷貝不能跨伺服器,而且scp傳輸是加密的。可能會稍微影響一下速度。當你伺服器硬碟變為只讀 read only system時,用scp可以幫你把文件移出來。另外,scp還非常不 ...
  • 你可能想創建一個在應用的任何地方都可以訪問的函數,這個教程將幫你實現 👏 很多教程都會說,你在 composer.json 這個文件中通過添加一個自動載入的文件,就可以實現這個需求。但我認為這不是一個好的方式,當你在 helpers.php 文件中添加了更多的函數時,可讀性將變得很差。 下麵我將介 ...
一周排行
  • 前言 上一篇文章介紹IOptions的註冊,本章我們繼續往下看 IOptions IOptions是一個介面裡面只有一個Values屬性,該介面通過OptionsManager實現 OptionsManager OptionsManager實現了IOptions和IOptionsSnapshot,他 ...
  • 在 EF 里有個 `ShadowProperty` (陰影屬性/影子屬性)的概念,你可以通過 FluentAPI 的方式來定義一個不在 .NET model 里定義的屬性,只能通過 EF 里的 `Change Tracker` 來操作這種屬性。 在導出 Excel 的時候,可能希望導出的列並不... ...
  • 使用NPOI操作Excel,無需Office COM組件 部分代碼來自於:https://docs.microsoft.com/zh-tw/previous-versions/ee818993(v=msdn.10)?redirectedfrom=MSDN using System.Data; usi ...
  • Spire.Cloud.Word.Sdk提供了介面SetBackgroudColor()、SetBackgroudImage()、DeleteBackground()、GetBackgroudColor()用於設置、刪除及讀取Word文檔背景。本文將以C#程式為例演示如何來調用API介面實現以上內容 ...
  • 說明:在同一視窗打開鏈接,只要稍加改造就可以實現,這裡實現的是在新Tab頁打開鏈接,並且支持帶type="POST" target="_blank"的鏈接 github和bitbucket上相關問題: 1、WPF empty POST data when using custom popup htt ...
  • 前言 公司項目需要做個畫線縮放,我司稱之為瞳距縮放,簡而言之就是:2張圖,從第一張圖畫一條線,再從第二個圖畫一條線,第二條線以第一條為基準,延長到一致的長度,並同比縮放圖片;文字太枯燥,請先實例圖 例子1:以皮卡丘為例,我要把路飛的拳頭縮放到皮卡丘頭那麼大 例子2:以皮卡丘的基準,縮小路飛,與其身高 ...
  • 9月份的時候,微軟宣佈正式發佈C 8.0,作為.NET Core 3.0發行版的一部分。C 8.0的新特性之一就是預設介面實現。在本文中,我們將一起來聊聊預設介面實現。 作者:依樂祝 原文鏈接:https://www.cnblogs.com/yilezhu/p/12034584.html 提前說下: ...
  • 對於地圖坐標偏移,以leaflet為例,有如下解決辦法 方法1、修改leaflet源碼,解決地圖坐標偏移問題 方法2、將點位真實的經緯度經過偏移演算法,添加到加密的地圖上 方法3、直接對離線地圖瓦片進行糾偏 方法1需要修改源碼 方法2有缺陷,地圖依然是偏移的,如果把地圖經緯度顯示出來,經緯度也是不對的 ...
  • 引用類庫 1.Install-Package Microsoft.Extensions.Caching.Memory MemoryCacheOptions 緩存配置 1.ExpirationScanFrequency 獲取或設置對過期項的連續掃描之間的最短時間間隔 2.SizeLimit 緩存是沒有 ...
  • 原文:https://blogs.msdn.microsoft.com/mazhou/2017/12/12/c-7-series-part-7-ref-returns/ 背景 有兩種方法可以將一個值傳遞給一個方法: 例如,FCL(.NET Framework Class Library)中的Arra ...
x